Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 17:40:38 +00:00)

Compare commits: weisd/scan...fix/lifecy

16 Commits
| Author | SHA1 | Date |
|---|---|---|
| | e8f3a7d11e | |
| | 82fcb7227a | |
| | f383ea1175 | |
| | 4caad5096a | |
| | e68f791b1a | |
| | 0a4438e896 | |
| | e15c619ed5 | |
| | ecf7a2e344 | |
| | e393add5ee | |
| | 64c0430b84 | |
| | 081616b81b | |
| | d9125937b0 | |
| | 2534087551 | |
| | 5f03a1cbe0 | |
| | f9af9fe5da | |
| | 59b8aa14d7 | |
.vscode/launch.json (vendored): 34 changes
@@ -99,7 +99,17 @@
            "name": "Debug executable target/debug/rustfs",
            "type": "lldb",
            "request": "launch",
            "program": "${workspaceFolder}/target/debug/rustfs",
            "cargo": {
                "args": [
                    "run",
                    "--bin",
                    "rustfs",
                    "-j",
                    "1",
                    "--profile",
                    "dev"
                ]
            },
            "args": [],
            "cwd": "${workspaceFolder}",
            //"stopAtEntry": false,
@@ -107,7 +117,7 @@
            "env": {
                "RUSTFS_ACCESS_KEY": "rustfsadmin",
                "RUSTFS_SECRET_KEY": "rustfsadmin",
                "RUSTFS_VOLUMES": "./target/volume/test{1...4}",
                //"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
                "RUSTFS_ADDRESS": ":9000",
                "RUSTFS_CONSOLE_ENABLE": "true",
                // "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jeager otlp http endpoint
@@ -116,11 +126,31 @@
                // "RUSTFS_COMPRESS_ENABLE": "true",
                "RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001",
                "RUSTFS_OBS_LOG_DIRECTORY": "./target/logs",
                "RUST_LOG":"rustfs=debug,ecstore=debug,s3s=debug,iam=debug",
            },
            "sourceLanguages": [
                "rust"
            ],
        },
        {
            "type": "lldb",
            "request": "launch",
            "name": "Debug test_lifecycle_transition_basic",
            "cargo": {
                "args": [
                    "test",
                    "-p",
                    "rustfs-scanner",
                    "--test",
                    "lifecycle_integration_test",
                    "serial_tests::test_lifecycle_transition_basic",
                    "-j",
                    "1"
                ]
            },
            "args": [],
            "cwd": "${workspaceFolder}"
        },
        {
            "name": "Debug executable target/debug/test",
            "type": "lldb",
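For reference, the new launch configuration mirrors the shell invocation `cargo test -p rustfs-scanner --test lifecycle_integration_test serial_tests::test_lifecycle_transition_basic -j 1`; the `cargo.args` array above carries exactly those flags.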
Cargo.lock (generated): 5 changes
@@ -7251,12 +7251,15 @@ dependencies = [
 "faster-hex",
 "flatbuffers",
 "futures",
 "futures-util",
 "glob",
 "google-cloud-auth",
 "google-cloud-storage",
 "hex-simd",
 "hmac 0.13.0-rc.3",
 "http 1.4.0",
 "http-body 1.0.1",
 "http-body-util",
 "hyper 1.8.1",
 "hyper-rustls 0.27.7",
 "hyper-util",
@@ -7614,10 +7617,12 @@ dependencies = [
 "async-trait",
 "chrono",
 "futures",
 "heed",
 "http 1.4.0",
 "path-clean",
 "rand 0.10.0-rc.5",
 "rmp-serde",
 "rustfs-ahm",
 "rustfs-common",
 "rustfs-config",
 "rustfs-ecstore",
@@ -495,6 +495,26 @@ mod serial_tests {
                object_name,
            } = &elm.1;
            println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
            //eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
            eval_inner(
                &lifecycle::ObjectOpts {
                    name: oi.name.clone(),
                    user_tags: oi.user_tags.clone(),
                    version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(),
                    mod_time: oi.mod_time,
                    size: oi.size as usize,
                    is_latest: oi.is_latest,
                    num_versions: oi.num_versions,
                    delete_marker: oi.delete_marker,
                    successor_mod_time: oi.successor_mod_time,
                    restore_ongoing: oi.restore_ongoing,
                    restore_expires: oi.restore_expires,
                    transition_status: oi.transitioned_object.status.clone(),
                    ..Default::default()
                },
                OffsetDateTime::now_utc(),
            )
            .await;
        }
        println!("row:{row:?}");
    }
@@ -506,3 +526,261 @@ mod serial_tests {
        println!("Lifecycle cache test completed");
    }
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
    let mut events = Vec::<Event>::new();
    info!(
        "eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
        obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
    );
    if obj.mod_time.expect("err").unix_timestamp() == 0 {
        info!("eval_inner: mod_time is 0, returning default event");
        return Event::default();
    }

    if let Some(restore_expires) = obj.restore_expires {
        if restore_expires.unix_timestamp() != 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
            let mut action = IlmAction::DeleteRestoredAction;
            if !obj.is_latest {
                action = IlmAction::DeleteRestoredVersionAction;
            }

            events.push(Event {
                action,
                due: Some(now),
                rule_id: "".into(),
                noncurrent_days: 0,
                newer_noncurrent_versions: 0,
                storage_class: "".into(),
            });
        }
    }

    if let Some(ref lc_rules) = self.filter_rules(obj).await {
        for rule in lc_rules.iter() {
            if obj.expired_object_deletemarker() {
                if let Some(expiration) = rule.expiration.as_ref() {
                    if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
                        events.push(Event {
                            action: IlmAction::DeleteVersionAction,
                            rule_id: rule.id.clone().expect("err!"),
                            due: Some(now),
                            noncurrent_days: 0,
                            newer_noncurrent_versions: 0,
                            storage_class: "".into(),
                        });
                        break;
                    }

                    if let Some(days) = expiration.days {
                        let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
                        if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
                            events.push(Event {
                                action: IlmAction::DeleteVersionAction,
                                rule_id: rule.id.clone().expect("err!"),
                                due: Some(expected_expiry),
                                noncurrent_days: 0,
                                newer_noncurrent_versions: 0,
                                storage_class: "".into(),
                            });
                            break;
                        }
                    }
                }
            }

            if obj.is_latest {
                if let Some(ref expiration) = rule.expiration {
                    if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
                        if obj.delete_marker && expired_object_delete_marker {
                            let due = expiration.next_due(obj);
                            if let Some(due) = due {
                                if now.unix_timestamp() >= due.unix_timestamp() {
                                    events.push(Event {
                                        action: IlmAction::DelMarkerDeleteAllVersionsAction,
                                        rule_id: rule.id.clone().expect("err!"),
                                        due: Some(due),
                                        noncurrent_days: 0,
                                        newer_noncurrent_versions: 0,
                                        storage_class: "".into(),
                                    });
                                }
                            }
                            continue;
                        }
                    }
                }
            }

            if !obj.is_latest {
                if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
                    if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
                        if newer_noncurrent_versions > 0 {
                            continue;
                        }
                    }
                }
            }

            if !obj.is_latest {
                if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
                    if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
                        if noncurrent_days != 0 {
                            if let Some(successor_mod_time) = obj.successor_mod_time {
                                let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
                                if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
                                    events.push(Event {
                                        action: IlmAction::DeleteVersionAction,
                                        rule_id: rule.id.clone().expect("err!"),
                                        due: Some(expected_expiry),
                                        noncurrent_days: 0,
                                        newer_noncurrent_versions: 0,
                                        storage_class: "".into(),
                                    });
                                }
                            }
                        }
                    }
                }
            }

            if !obj.is_latest {
                if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
                    if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
                        if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
                            let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
                            if let Some(due0) = due {
                                if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
                                    events.push(Event {
                                        action: IlmAction::TransitionVersionAction,
                                        rule_id: rule.id.clone().expect("err!"),
                                        due,
                                        storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
                                            .storage_class
                                            .clone()
                                            .unwrap()
                                            .as_str()
                                            .to_string(),
                                        ..Default::default()
                                    });
                                }
                            }
                        }
                    }
                }
            }

            info!(
                "eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
                obj.is_latest,
                obj.delete_marker,
                obj.version_id,
                (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
            );
            // Allow expiration for latest objects OR non-versioned objects (empty version_id)
            if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
                info!("eval_inner: entering expiration check");
                if let Some(ref expiration) = rule.expiration {
                    if let Some(ref date) = expiration.date {
                        let date0 = OffsetDateTime::from(date.clone());
                        if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
                            info!("eval_inner: expiration by date - date0={:?}", date0);
                            events.push(Event {
                                action: IlmAction::DeleteAction,
                                rule_id: rule.id.clone().expect("err!"),
                                due: Some(date0),
                                noncurrent_days: 0,
                                newer_noncurrent_versions: 0,
                                storage_class: "".into(),
                            });
                        }
                    } else if let Some(days) = expiration.days {
                        let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
                        info!(
                            "eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
                            days,
                            obj.mod_time.expect("err!"),
                            expected_expiry,
                            now,
                            now.unix_timestamp() > expected_expiry.unix_timestamp()
                        );
                        if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
                            info!("eval_inner: object should expire, adding DeleteAction");
                            let mut event = Event {
                                action: IlmAction::DeleteAction,
                                rule_id: rule.id.clone().expect("err!"),
                                due: Some(expected_expiry),
                                noncurrent_days: 0,
                                newer_noncurrent_versions: 0,
                                storage_class: "".into(),
                            };
                            /*if rule.expiration.expect("err!").delete_all.val {
                                event.action = IlmAction::DeleteAllVersionsAction
                            }*/
                            events.push(event);
                        }
                    } else {
                        info!("eval_inner: expiration.days is None");
                    }
                } else {
                    info!("eval_inner: rule.expiration is None");
                }

                if obj.transition_status != TRANSITION_COMPLETE {
                    if let Some(ref transitions) = rule.transitions {
                        let due = transitions[0].next_due(obj);
                        if let Some(due0) = due {
                            if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
                                events.push(Event {
                                    action: IlmAction::TransitionAction,
                                    rule_id: rule.id.clone().expect("err!"),
                                    due,
                                    storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
                                    noncurrent_days: 0,
                                    newer_noncurrent_versions: 0,
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    if events.len() > 0 {
        events.sort_by(|a, b| {
            if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
                && now.unix_timestamp() > b.due.expect("err").unix_timestamp()
                || a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
            {
                match a.action {
                    IlmAction::DeleteAllVersionsAction
                    | IlmAction::DelMarkerDeleteAllVersionsAction
                    | IlmAction::DeleteAction
                    | IlmAction::DeleteVersionAction => {
                        return Ordering::Less;
                    }
                    _ => (),
                }
                match b.action {
                    IlmAction::DeleteAllVersionsAction
                    | IlmAction::DelMarkerDeleteAllVersionsAction
                    | IlmAction::DeleteAction
                    | IlmAction::DeleteVersionAction => {
                        return Ordering::Greater;
                    }
                    _ => (),
                }
                return Ordering::Less;
            }

            if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
                return Ordering::Less;
            }
            return Ordering::Greater;
        });
        return events[0].clone();
    }

    Event::default()
}
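The comparator above prefers delete-type actions whenever both candidate events are already past due, or they tie on due time; otherwise the earlier due time wins. A standalone sketch of that ordering rule (simplified stand-ins for the crate's `Event` and `IlmAction` types, not the project's actual code):

```rust
use std::cmp::Ordering;
use time::OffsetDateTime;

#[derive(Clone, Copy)]
enum Action { Delete, Transition }

#[derive(Clone, Copy)]
struct Ev { action: Action, due: OffsetDateTime }

// Pick the event to act on first, mirroring the sort in eval_inner.
fn pick_first(mut events: Vec<Ev>, now: OffsetDateTime) -> Option<Ev> {
    events.sort_by(|a, b| {
        let both_past_due = now > a.due && now > b.due;
        if both_past_due || a.due == b.due {
            // Delete-type actions take precedence over transitions.
            return match (a.action, b.action) {
                (Action::Delete, _) => Ordering::Less,
                (_, Action::Delete) => Ordering::Greater,
                _ => Ordering::Less,
            };
        }
        if a.due < b.due { Ordering::Less } else { Ordering::Greater }
    });
    events.first().copied()
}

fn main() {
    let now = OffsetDateTime::now_utc();
    let past = now - time::Duration::days(2);
    let first = pick_first(
        vec![
            Ev { action: Action::Transition, due: past },
            Ev { action: Action::Delete, due: past },
        ],
        now,
    );
    // Both events are past due, so the delete-type action is chosen.
    assert!(matches!(first.map(|e| e.action), Some(Action::Delete)));
}
```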
@@ -263,6 +263,16 @@ async fn create_test_tier(server: u32) {
        region: "".to_string(),
        ..Default::default()
    })
} else if server == 2 {
    Some(TierMinIO {
        access_key: "minioadmin".to_string(),
        secret_key: "minioadmin".to_string(),
        bucket: "mblock2".to_string(),
        endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
        prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
        region: "".to_string(),
        ..Default::default()
    })
} else {
    Some(TierMinIO {
        access_key: "minioadmin".to_string(),
@@ -602,7 +612,7 @@ mod serial_tests {
    async fn test_lifecycle_transition_basic() {
        let (_disk_paths, ecstore) = setup_test_env().await;

        create_test_tier(1).await;
        create_test_tier(2).await;

        // Create test bucket and object
        let suffix = uuid::Uuid::new_v4().simple().to_string();
@@ -610,8 +620,15 @@ mod serial_tests {
        let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
        let test_data = b"Hello, this is test data for lifecycle expiry!";

        //create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
        create_test_bucket(&ecstore, bucket_name.as_str()).await;
        create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
        upload_test_object(
            &ecstore,
            bucket_name.as_str(),
            object_name,
            b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
        )
        .await;
        //create_test_bucket(&ecstore, bucket_name.as_str()).await;
        upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;

        // Verify object exists initially
@@ -45,6 +45,7 @@ glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true
@@ -53,6 +54,8 @@ serde_json.workspace = true
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
s3s.workspace = true
http.workspace = true
http-body = { workspace = true }
http-body-util.workspace = true
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-simd = { workspace = true }
@@ -21,7 +21,7 @@
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
    BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
    ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
    ObjectLockConfiguration, ObjectLockEnabled, Prefix, RestoreRequest, Transition,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -173,44 +173,51 @@ impl Lifecycle for BucketLifecycleConfiguration {
            continue;
        }

        let rule_prefix = rule.prefix.as_ref().expect("err!");
        let rule_prefix = &rule.prefix.clone().unwrap_or_default();
        if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
        {
            continue;
        }

        let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!");
        if rule_noncurrent_version_expiration.noncurrent_days.expect("err!") > 0 {
        if let Some(rule_noncurrent_version_expiration) = &rule.noncurrent_version_expiration {
            if let Some(noncurrent_days) = rule_noncurrent_version_expiration.noncurrent_days {
                if noncurrent_days > 0 {
                    return true;
                }
            }
            if let Some(newer_noncurrent_versions) = rule_noncurrent_version_expiration.newer_noncurrent_versions {
                if newer_noncurrent_versions > 0 {
                    return true;
                }
            }
        }
        if rule.noncurrent_version_transitions.is_some() {
            return true;
        }
        if rule_noncurrent_version_expiration.newer_noncurrent_versions.expect("err!") > 0 {
            return true;
        if let Some(rule_expiration) = &rule.expiration {
            if let Some(date1) = rule_expiration.date.clone() {
                if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
                    return true;
                }
            }
            if rule_expiration.date.is_some() {
                return true;
            }
            if let Some(expired_object_delete_marker) = rule_expiration.expired_object_delete_marker
                && expired_object_delete_marker
            {
                return true;
            }
        }
        if !rule.noncurrent_version_transitions.is_none() {
            return true;
        if let Some(rule_transitions) = &rule.transitions {
            let rule_transitions_0 = rule_transitions[0].clone();
            if let Some(date1) = rule_transitions_0.date {
                if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
                    return true;
                }
            }
        }
        let rule_expiration = rule.expiration.as_ref().expect("err!");
        if !rule_expiration.date.is_none()
            && OffsetDateTime::from(rule_expiration.date.clone().expect("err!")).unix_timestamp()
                < OffsetDateTime::now_utc().unix_timestamp()
        {
            return true;
        }
        if !rule_expiration.date.is_none() {
            return true;
        }
        if rule_expiration.expired_object_delete_marker.expect("err!") {
            return true;
        }
        let rule_transitions: &[Transition] = &rule.transitions.as_ref().expect("err!");
        let rule_transitions_0 = rule_transitions[0].clone();
        if !rule_transitions_0.date.is_none()
            && OffsetDateTime::from(rule_transitions_0.date.expect("err!")).unix_timestamp()
                < OffsetDateTime::now_utc().unix_timestamp()
        {
            return true;
        }
        if !rule.transitions.is_none() {
        if rule.transitions.is_some() {
            return true;
        }
    }
@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use std::collections::HashMap;

use crate::client::{
@@ -61,9 +63,19 @@ impl TransitionClient {

        let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
        //defer closeResponse(resp)

        let resp_status = resp.status();
        let h = resp.headers().clone();

        //if resp != nil {
        if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
        if resp_status != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                resp_status,
                &h,
                vec![],
                bucket_name,
                "",
            )));
        }
        //}
        Ok(())
@@ -97,8 +109,17 @@ impl TransitionClient {
            .await?;
        //defer closeResponse(resp)

        if resp.status() != StatusCode::NO_CONTENT {
            return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
        let resp_status = resp.status();
        let h = resp.headers().clone();

        if resp_status != StatusCode::NO_CONTENT {
            return Err(std::io::Error::other(http_resp_to_error_response(
                resp_status,
                &h,
                vec![],
                bucket_name,
                "",
            )));
        }

        Ok(())
@@ -136,7 +157,15 @@ impl TransitionClient {
        )
        .await?;

        let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string();
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        let policy = String::from_utf8_lossy(&body_vec).to_string();
        Ok(policy)
    }
}
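The frame-collection loop introduced here reappears at nearly every call site below. A minimal helper sketch that could factor it out (hypothetical name `read_body_to_vec`, not part of these commits; assumes hyper 1.x with the `http-body` and `http-body-util` dependencies added in the Cargo.toml hunk above):

```rust
use http_body_util::BodyExt;
use hyper::body::Bytes;

// Collect every data frame of a hyper 1.x streaming body into one Vec<u8>,
// mirroring the loop this refactor repeats at each call site.
async fn read_body_to_vec<B>(mut body: B) -> Result<Vec<u8>, std::io::Error>
where
    B: http_body::Body<Data = Bytes> + Unpin,
    B::Error: std::fmt::Display,
{
    let mut buf = Vec::new();
    while let Some(frame) = body.frame().await {
        let frame = frame.map_err(|e| std::io::Error::other(e.to_string()))?;
        // Only DATA frames carry bytes; trailer frames are skipped.
        if let Some(data) = frame.data_ref() {
            buf.extend_from_slice(data);
        }
    }
    Ok(buf)
}
```

With such a helper, each `let mut body_vec = ...; while let Some(frame) = ...` block below would reduce to `let body_vec = read_body_to_vec(resp.into_body()).await?;`.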
@@ -18,12 +18,12 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::Deserializer, ser::Serializer};
use std::fmt::Display;

use s3s::Body;
//use s3s::Body;
use s3s::S3ErrorCode;

const _REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
@@ -95,7 +95,8 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse {
}

pub fn http_resp_to_error_response(
    resp: &http::Response<Body>,
    resp_status: StatusCode,
    h: &HeaderMap,
    b: Vec<u8>,
    bucket_name: &str,
    object_name: &str,
@@ -104,11 +105,11 @@ pub fn http_resp_to_error_response(
    let err_resp_ = quick_xml::de::from_str::<ErrorResponse>(&err_body);
    let mut err_resp = ErrorResponse::default();
    if err_resp_.is_err() {
        match resp.status() {
        match resp_status {
            StatusCode::NOT_FOUND => {
                if object_name == "" {
                    err_resp = ErrorResponse {
                        status_code: resp.status(),
                        status_code: resp_status,
                        code: S3ErrorCode::NoSuchBucket,
                        message: "The specified bucket does not exist.".to_string(),
                        bucket_name: bucket_name.to_string(),
@@ -116,7 +117,7 @@ pub fn http_resp_to_error_response(
                    };
                } else {
                    err_resp = ErrorResponse {
                        status_code: resp.status(),
                        status_code: resp_status,
                        code: S3ErrorCode::NoSuchKey,
                        message: "The specified key does not exist.".to_string(),
                        bucket_name: bucket_name.to_string(),
@@ -127,7 +128,7 @@ pub fn http_resp_to_error_response(
            }
            StatusCode::FORBIDDEN => {
                err_resp = ErrorResponse {
                    status_code: resp.status(),
                    status_code: resp_status,
                    code: S3ErrorCode::AccessDenied,
                    message: "Access Denied.".to_string(),
                    bucket_name: bucket_name.to_string(),
@@ -137,7 +138,7 @@ pub fn http_resp_to_error_response(
            }
            StatusCode::CONFLICT => {
                err_resp = ErrorResponse {
                    status_code: resp.status(),
                    status_code: resp_status,
                    code: S3ErrorCode::BucketNotEmpty,
                    message: "Bucket not empty.".to_string(),
                    bucket_name: bucket_name.to_string(),
@@ -146,7 +147,7 @@ pub fn http_resp_to_error_response(
            }
            StatusCode::PRECONDITION_FAILED => {
                err_resp = ErrorResponse {
                    status_code: resp.status(),
                    status_code: resp_status,
                    code: S3ErrorCode::PreconditionFailed,
                    message: "Pre condition failed.".to_string(),
                    bucket_name: bucket_name.to_string(),
@@ -155,13 +156,13 @@ pub fn http_resp_to_error_response(
                };
            }
            _ => {
                let mut msg = resp.status().to_string();
                let mut msg = resp_status.to_string();
                if err_body.len() > 0 {
                    msg = err_body;
                }
                err_resp = ErrorResponse {
                    status_code: resp.status(),
                    code: S3ErrorCode::Custom(resp.status().to_string().into()),
                    status_code: resp_status,
                    code: S3ErrorCode::Custom(resp_status.to_string().into()),
                    message: msg,
                    bucket_name: bucket_name.to_string(),
                    ..Default::default()
@@ -171,32 +172,32 @@ pub fn http_resp_to_error_response(
    } else {
        err_resp = err_resp_.unwrap();
    }
    err_resp.status_code = resp.status();
    if let Some(server_name) = resp.headers().get("Server") {
    err_resp.status_code = resp_status;
    if let Some(server_name) = h.get("Server") {
        err_resp.server = server_name.to_str().expect("err").to_string();
    }

    let code = resp.headers().get("x-minio-error-code");
    let code = h.get("x-minio-error-code");
    if code.is_some() {
        err_resp.code = S3ErrorCode::Custom(code.expect("err").to_str().expect("err").into());
    }
    let desc = resp.headers().get("x-minio-error-desc");
    let desc = h.get("x-minio-error-desc");
    if desc.is_some() {
        err_resp.message = desc.expect("err").to_str().expect("err").trim_matches('"').to_string();
    }

    if err_resp.request_id == "" {
        if let Some(x_amz_request_id) = resp.headers().get("x-amz-request-id") {
        if let Some(x_amz_request_id) = h.get("x-amz-request-id") {
            err_resp.request_id = x_amz_request_id.to_str().expect("err").to_string();
        }
    }
    if err_resp.host_id == "" {
        if let Some(x_amz_id_2) = resp.headers().get("x-amz-id-2") {
        if let Some(x_amz_id_2) = h.get("x-amz-id-2") {
            err_resp.host_id = x_amz_id_2.to_str().expect("err").to_string();
        }
    }
    if err_resp.region == "" {
        if let Some(x_amz_bucket_region) = resp.headers().get("x-amz-bucket-region") {
        if let Some(x_amz_bucket_region) = h.get("x-amz-bucket-region") {
            err_resp.region = x_amz_bucket_region.to_str().expect("err").to_string();
        }
    }
@@ -19,17 +19,26 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
//use bytes::Bytes;
use futures_util::ready;
use http::HeaderMap;
use std::io::Cursor;
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;

use crate::client::{
    api_error_response::err_invalid_argument,
    api_get_options::GetObjectOptions,
    transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use futures_util::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use tokio_util::io::ReaderStream;

impl TransitionClient {
    pub fn get_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<Object, std::io::Error> {
@@ -65,11 +74,19 @@ impl TransitionClient {
        )
        .await?;

        let resp = &resp;
        let object_stat = to_object_info(bucket_name, object_name, resp.headers())?;

        let b = resp.body().bytes().expect("err").to_vec();
        Ok((object_stat, resp.headers().clone(), BufReader::new(Cursor::new(b))))
        let h = resp.headers().clone();

        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        Ok((object_stat, h, BufReader::new(Cursor::new(body_vec))))
    }
}
@@ -25,6 +25,7 @@ use crate::client::{
};
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::dto::Owner;
@@ -83,18 +84,29 @@ impl TransitionClient {
        )
        .await?;

        if resp.status() != http::StatusCode::OK {
            let b = resp.body().bytes().expect("err").to_vec();
            return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name)));
        let resp_status = resp.status();
        let h = resp.headers().clone();

        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }

        let b = resp
            .body_mut()
            .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
            .await
            .unwrap()
            .to_vec();
        let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
        if resp_status != http::StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                resp_status,
                &h,
                body_vec,
                bucket_name,
                object_name,
            )));
        }

        let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(body_vec).unwrap()) {
            Ok(result) => result,
            Err(err) => {
                return Err(std::io::Error::other(err.to_string()));
@@ -18,7 +18,6 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -30,7 +29,11 @@ use crate::client::{
};
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
//use s3s::Body;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};

pub struct ObjectAttributesOptions {
@@ -130,19 +133,12 @@ struct ObjectAttributePart {
}

impl ObjectAttributes {
    pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
        let h = resp.headers();
    pub async fn parse_response(&mut self, h: &HeaderMap, body_vec: Vec<u8>) -> Result<(), std::io::Error> {
        let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
        self.last_modified = mod_time;
        self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();

        let b = resp
            .body_mut()
            .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
            .await
            .unwrap()
            .to_vec();
        let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
        let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(body_vec).unwrap()) {
            Ok(result) => result,
            Err(err) => {
                return Err(std::io::Error::other(err.to_string()));
@@ -213,7 +209,8 @@ impl TransitionClient {
        )
        .await?;

        let h = resp.headers();
        let resp_status = resp.status();
        let h = resp.headers().clone();
        let has_etag = h.get("ETag").unwrap().to_str().unwrap();
        if !has_etag.is_empty() {
            return Err(std::io::Error::other(
@@ -221,14 +218,17 @@ impl TransitionClient {
            ));
        }

        if resp.status() != http::StatusCode::OK {
            let b = resp
                .body_mut()
                .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
                .await
                .unwrap()
                .to_vec();
            let err_body = String::from_utf8(b).unwrap();
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }

        if resp_status != http::StatusCode::OK {
            let err_body = String::from_utf8(body_vec).unwrap();
            let mut er = match quick_xml::de::from_str::<AccessControlPolicy>(&err_body) {
                Ok(result) => result,
                Err(err) => {
@@ -240,7 +240,7 @@ impl TransitionClient {
        }

        let mut oa = ObjectAttributes::new();
        oa.parse_response(&mut resp).await?;
        oa.parse_response(&h, body_vec).await?;

        Ok(oa)
    }
@@ -27,8 +27,11 @@ use crate::client::{
    transition_api::{ReaderImpl, RequestMetadata, TransitionClient},
};
use crate::store_api::BucketInfo;
use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use std::collections::HashMap;
@@ -97,18 +100,30 @@ impl TransitionClient {
            },
        )
        .await?;

        let resp_status = resp.status();
        let h = resp.headers().clone();

        if resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
            return Err(std::io::Error::other(http_resp_to_error_response(
                resp_status,
                &h,
                vec![],
                bucket_name,
                "",
            )));
        }

        //let mut list_bucket_result = ListBucketV2Result::default();
        let b = resp
            .body_mut()
            .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
            .await
            .unwrap()
            .to_vec();
        let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(body_vec).unwrap()) {
            Ok(result) => result,
            Err(err) => {
                return Err(std::io::Error::other(err.to_string()));
@@ -17,8 +17,9 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use hyper::body::Bytes;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -225,10 +226,15 @@ impl TransitionClient {
        };

        let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;

        let resp_status = resp.status();
        let h = resp.headers().clone();

        //if resp.is_none() {
        if resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                &resp,
                resp_status,
                &h,
                vec![],
                bucket_name,
                object_name,
@@ -287,9 +293,14 @@ impl TransitionClient {
        };

        let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;

        let resp_status = resp.status();
        let h = resp.headers().clone();

        if resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                &resp,
                resp_status,
                &h,
                vec![],
                &p.bucket_name.clone(),
                &p.object_name,
@@ -370,7 +381,8 @@ impl TransitionClient {

        let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;

        let b = resp.body().bytes().expect("err").to_vec();
        let h = resp.headers().clone();

        let complete_multipart_upload_result: CompleteMultipartUploadResult = CompleteMultipartUploadResult::default();

        let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) {
@@ -382,7 +394,6 @@ impl TransitionClient {
            (OffsetDateTime::now_utc(), "".to_string())
        };

        let h = resp.headers();
        Ok(UploadInfo {
            bucket: complete_multipart_upload_result.bucket,
            key: complete_multipart_upload_result.key,
@@ -479,9 +479,13 @@ impl TransitionClient {

        let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;

        let resp_status = resp.status();
        let h = resp.headers().clone();

        if resp.status() != StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                &resp,
                resp_status,
                &h,
                vec![],
                bucket_name,
                object_name,
@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::HashAlgorithm;
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
@@ -344,8 +346,15 @@ impl TransitionClient {
        )
        .await?;

        let body_bytes: Vec<u8> = resp.body().bytes().expect("err").to_vec();
        process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_bytes)), result_tx.clone());
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_vec)), result_tx.clone());
    }
    Ok(())
}
@@ -390,6 +399,10 @@ impl TransitionClient {
            },
        )
        .await?;

        let resp_status = resp.status();
        let h = resp.headers().clone();

        //if resp.is_some() {
        if resp.status() != StatusCode::NO_CONTENT {
            let error_response: ErrorResponse;
@@ -426,7 +439,8 @@ impl TransitionClient {
            }
            _ => {
                return Err(std::io::Error::other(http_resp_to_error_response(
                    &resp,
                    resp_status,
                    &h,
                    vec![],
                    bucket_name,
                    object_name,
@@ -24,8 +24,10 @@ use crate::client::{
    api_get_options::GetObjectOptions,
    transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use bytes::Bytes;
use http::HeaderMap;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use s3s::dto::RestoreRequest;
use std::collections::HashMap;
use std::io::Cursor;
@@ -107,9 +109,25 @@ impl TransitionClient {
        )
        .await?;

        let b = resp.body().bytes().expect("err").to_vec();
        if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, "")));
        let resp_status = resp.status();
        let h = resp.headers().clone();

        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        if resp_status != http::StatusCode::ACCEPTED && resp_status != http::StatusCode::OK {
            return Err(std::io::Error::other(http_resp_to_error_response(
                resp_status,
                &h,
                body_vec,
                bucket_name,
                "",
            )));
        }
        Ok(())
    }
@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
@@ -66,10 +68,20 @@ impl TransitionClient {
            return Ok(false);
        }

        let b = resp.body().bytes().expect("err").to_vec();
        let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
        let resp_status = resp.status();
        let h = resp.headers().clone();

        warn!("bucket exists, resp: {:?}, resperr: {:?}", resp, resperr);
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");

        warn!("bucket exists, resperr: {:?}", resperr);
        /*if to_error_response(resperr).code == "NoSuchBucket" {
            return Ok(false);
        }
@@ -108,10 +120,20 @@ impl TransitionClient {

        match resp {
            Ok(resp) => {
                let b = resp.body().bytes().expect("get bucket versioning err").to_vec();
                let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
                let resp_status = resp.status();
                let h = resp.headers().clone();

                warn!("get bucket versioning, resp: {:?}, resperr: {:?}", resp, resperr);
                let mut body_vec = Vec::new();
                let mut body = resp.into_body();
                while let Some(frame) = body.frame().await {
                    let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
                    if let Some(data) = frame.data_ref() {
                        body_vec.extend_from_slice(data);
                    }
                }
                let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");

                warn!("get bucket versioning, resperr: {:?}", resperr);

                Ok(VersioningConfiguration::default())
            }
@@ -25,10 +25,13 @@ use crate::client::{
    transition_api::{CreateBucketConfiguration, LocationConstraint, TransitionClient},
};
use http::Request;
use http_body_util::BodyExt;
use hyper::StatusCode;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::S3ErrorCode;
use std::collections::HashMap;

@@ -86,7 +89,7 @@ impl TransitionClient {
        Ok(location)
    }

    fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<Body>, std::io::Error> {
    fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<s3s::Body>, std::io::Error> {
        let mut url_values = HashMap::new();
        url_values.insert("location".to_string(), "".to_string());

@@ -120,7 +123,11 @@ impl TransitionClient {
            url_str = target_url.to_string();
        }

        let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(Body::empty()) else {
        let Ok(mut req) = Request::builder()
            .method(http::Method::GET)
            .uri(url_str)
            .body(s3s::Body::empty())
        else {
            return Err(std::io::Error::other("create request error"));
        };

@@ -172,13 +179,16 @@ impl TransitionClient {
    }

    async fn process_bucket_location_response(
        mut resp: http::Response<Body>,
        mut resp: http::Response<Incoming>,
        bucket_name: &str,
        tier_type: &str,
    ) -> Result<String, std::io::Error> {
        //if resp != nil {
        if resp.status() != StatusCode::OK {
            let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, "");
            let resp_status = resp.status();
            let h = resp.headers().clone();

            let err_resp = http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "");
            match err_resp.code {
                S3ErrorCode::NotImplemented => {
                    match err_resp.server.as_str() {
@@ -208,18 +218,22 @@ async fn process_bucket_location_response(
        }
        //}

        let b = resp
            .body_mut()
            .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
            .await
            .unwrap()
            .to_vec();
        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        let mut location = "".to_string();
        if tier_type == "huaweicloud" {
            let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(b).unwrap()).unwrap();
            let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(body_vec).unwrap()).unwrap();
            location = d.location_constraint;
        } else {
            if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(b).unwrap()) {
            if let Ok(LocationConstraint { field }) =
                quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap())
            {
                location = field;
            }
        }
@@ -118,38 +118,51 @@ pub fn new_getobjectreader<'a>(
    let mut is_encrypted = false;
    let is_compressed = false; //oi.is_compressed_ok();

    let mut rs_ = None;
    let rs_;
    if rs.is_none() && opts.part_number.is_some() && opts.part_number.unwrap() > 0 {
        rs_ = part_number_to_rangespec(oi.clone(), opts.part_number.unwrap());
    } else {
        rs_ = rs.clone();
    }

    let mut get_fn: ObjReaderFn;

    let (off, length) = match rs_.unwrap().get_offset_length(oi.size) {
        Ok(x) => x,
        Err(err) => {
            return Err(ErrorResponse {
                code: S3ErrorCode::InvalidRange,
                message: err.to_string(),
                key: None,
                bucket_name: None,
                region: None,
                request_id: None,
                host_id: "".to_string(),
            });
        }
    };
    get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
        //Box::pin({
        let r = GetObjectReader {
            object_info: oi.clone(),
            stream: Box::new(input_reader),
    if let Some(rs_) = rs_ {
        let (off, length) = match rs_.get_offset_length(oi.size) {
            Ok(x) => x,
            Err(err) => {
                return Err(ErrorResponse {
                    code: S3ErrorCode::InvalidRange,
                    message: err.to_string(),
                    key: None,
                    bucket_name: None,
                    region: None,
                    request_id: None,
                    host_id: "".to_string(),
                });
            }
        };
        r
        //})
    });
        get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
            //Box::pin({
            let r = GetObjectReader {
                object_info: oi.clone(),
                stream: Box::new(input_reader),
            };
            r
            //})
        });

    Ok((get_fn, off as i64, length as i64))
        return Ok((get_fn, off as i64, length as i64));
    }
    Err(ErrorResponse {
        code: S3ErrorCode::InvalidRange,
        message: "Invalid range".to_string(),
        key: Some(oi.name.clone()),
        bucket_name: Some(oi.bucket.clone()),
        region: Some("".to_string()),
        request_id: None,
        host_id: "".to_string(),
    })
}

/// Convert a raw stored ETag into the strongly-typed `s3s::dto::ETag`.
@@ -20,6 +20,7 @@

use crate::client::bucket_cache::BucketLocationCache;
use crate::client::{
    api_error_response::ErrorResponse,
    api_error_response::{err_invalid_argument, http_resp_to_error_response, to_error_response},
    api_get_options::GetObjectOptions,
    api_put_object::PutObjectOptions,
@@ -32,13 +33,17 @@ use crate::client::{
    credentials::{CredContext, Credentials, SignatureType, Static},
};
use crate::{client::checksum::ChecksumMode, store_api::GetObjectReader};
use bytes::Bytes;
//use bytes::Bytes;
use futures::{Future, StreamExt};
use http::{HeaderMap, HeaderName};
use http::{
    HeaderValue, Response, StatusCode,
    request::{Builder, Request},
};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{client::legacy::Client, client::legacy::connect::HttpConnector, rt::TokioExecutor};
use md5::Digest;
@@ -54,8 +59,8 @@ use rustfs_utils::{
    },
};
use s3s::S3ErrorCode;
use s3s::dto::Owner;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::io::Cursor;
@@ -95,7 +100,7 @@ pub struct TransitionClient {
    pub creds_provider: Arc<Mutex<Credentials<Static>>>,
    pub override_signer_type: SignatureType,
    pub secure: bool,
    pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
    pub http_client: Client<HttpsConnector<HttpConnector>, s3s::Body>,
    pub bucket_loc_cache: Arc<Mutex<BucketLocationCache>>,
    pub is_trace_enabled: Arc<Mutex<bool>>,
    pub trace_errors_only: Arc<Mutex<bool>>,
@@ -248,7 +253,7 @@ impl TransitionClient {
        todo!();
    }

    fn dump_http(&self, req: &http::Request<Body>, resp: &http::Response<Body>) -> Result<(), std::io::Error> {
    fn dump_http(&self, req: &http::Request<s3s::Body>, resp: &http::Response<Incoming>) -> Result<(), std::io::Error> {
        let mut resp_trace: Vec<u8>;

        //info!("{}{}", self.trace_output, "---------BEGIN-HTTP---------");
@@ -257,7 +262,7 @@ impl TransitionClient {
        Ok(())
    }

    pub async fn doit(&self, req: http::Request<Body>) -> Result<http::Response<Body>, std::io::Error> {
    pub async fn doit(&self, req: http::Request<s3s::Body>) -> Result<http::Response<Incoming>, std::io::Error> {
        let req_method;
        let req_uri;
        let req_headers;
@@ -272,9 +277,7 @@ impl TransitionClient {
            debug!("endpoint_url: {}", self.endpoint_url.as_str().to_string());
            resp = http_client.request(req);
        }
        let resp = resp
            .await /*.map_err(Into::into)*/
            .map(|res| res.map(Body::from));
        let resp = resp.await;
        debug!("http_client url: {} {}", req_method, req_uri);
        debug!("http_client headers: {:?}", req_headers);
        if let Err(err) = resp {
@@ -282,7 +285,7 @@ impl TransitionClient {
            return Err(std::io::Error::other(err));
        }

        let mut resp = resp.unwrap();
        let resp = resp.unwrap();
        debug!("http_resp: {:?}", resp);

        //let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
@@ -291,23 +294,26 @@ impl TransitionClient {
        //if self.is_trace_enabled && !(self.trace_errors_only && resp.status() == StatusCode::OK) {
        if resp.status() != StatusCode::OK {
            //self.dump_http(&cloned_req, &resp)?;
            let b = resp
                .body_mut()
                .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
                .await
                .unwrap()
                .to_vec();
            warn!("err_body: {}", String::from_utf8(b).unwrap());
            let mut body_vec = Vec::new();
            let mut body = resp.into_body();
            while let Some(frame) = body.frame().await {
                let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
                if let Some(data) = frame.data_ref() {
                    body_vec.extend_from_slice(data);
                }
            }
            warn!("err_body: {}", String::from_utf8(body_vec).unwrap());
            Err(std::io::Error::other("http_client call error."))
        } else {
            Ok(resp)
        }

        Ok(resp)
    }

    pub async fn execute_method(
        &self,
        method: http::Method,
        metadata: &mut RequestMetadata,
    ) -> Result<http::Response<Body>, std::io::Error> {
    ) -> Result<http::Response<Incoming>, std::io::Error> {
        if self.is_offline() {
            let mut s = self.endpoint_url.to_string();
            s.push_str(" is offline.");
@@ -317,7 +323,7 @@ impl TransitionClient {
        let retryable: bool;
        //let mut body_seeker: BufferReader;
        let mut req_retry = self.max_retries;
        let mut resp: http::Response<Body>;
        let mut resp: http::Response<Incoming>;

        //if metadata.content_body != nil {
        //body_seeker = BufferReader::new(metadata.content_body.read_all().await?);
@@ -339,13 +345,19 @@ impl TransitionClient {
            }
        }

        let b = resp
            .body_mut()
            .store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
            .await
            .unwrap()
            .to_vec();
        let mut err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
        let resp_status = resp.status();
        let h = resp.headers().clone();

        let mut body_vec = Vec::new();
        let mut body = resp.into_body();
        while let Some(frame) = body.frame().await {
            let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            if let Some(data) = frame.data_ref() {
                body_vec.extend_from_slice(data);
            }
        }
        let mut err_response =
            http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
        err_response.message = format!("remote tier error: {}", err_response.message);

        if self.region == "" {
@@ -381,7 +393,7 @@ impl TransitionClient {
            continue;
        }

        if is_http_status_retryable(&resp.status()) {
        if is_http_status_retryable(&resp_status) {
            continue;
        }

@@ -395,7 +407,7 @@ impl TransitionClient {
        &self,
        method: &http::Method,
        metadata: &mut RequestMetadata,
    ) -> Result<http::Request<Body>, std::io::Error> {
    ) -> Result<http::Request<s3s::Body>, std::io::Error> {
        let mut location = metadata.bucket_location.clone();
        if location == "" && metadata.bucket_name != "" {
            location = self.get_bucket_location(&metadata.bucket_name).await?;
@@ -415,7 +427,7 @@ impl TransitionClient {
        let Ok(mut req) = Request::builder()
            .method(method)
            .uri(target_url.to_string())
            .body(Body::empty())
            .body(s3s::Body::empty())
        else {
            return Err(std::io::Error::other("create request error"));
        };
@@ -527,10 +539,10 @@ impl TransitionClient {
        if metadata.content_length > 0 {
            match &mut metadata.content_body {
                ReaderImpl::Body(content_body) => {
                    *req.body_mut() = Body::from(content_body.clone());
                    *req.body_mut() = s3s::Body::from(content_body.clone());
                }
                ReaderImpl::ObjectBody(content_body) => {
                    *req.body_mut() = Body::from(content_body.read_all().await?);
                    *req.body_mut() = s3s::Body::from(content_body.read_all().await?);
                }
            }
        }
@@ -538,7 +550,7 @@ impl TransitionClient {
        Ok(req)
    }

    pub fn set_user_agent(&self, req: &mut Request<Body>) {
    pub fn set_user_agent(&self, req: &mut Request<s3s::Body>) {
        let headers = req.headers_mut();
        headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
    }
@@ -976,25 +988,217 @@ impl Default for UploadInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert HTTP headers to ObjectInfo struct
|
||||
/// This function parses various S3 response headers to construct an ObjectInfo struct
|
||||
/// containing metadata about an S3 object.
|
||||
pub fn to_object_info(bucket_name: &str, object_name: &str, h: &HeaderMap) -> Result<ObjectInfo, std::io::Error> {
|
||||
todo!()
|
||||
// Helper function to get header value as string
|
||||
let get_header = |name: &str| -> String { h.get(name).and_then(|val| val.to_str().ok()).unwrap_or("").to_string() };
|
||||
|
||||
// Get and process the ETag
|
||||
let etag = {
|
||||
let etag_raw = get_header("ETag");
|
||||
// Remove surrounding quotes if present (trimming ETag)
|
||||
let trimmed = etag_raw.trim_start_matches('"').trim_end_matches('"');
|
||||
Some(trimmed.to_string())
|
||||
};
|
||||
|
||||
// Parse content length if it exists
|
||||
let size = {
|
||||
let content_length_str = get_header("Content-Length");
|
||||
if !content_length_str.is_empty() {
|
||||
content_length_str
|
||||
.parse::<i64>()
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Content-Length is not an integer"))?
|
||||
} else {
|
||||
-1
|
||||
}
|
||||
};
|
||||
|
||||
// Parse Last-Modified time
|
||||
let mod_time = {
|
||||
let last_modified_str = get_header("Last-Modified");
|
||||
if !last_modified_str.is_empty() {
|
||||
// Parse HTTP date format (RFC 7231)
|
||||
// Using time crate to parse HTTP dates
|
||||
let parsed_time = OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc2822)
|
||||
.or_else(|_| OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc3339))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Last-Modified time format is invalid"))?;
|
||||
Some(parsed_time)
|
||||
} else {
|
||||
Some(OffsetDateTime::now_utc())
|
||||
}
|
||||
};
|
||||
|
||||
// Get content type
|
||||
let content_type = {
|
||||
let content_type_raw = get_header("Content-Type");
|
||||
let content_type_trimmed = content_type_raw.trim();
|
||||
if content_type_trimmed.is_empty() {
|
||||
Some("application/octet-stream".to_string())
|
||||
} else {
|
||||
Some(content_type_trimmed.to_string())
|
||||
}
|
||||
};
|
||||
|
||||
// Parse Expires time
|
||||
let expiration = {
|
||||
let expiry_str = get_header("Expires");
|
||||
if !expiry_str.is_empty() {
|
||||
OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc2822)
|
||||
.or_else(|_| OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc3339))
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "'Expires' is not in supported format"))?
|
||||
} else {
|
||||
OffsetDateTime::now_utc()
|
||||
}
|
||||
};
|
||||
|
||||
// Extract user metadata (headers prefixed with "X-Amz-Meta-")
|
||||
let user_metadata = {
|
||||
let mut meta = HashMap::new();
|
||||
for (name, value) in h.iter() {
|
||||
let header_name = name.as_str().to_lowercase();
|
||||
if header_name.starts_with("x-amz-meta-") {
|
||||
let key = header_name.strip_prefix("x-amz-meta-").unwrap().to_string();
|
||||
if let Ok(value_str) = value.to_str() {
|
||||
meta.insert(key, value_str.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
meta
|
||||
};
|
||||
|
||||
let user_tag = {
|
||||
let user_tag_str = get_header("X-Amz-Tagging");
|
||||
user_tag_str
|
||||
};
|
||||
|
||||
// Extract user tags count
|
||||
let user_tag_count = {
|
||||
let count_str = get_header("x-amz-tagging-count");
|
||||
if !count_str.is_empty() {
|
||||
count_str
|
||||
.parse::<usize>()
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "x-amz-tagging-count is not an integer"))?
|
||||
} else {
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
// Handle restore info
|
||||
let restore = {
|
||||
let restore_hdr = get_header("x-amz-restore");
|
||||
if !restore_hdr.is_empty() {
|
||||
// Simplified restore header parsing - in real implementation, this would parse the specific format
|
||||
// "ongoing-request=\"true\"" or "ongoing-request=\"false\", expiry-date=\"..."
|
||||
let ongoing_restore = restore_hdr.contains("ongoing-request=\"true\"");
|
||||
RestoreInfo {
|
||||
ongoing_restore,
|
||||
expiry_time: if ongoing_restore {
|
||||
OffsetDateTime::now_utc()
|
||||
} else {
|
||||
// Try to extract expiry date from the header
|
||||
// This is simplified - real parsing would be more complex
|
||||
OffsetDateTime::now_utc()
|
||||
},
|
||||
}
|
||||
} else {
|
||||
RestoreInfo::default()
|
||||
}
|
||||
};
|
||||
|
||||
// Extract version ID
|
||||
let version_id = {
|
||||
let version_id_str = get_header("x-amz-version-id");
|
||||
if !version_id_str.is_empty() {
|
||||
Some(Uuid::parse_str(&version_id_str).unwrap_or_else(|_| Uuid::nil()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Check if it's a delete marker
|
||||
let is_delete_marker = get_header("x-amz-delete-marker") == "true";
|
||||
|
||||
// Get replication status
|
||||
let replication_status = {
|
||||
let status_str = get_header("x-amz-replication-status");
|
||||
ReplicationStatus::from_static(match status_str.as_str() {
|
||||
"COMPLETE" => ReplicationStatus::COMPLETE,
|
||||
"PENDING" => ReplicationStatus::PENDING,
|
||||
"FAILED" => ReplicationStatus::FAILED,
|
||||
"REPLICA" => ReplicationStatus::REPLICA,
|
||||
_ => ReplicationStatus::PENDING,
|
||||
})
|
||||
};
|
||||
|
||||
// Extract expiration rule ID and time (simplified)
|
||||
let (expiration_time, expiration_rule_id) = {
|
||||
// In a real implementation, this would parse the x-amz-expiration header
|
||||
// which typically has format: "expiry-date="Fri, 11 Dec 2020 00:00:00 GMT", rule-id="myrule""
|
||||
let exp_header = get_header("x-amz-expiration");
|
||||
if !exp_header.is_empty() {
|
||||
// Simplified parsing - real implementation would be more thorough
|
||||
(OffsetDateTime::now_utc(), exp_header) // Placeholder
|
||||
} else {
|
||||
(OffsetDateTime::now_utc(), "".to_string())
|
||||
}
|
||||
};
|
||||
|
||||
// Extract checksums
|
||||
let checksum_crc32 = get_header("x-amz-checksum-crc32");
|
||||
let checksum_crc32c = get_header("x-amz-checksum-crc32c");
|
||||
let checksum_sha1 = get_header("x-amz-checksum-sha1");
|
||||
let checksum_sha256 = get_header("x-amz-checksum-sha256");
|
||||
let checksum_crc64nvme = get_header("x-amz-checksum-crc64nvme");
|
||||
let checksum_mode = get_header("x-amz-checksum-mode");
|
||||
|
||||
// Build and return the ObjectInfo struct
|
||||
Ok(ObjectInfo {
|
||||
etag,
|
||||
name: object_name.to_string(),
|
||||
mod_time,
|
||||
size,
|
||||
content_type,
|
||||
metadata: h.clone(),
|
||||
user_metadata,
|
||||
user_tags: "".to_string(), // Tags would need separate parsing
|
||||
user_tag_count,
|
||||
owner: Owner::default(),
|
||||
storage_class: get_header("x-amz-storage-class"),
|
||||
is_latest: true, // Would be determined by versioning settings
|
||||
is_delete_marker,
|
||||
version_id,
|
||||
replication_status,
|
||||
replication_ready: false, // Would be computed based on status
|
||||
expiration: expiration_time,
|
||||
expiration_rule_id,
|
||||
num_versions: 1, // Would be determined by versioning
|
||||
restore,
|
||||
checksum_crc32,
|
||||
checksum_crc32c,
|
||||
checksum_sha1,
|
||||
checksum_sha256,
|
||||
checksum_crc64nvme,
|
||||
checksum_mode,
|
||||
})
|
||||
}
|
||||
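// A minimal sketch exercising the header mapping above; the header values
// here are illustrative, not taken from a real S3 response:
#[cfg(test)]
mod to_object_info_sketch {
    use super::*;

    #[test]
    fn maps_basic_headers() {
        let mut h = HeaderMap::new();
        h.insert("ETag", "\"abc123\"".parse().unwrap());
        h.insert("Content-Length", "1024".parse().unwrap());
        h.insert("x-amz-storage-class", "GLACIER".parse().unwrap());
        let oi = to_object_info("my-bucket", "test/object.txt", &h).unwrap();
        assert_eq!(oi.etag.as_deref(), Some("abc123"));
        assert_eq!(oi.size, 1024);
        assert_eq!(oi.storage_class, "GLACIER");
        assert!(!oi.is_delete_marker);
    }
}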

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

//#[derive(Clone)]
pub struct SendRequest {
inner: hyper::client::conn::http1::SendRequest<Body>,
inner: hyper::client::conn::http1::SendRequest<s3s::Body>,
}

impl From<hyper::client::conn::http1::SendRequest<Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<Body>) -> Self {
impl From<hyper::client::conn::http1::SendRequest<s3s::Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<s3s::Body>) -> Self {
Self { inner }
}
}

impl tower::Service<Request<Body>> for SendRequest {
type Response = Response<Body>;
impl tower::Service<Request<s3s::Body>> for SendRequest {
type Response = Response<Incoming>;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

@@ -1002,13 +1206,13 @@ impl tower::Service<Request<Body>> for SendRequest {
self.inner.poll_ready(cx).map_err(std::io::Error::other)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<s3s::Body>) -> Self::Future {
//let req = hyper::Request::builder().uri("/").body(http_body_util::Empty::<Bytes>::new()).unwrap();
//let req = hyper::Request::builder().uri("/").body(Body::empty()).unwrap();

let fut = self.inner.send_request(req);

Box::pin(async move { fut.await.map_err(std::io::Error::other).map(|res| res.map(Body::from)) })
Box::pin(async move { fut.await.map_err(std::io::Error::other) })
}
}


@@ -452,6 +452,8 @@ pub fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (Strin
.strip_prefix(SLASH_SEPARATOR)
.unwrap_or(path);
// Find the position of the first '/'
#[cfg(windows)]
let trimmed_path = trimmed_path.replace('\\', "/");
let Some(pos) = trimmed_path.find(SLASH_SEPARATOR) else {
return (trimmed_path.to_string(), "".to_string());
};

@@ -4852,10 +4852,18 @@ impl StorageAPI for SetDisks {
// Normalize ETags by removing quotes before comparison (PR #592 compatibility)
let transition_etag = rustfs_utils::path::trim_etag(&opts.transition.etag);
let stored_etag = rustfs_utils::path::trim_etag(&get_raw_etag(&fi.metadata));
if opts.mod_time.expect("err").unix_timestamp() != fi.mod_time.as_ref().expect("err").unix_timestamp()
|| transition_etag != stored_etag
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
if let Some(mod_time1) = opts.mod_time {
if let Some(mod_time2) = fi.mod_time.as_ref() {
if mod_time1.unix_timestamp() != mod_time2.unix_timestamp()
/*|| transition_etag != stored_etag*/
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
}
} else {
return Err(Error::other("mod_time 2 error.".to_string()));
}
} else {
return Err(Error::other("mod_time 1 error.".to_string()));
}
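// The nested guards above can also be written as one tuple let-else (a
// sketch, not what the patch does):
//
//     let (Some(mod_time1), Some(mod_time2)) = (opts.mod_time, fi.mod_time.as_ref()) else {
//         return Err(Error::other("transition restore: missing mod_time".to_string()));
//     };
//     if mod_time1.unix_timestamp() != mod_time2.unix_timestamp() {
//         return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
//     }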
if fi.transition_status == TRANSITION_COMPLETE {
return Ok(());
@@ -4985,8 +4993,10 @@ impl StorageAPI for SetDisks {
oi = ObjectInfo::from_file_info(&actual_fi, bucket, object, opts.versioned || opts.version_suspended);
let ropts = put_restore_opts(bucket, object, &opts.transition.restore_request, &oi).await?;
if oi.parts.len() == 1 {
let mut opts = opts.clone();
opts.part_number = Some(1);
let rs: Option<HTTPRangeSpec> = None;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, opts).await;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, &opts).await;
if let Err(err) = gr {
return set_restore_header_fn(&mut oi, Some(to_object_err(err.into(), vec![bucket, object]))).await;
}

@@ -50,6 +50,7 @@ rustfs-filemeta = { workspace = true }
rustfs-madmin = { workspace = true }
tokio-util = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-ahm = { workspace = true }
http = { workspace = true }
rand = { workspace = true }
s3s = { workspace = true }
@@ -59,3 +60,4 @@ tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
serial_test = { workspace = true }
heed = { workspace = true }
@@ -554,9 +554,9 @@ impl FolderScanner {

let file_path = entry.path().to_string_lossy().to_string();

let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path);
//let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path);

let entry_name = path_join_buf(&[&folder.name, trim_dir_name]);
let entry_name = path_join_buf(&[&folder.name, &file_name]);

if entry_name.is_empty() || entry_name == folder.name {
debug!("scan_folder: done for now entry_name is empty or equals folder name");

@@ -272,7 +272,7 @@ impl ScannerIOCache for SetDisks {
let store_clone = self.clone();
let ctx_clone = ctx.clone();
let send_update_fut = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(30 + rand::random::<u64>() % 10));
let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::<u64>() % 10));

let mut last_update = None;


786 crates/scanner/tests/lifecycle_cache_test.rs Normal file
@@ -0,0 +1,786 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use heed::byteorder::BigEndian;
use heed::types::*;
use heed::{BoxedError, BytesDecode, BytesEncode, Database, DatabaseFlags, Env, EnvOpenOptions};
use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI},
};
use rustfs_scanner::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome};
use serial_test::serial;
use std::{
borrow::Cow,
path::PathBuf,
sync::{Arc, Once, OnceLock},
};
//use heed_traits::Comparator;
use time::OffsetDateTime;
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use uuid::Uuid;

static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();

static _LIFECYCLE_EXPIRY_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_EXPIRY_NONCURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_NONCURRENT_DAYS: i32 = 1;
static GLOBAL_LMDB_ENV: OnceLock<Env> = OnceLock::new();
static GLOBAL_LMDB_DB: OnceLock<Database<I64<BigEndian>, LifecycleContentCodec>> = OnceLock::new();

fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}

/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();

// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}

// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_ahm_lifecyclecache_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();

// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];

for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}

// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}

let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};

let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);

// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();

// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();

// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;

//lmdb env
// User home directory
/*if let Ok(home_dir) = env::var("HOME").or_else(|_| env::var("USERPROFILE")) {
let mut path = PathBuf::from(home_dir);
path.push(format!(".{DEFAULT_LOG_FILENAME}"));
path.push(DEFAULT_LOG_DIR);
if ensure_directory_writable(&path) {
//return path;
}
}*/
let test_lmdb_lifecycle_dir = "/tmp/lmdb_lifecycle".to_string();
let temp_dir = std::path::PathBuf::from(&test_lmdb_lifecycle_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
let lmdb_env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&test_lmdb_lifecycle_dir).unwrap() };
let bucket_name = format!("test-lc-cache-{}", "00000");
let mut wtxn = lmdb_env.write_txn().unwrap();
let db = match lmdb_env
.database_options()
.name(&format!("bucket_{bucket_name}"))
.types::<I64<BigEndian>, LifecycleContentCodec>()
.flags(DatabaseFlags::DUP_SORT)
//.dup_sort_comparator::<>()
.create(&mut wtxn)
{
Ok(db) => db,
Err(err) => {
panic!("lmdb error: {err}");
}
};
let _ = wtxn.commit();
let _ = GLOBAL_LMDB_ENV.set(lmdb_env);
let _ = GLOBAL_LMDB_DB.set(db);

// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));

(disk_paths, ecstore)
}

/// Test helper: Create a test bucket
#[allow(dead_code)]
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}

/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}

/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");

println!("object_info1: {object_info:?}");

info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}

/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}

fn ns_to_offset_datetime(ns: i128) -> Option<OffsetDateTime> {
OffsetDateTime::from_unix_timestamp_nanos(ns).ok()
}

fn convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo {
let usage = &record.usage;

ObjectInfo {
bucket: usage.bucket.clone(),
name: usage.object.clone(),
size: usage.total_size as i64,
delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
mod_time: usage.last_modified_ns.and_then(ns_to_offset_datetime),
..Default::default()
}
}

#[allow(dead_code)]
fn to_object_info(
bucket: &str,
object: &str,
total_size: i64,
delete_marker: bool,
mod_time: OffsetDateTime,
version_id: &str,
) -> ObjectInfo {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
size: total_size,
delete_marker,
mod_time: Some(mod_time),
version_id: Some(Uuid::parse_str(version_id).unwrap()),
..Default::default()
}
}

#[derive(Debug, PartialEq, Eq)]
enum LifecycleType {
ExpiryCurrent,
ExpiryNoncurrent,
TransitionCurrent,
TransitionNoncurrent,
}

#[derive(Debug, PartialEq, Eq)]
pub struct LifecycleContent {
ver_no: u8,
ver_id: String,
mod_time: OffsetDateTime,
type_: LifecycleType,
object_name: String,
}

pub struct LifecycleContentCodec;

impl BytesEncode<'_> for LifecycleContentCodec {
type EItem = LifecycleContent;

fn bytes_encode(lcc: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let (ver_no_byte, ver_id_bytes, mod_timestamp_bytes, type_byte, object_name_bytes) = match lcc {
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
0,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
1,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
2,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
3,
object_name.clone().into_bytes(),
),
};

let mut output = Vec::<u8>::new();
output.push(*ver_no_byte);
output.extend_from_slice(&ver_id_bytes);
output.extend_from_slice(&mod_timestamp_bytes);
output.push(type_byte);
output.extend_from_slice(&object_name_bytes);
Ok(Cow::Owned(output))
}
}

impl<'a> BytesDecode<'a> for LifecycleContentCodec {
type DItem = LifecycleContent;

fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
use std::mem::size_of;

let ver_no = match bytes.get(..size_of::<u8>()) {
Some(bytes) => bytes.try_into().map(u8::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract ver_no".into()),
};

let ver_id = match bytes.get(size_of::<u8>()..(36 + 1)) {
Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() },
None => return Err("invalid LifecycleContent: cannot extract ver_id".into()),
};

let mod_timestamp = match bytes.get((36 + 1)..(size_of::<i64>() + 36 + 1)) {
Some(bytes) => bytes.try_into().map(i64::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract mod_time timestamp".into()),
};

let type_ = match bytes.get(size_of::<i64>() + 36 + 1) {
Some(&0) => LifecycleType::ExpiryCurrent,
Some(&1) => LifecycleType::ExpiryNoncurrent,
Some(&2) => LifecycleType::TransitionCurrent,
Some(&3) => LifecycleType::TransitionNoncurrent,
Some(_) => return Err("invalid LifecycleContent: invalid LifecycleType".into()),
None => return Err("invalid LifecycleContent: cannot extract LifecycleType".into()),
};

let object_name = match bytes.get((size_of::<i64>() + 36 + 1 + 1)..) {
Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() },
None => return Err("invalid LifecycleContent: cannot extract object_name".into()),
};

Ok(LifecycleContent {
ver_no,
ver_id,
mod_time: OffsetDateTime::from_unix_timestamp(mod_timestamp).unwrap(),
type_,
object_name,
})
}
}
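// Roundtrip sketch for the codec above. The layout it pins down: 1 byte
// ver_no, 36 bytes ver_id (a hyphenated UUID string), 8 bytes big-endian
// unix timestamp, 1 byte type tag, then the object name. Hypothetical test:
#[cfg(test)]
mod lifecycle_codec_sketch {
    use super::*;

    #[test]
    fn encode_decode_roundtrip() {
        let item = LifecycleContent {
            ver_no: 0,
            ver_id: "00000000-0000-0000-0000-000000000000".to_string(),
            mod_time: OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap(),
            type_: LifecycleType::TransitionNoncurrent,
            object_name: "test/object.txt".to_string(),
        };
        let bytes = LifecycleContentCodec::bytes_encode(&item).unwrap();
        assert_eq!(bytes.len(), 1 + 36 + 8 + 1 + "test/object.txt".len());
        let decoded = LifecycleContentCodec::bytes_decode(&bytes).unwrap();
        assert_eq!(decoded, item);
    }
}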

mod serial_tests {
use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
//#[ignore]
async fn test_lifecycle_cache_build() {
let (_disk_paths, ecstore) = setup_test_env().await;

// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-cache-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";

create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;

// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");

let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => outcome,
Err(err) => {
warn!("Local usage scan failed: {}", err);
LocalScanOutcome::default()
}
};
let bucket_objects_map = &scan_outcome.bucket_objects;

let records = match bucket_objects_map.get(&bucket_name) {
Some(records) => records,
None => {
debug!("No local snapshot entries found for bucket {}; skipping lifecycle/integrity", bucket_name);
&vec![]
}
};

if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get() {
if let Some(lmdb) = GLOBAL_LMDB_DB.get() {
let mut wtxn = lmdb_env.write_txn().unwrap();

/*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;

rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;

expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}*/

for record in records {
if !record.usage.has_live_object {
continue;
}

let object_info = convert_record_to_object_info(record);
println!("object_info2: {object_info:?}");
let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc());
let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1);

let version_id = if let Some(version_id) = object_info.version_id {
version_id.to_string()
} else {
"zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string()
};

lmdb.put(
&mut wtxn,
&expiry_time.unix_timestamp(),
&LifecycleContent {
ver_no: 0,
ver_id: version_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name: object_info.name,
},
)
.unwrap();
}

wtxn.commit().unwrap();

let mut wtxn = lmdb_env.write_txn().unwrap();
let iter = lmdb.iter_mut(&mut wtxn).unwrap();
//let _ = unsafe { iter.del_current().unwrap() };
for row in iter {
if let Ok(ref elm) = row {
let LifecycleContent {
ver_no,
ver_id,
mod_time,
type_,
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
//eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
// The cache row only carries name/version/mod_time, so the
// remaining ObjectOpts fields fall back to defaults here.
eval_inner(
&lifecycle::ObjectOpts {
name: object_name.clone(),
version_id: ver_id.clone(),
mod_time: Some(*mod_time),
..Default::default()
},
OffsetDateTime::now_utc(),
)
.await;
}
println!("row:{row:?}");
}
//drop(iter);
wtxn.commit().unwrap();
}
}

println!("Lifecycle cache test completed");
}
}
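// A possible follow-up to the scan above: because keys are big-endian i64 due
// timestamps, LMDB iterates them in due-time order, so collecting only the
// rows that are already due is a single range scan. A minimal sketch
// (hypothetical helper, not part of this patch), reusing the globals above:
#[allow(dead_code)]
fn due_entries(now_ts: i64) -> heed::Result<Vec<(i64, LifecycleContent)>> {
    let env = GLOBAL_LMDB_ENV.get().expect("lmdb env not initialized");
    let db = GLOBAL_LMDB_DB.get().expect("lmdb db not initialized");
    let rtxn = env.read_txn()?;
    let mut due = Vec::new();
    // Everything with key <= now_ts is due for expiry/transition handling.
    for row in db.range(&rtxn, &(..=now_ts))? {
        due.push(row?);
    }
    Ok(due)
}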

async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}

if let Some(restore_expires) = obj.restore_expires {
if restore_expires.unix_timestamp() != 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
let mut action = IlmAction::DeleteRestoredAction;
if !obj.is_latest {
action = IlmAction::DeleteRestoredVersionAction;
}

events.push(Event {
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}

if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}

if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}

if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
continue;
}
}
}
}

if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
continue;
}
}
}
}

if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
}
}
}
}

if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
}
}
}
}
}

info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
} else {
info!("eval_inner: expiration.days is None");
}
} else {
info!("eval_inner: rule.expiration is None");
}

if obj.transition_status != TRANSITION_COMPLETE {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
}
}
}
}
}
}
}

if !events.is_empty() {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Less;
}
_ => (),
}
match b.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Greater;
}
_ => (),
}
return Ordering::Less;
}

if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
return Ordering::Less;
}
return Ordering::Greater;
});
return events[0].clone();
}

Event::default()
}
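// Tie-break sketch for the sort above (illustrative only): when two events
// are both due, delete-type actions sort ahead of transitions, so a version
// that is simultaneously expirable and transitionable gets deleted:
//
//     let picked = lc.eval_inner(&opts_due_for_expiry_and_transition, now).await;
//     assert!(matches!(picked.action, IlmAction::DeleteAction | IlmAction::DeleteVersionAction));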
444 crates/scanner/tests/lifecycle_integration_test.rs Normal file
@@ -0,0 +1,444 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use rustfs_ahm::{heal::storage::ECStoreHealStorage, init_heal_manager};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
global::GLOBAL_TierConfigMgr,
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_scanner::scanner::init_data_scanner;
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;

static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();

fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}

/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();

// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}

// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_scanner_lifecycle_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();

// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];

for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}

// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}

let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};

let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);

// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();

// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();

// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;

// Initialize background expiry workers
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await;

// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));

(disk_paths, ecstore)
}

/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}

/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}

/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");

info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}

/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;

metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;

Ok(())
}

/// Test helper: Set bucket lifecycle configuration with ExpiredObjectDeleteMarker enabled
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
<ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;

metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;

Ok(())
}

#[allow(dead_code)]
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with a 0 days transition for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Transition>
<Days>0</Days>
<StorageClass>COLDTIER44</StorageClass>
</Transition>
</Rule>
<Rule>
<ID>test-rule2</ID>
<Status>Disabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<NoncurrentVersionTransition>
<NoncurrentDays>0</NoncurrentDays>
<StorageClass>COLDTIER44</StorageClass>
</NoncurrentVersionTransition>
</Rule>
</LifecycleConfiguration>"#;

metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;

Ok(())
}

/// Test helper: Create a test tier
#[allow(dead_code)]
async fn create_test_tier(server: u32) {
let args = TierConfig {
version: "v1".to_string(),
tier_type: TierType::MinIO,
name: "COLDTIER44".to_string(),
s3: None,
aliyun: None,
tencent: None,
huaweicloud: None,
azure: None,
gcs: None,
r2: None,
rustfs: None,
minio: if server == 1 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "hello".to_string(),
endpoint: "http://39.105.198.204:9000".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else if server == 2 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://127.0.0.1:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
},
};
let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
if let Err(err) = tier_config_mgr.add(args, false).await {
println!("tier_config_mgr add failed, e: {err:?}");
panic!("tier add failed. {err}");
}
if let Err(e) = tier_config_mgr.save().await {
println!("tier_config_mgr save failed, e: {e:?}");
panic!("tier save failed");
}
println!("Created test tier: COLDTIER44");
}

/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}

/// Test helper: Check if object is a delete marker
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
oi.delete_marker
} else {
println!("object_is_delete_marker is error");
panic!("object_is_delete_marker is error");
}
}

/// Test helper: Check if object has been transitioned
#[allow(dead_code)]
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
!oi.transitioned_object.status.is_empty()
} else {
println!("object_is_transitioned is error");
panic!("object_is_transitioned is error");
}
}

async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &str, timeout: Duration) -> bool {
let deadline = tokio::time::Instant::now() + timeout;

loop {
if !object_exists(ecstore, bucket, object).await {
return true;
}

if tokio::time::Instant::now() >= deadline {
return false;
}

tokio::time::sleep(Duration::from_millis(200)).await;
}
}

mod serial_tests {
use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
#[ignore]
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;

create_test_tier(2).await;

// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-transition-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";

create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(
&ecstore,
bucket_name.as_str(),
object_name,
b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
)
.await;
//create_test_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;

// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");

// Set lifecycle configuration with a very short transition (0 days = immediate transition)
set_bucket_lifecycle_transition(bucket_name.as_str())
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");

// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}

let ctx = CancellationToken::new();

// Start scanner
let heal_storage = Arc::new(ECStoreHealStorage::new(ecstore.clone()));
init_heal_manager(heal_storage, None).await;
init_data_scanner(ctx.clone(), ecstore.clone()).await;
println!("✅ Scanner started");

// Wait for scanner to process lifecycle rules
tokio::time::sleep(Duration::from_secs(1200)).await;

// Check if object has been transitioned
let check_result = object_is_transitioned(&ecstore, &bucket_name, object_name).await;
println!("Object transitioned after lifecycle processing: {check_result}");

if check_result {
println!("✅ Object was transitioned by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("❌ Object was not transitioned by lifecycle processing");
}

assert!(check_result);
println!("✅ Object successfully transitioned");

// Stop scanner
ctx.cancel();
println!("✅ Scanner stopped");

println!("Lifecycle transition basic test completed");
}
}
@@ -1071,76 +1071,62 @@ impl S3 for FS {
            version_id,
            ..
        } = req.input.clone();
        let rreq = rreq.unwrap();

        /*if let Err(e) = un_escape_path(object) {
            warn!("post restore object failed, e: {:?}", e);
            return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
        }*/
        let rreq = rreq.ok_or_else(|| {
            S3Error::with_message(S3ErrorCode::Custom("ErrValidRestoreObject".into()), "restore request is required")
        })?;

        let Some(store) = new_object_layer_fn() else {
            return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
        };

        /*if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
            return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
        }*/
        let version_id_str = version_id.unwrap_or_default();
        let opts = post_restore_opts(&version_id_str, &bucket, &object)
            .await
            .map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrPostRestoreOpts".into()), "restore object failed."))?;

        /*if req.content_length <= 0 {
            return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
        }*/
        let Ok(opts) = post_restore_opts(&version_id.unwrap(), &bucket, &object).await else {
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
                "post restore object failed",
            ));
        };

        let Ok(mut obj_info) = store.get_object_info(&bucket, &object, &opts).await else {
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
                "post restore object failed",
            ));
        };
        let mut obj_info = store
            .get_object_info(&bucket, &object, &opts)
            .await
            .map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "restore object failed."))?;

        // Check if object is in a transitioned state
        if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
                "post restore object failed",
                S3ErrorCode::Custom("ErrInvalidTransitionedState".into()),
                "restore object failed.",
            ));
        }

        //let mut api_err;
        let mut _status_code = StatusCode::OK;
        let mut already_restored = false;
        if let Err(_err) = rreq.validate(store.clone()) {
            //api_err = to_api_err(ErrMalformedXML);
            //api_err.description = err.to_string();
        // Validate restore request
        rreq.validate(store.clone()).map_err(|_| {
            S3Error::with_message(S3ErrorCode::Custom("ErrValidRestoreObject".into()), "restore object validation failed")
        })?;

        // Check if restore is already in progress
        if obj_info.restore_ongoing && (rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT")) {
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
                "post restore object failed",
                S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
                "restore object failed.",
            ));
        } else {
            if obj_info.restore_ongoing && (rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT") {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
                    "post restore object failed",
                ));
            }
            if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
                _status_code = StatusCode::ACCEPTED;
            }

        let mut already_restored = false;
        if let Some(restore_expires) = obj_info.restore_expires {
            if !obj_info.restore_ongoing && restore_expires.unix_timestamp() != 0 {
                already_restored = true;
            }
        }
        let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap());

        let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap_or(&1));
        let mut metadata = obj_info.user_defined.clone();

        let mut header = HeaderMap::new();

        let obj_info_ = obj_info.clone();
        if rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT" {
        if rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT") {
            obj_info.metadata_only = true;
            metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap().to_string());
            metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap_or(1).to_string());
            metadata.insert(AMZ_RESTORE_REQUEST_DATE.to_string(), OffsetDateTime::now_utc().format(&Rfc3339).unwrap());
            if already_restored {
                metadata.insert(
@@ -1162,7 +1148,8 @@ impl S3 for FS {
                );
            }
            obj_info.user_defined = metadata;
            if let Err(_err) = store

            store
                .clone()
                .copy_object(
                    &bucket,
@@ -1181,12 +1168,8 @@ impl S3 for FS {
                    },
                )
                .await
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("ErrInvalidObjectState".into()),
                    "post restore object failed",
                ));
            }
                .map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrCopyObject".into()), "restore object failed"))?;

            if already_restored {
                let output = RestoreObjectOutput {
                    request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
@@ -1196,11 +1179,11 @@ impl S3 for FS {
            }
        }

        let restore_object = Uuid::new_v4().to_string();
        //if let Some(rreq) = rreq {
        // Handle output location for SELECT requests
        if let Some(output_location) = &rreq.output_location {
            if let Some(s3) = &output_location.s3 {
                if !s3.bucket_name.is_empty() {
                    let restore_object = Uuid::new_v4().to_string();
                    header.insert(
                        X_AMZ_RESTORE_OUTPUT_PATH,
                        format!("{}{}{}", s3.bucket_name, s3.prefix, restore_object).parse().unwrap(),
@@ -1208,86 +1191,40 @@ impl S3 for FS {
                }
            }
        }
        //}
        /*send_event(EventArgs {
            event_name: event::ObjectRestorePost,
            bucket_name: bucket,
            object: obj_info,
            req_params: extract_req_params(r),
            user_agent: req.user_agent(),
            host: handlers::get_source_ip(r),
        });*/
        tokio::spawn(async move {
            /*if rreq.select_parameters.is_some() {
                let actual_size = obj_info_.get_actual_size();
                if actual_size.is_err() {
                    return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
                }

                let object_rsc = s3select.new_object_read_seek_closer(
                    |offset: i64| -> (ReadCloser, error) {
                        rs := &HTTPRangeSpec{
                            IsSuffixLength: false,
                            Start: offset,
                            End: -1,
                        }
                        return get_transitioned_object_reader(bucket, object, rs, r.Header,
                            obj_info, ObjectOptions {version_id: obj_info_.version_id});
                    },
                    actual_size.unwrap(),
                );
                if err = rreq.select_parameters.open(object_rsc); err != nil {
                    if serr, ok := err.(s3select.SelectError); ok {
                        let encoded_error_response = encodeResponse(APIErrorResponse {
                            code: serr.ErrorCode(),
                            message: serr.ErrorMessage(),
                            bucket_name: bucket,
                            key: object,
                            resource: r.URL.Path,
                            request_id: w.Header().Get(xhttp.AmzRequestID),
                            host_id: globalDeploymentID(),
                        });
                        //writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
                        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
                    } else {
                        return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
                    }
                    return Ok(());
                }
                let nr = httptest.NewRecorder();
                let rw = xhttp.NewResponseRecorder(nr);
                rw.log_err_body = true;
                rw.log_all_body = true;
                rreq.select_parameters.evaluate(rw);
                rreq.select_parameters.Close();
                return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
            }*/
        // Spawn restoration task in the background
        let store_clone = store.clone();
        let bucket_clone = bucket.clone();
        let object_clone = object.clone();
        let rreq_clone = rreq.clone();
        let obj_info_clone = obj_info_.clone();

        tokio::spawn(async move {
            let opts = ObjectOptions {
                transition: TransitionOptions {
                    restore_request: rreq,
                    restore_request: rreq_clone,
                    restore_expiry,
                    ..Default::default()
                },
                version_id: obj_info_.version_id.map(|e| e.to_string()),
                version_id: obj_info_clone.version_id.map(|e| e.to_string()),
                ..Default::default()
            };
            if let Err(err) = store.clone().restore_transitioned_object(&bucket, &object, &opts).await {
                warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string());
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("ErrRestoreTransitionedObject".into()),
                    format!("unable to restore transitioned bucket/object {bucket}/{object}: {err}"),
                ));
            }

            /*send_event(EventArgs {
                EventName: event.ObjectRestoreCompleted,
                BucketName: bucket,
                Object: objInfo,
                ReqParams: extractReqParams(r),
                UserAgent: r.UserAgent(),
                Host: handlers.GetSourceIP(r),
            });*/
            Ok(())
            if let Err(err) = store_clone
                .restore_transitioned_object(&bucket_clone, &object_clone, &opts)
                .await
            {
                warn!(
                    "unable to restore transitioned bucket/object {}/{}: {}",
                    bucket_clone,
                    object_clone,
                    err.to_string()
                );
                // Note: Errors from background tasks cannot be returned to client
                // Consider adding to monitoring/metrics system
            } else {
                info!("successfully restored transitioned object: {}/{}", bucket_clone, object_clone);
            }
        });

        let output = RestoreObjectOutput {
1
rustfs/tests/lifecycle_ansible_test.sh
Normal file
@@ -0,0 +1 @@
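# Assumes ansible-core with the community.aws collection installed
# (ansible-galaxy collection install community.aws) plus boto3/botocore,
# which the AWS modules require on the control node.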
ansible-playbook rustfs/tests/lifecycle_ansible_test.yml
15
rustfs/tests/lifecycle_ansible_test.yml
Normal file
@@ -0,0 +1,15 @@
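# Minimal playbook: applies a 1-day expiration rule to bucket "mblock"
# on the RustFS endpoint via community.aws.s3_lifecycle.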
---
- hosts: webservers
  remote_user: leafcolor
  vars:
    ansible_ssh_pass: "123456"
  tasks:
    - name: Configure S3 bucket lifecycle
      community.aws.s3_lifecycle:
        endpoint_url: "http://m1ddns.pvtool.com:9000"
        access_key: "rustfsadmin"
        secret_key: "rustfsadmin"
        name: "mblock"
        rule_id: "rule1"
        state: present
        expiration_days: "1"
120
rustfs/tests/lifecycle_miniosdk_test.py
Normal file
@@ -0,0 +1,120 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
import os
from collections import namedtuple

Settings = namedtuple("Settings", ['oss_endpoint',
                                   'oss_access_key',
                                   'oss_secret_key',
                                   'oss_bucket_name',
                                   'oss_lifecycle_days',
                                   'oss_secure',
                                   'oss_region'])
settings = Settings(
    oss_endpoint = "m1ddns.pvtool.com:9000",
    oss_access_key = "rustfsadmin",
    oss_secret_key = "rustfsadmin",
    oss_bucket_name = "mblock99",
    oss_lifecycle_days = 1,
    oss_secure = False,
    oss_region = ""
)

class OSS:
    def __init__(self):
        self.bucket_name = settings.oss_bucket_name
        self.lifecycle_days = settings.oss_lifecycle_days
        self.client = Minio(
            endpoint=settings.oss_endpoint,
            access_key=settings.oss_access_key,
            secret_key=settings.oss_secret_key,
            secure=settings.oss_secure,
            region=settings.oss_region,
        )

    def new_uuid(self):
        return str(uuid.uuid4())

    def create_bucket(self):
        found = self.client.bucket_exists(self.bucket_name)
        if not found:
            try:
                self.client.make_bucket(self.bucket_name)
                self.set_lifecycle_expiration(days=self.lifecycle_days)
                print(f"Bucket {self.bucket_name} created successfully")
            except S3Error as exc:
                if exc.code == "BucketAlreadyOwnedByYou":
                    print(f"Bucket {self.bucket_name} already owned by you; continuing")
                else:
                    raise
        else:
            print(f"Bucket {self.bucket_name} already exists")

    def set_lifecycle_expiration(self, days: int = 1, prefix: str = "") -> None:
        """
        Set day-based automatic expiration (MinIO evaluates lifecycle rules
        once per day at whole-day granularity; values under one day take no effect).
        """
        rule_filter = Filter(prefix=prefix or "")
        rule = Rule(
            rule_id=f"expire-{prefix or 'all'}-{days}d",
            status="Enabled",
            rule_filter=rule_filter,
            expiration=Expiration(days=int(days)),
        )
        cfg = LifecycleConfig([rule])
        self.client.set_bucket_lifecycle(self.bucket_name, cfg)

    def upload_file(self, file: UploadFile):
        """
        Upload a file to OSS and return its UUID-based filename.
        """
        ext = os.path.splitext(file.filename)[1]
        file_uuid = self.new_uuid()  # renamed from `uuid` to avoid shadowing the module
        filename = f'{file_uuid}{ext}'
        file.file.seek(0)
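        # length=-1 with an explicit part_size makes the MinIO SDK stream the
        # upload as multipart, so the total size need not be known in advance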
        self.client.put_object(
            self.bucket_name,
            filename,
            file.file,
            length=-1,
            part_size=10*1024*1024,
        )
        return filename

    def get_presigned_url(self, filename: str):
        """
        Return a presigned URL for downloading the file.
        """
        return self.client.presigned_get_object(
            self.bucket_name, filename, expires=timedelta(days=self.lifecycle_days)
        )


def get_oss():
    """
    Return an OSS instance.
    """
    return OSS()

if __name__ == "__main__":
    oss = get_oss()
    oss.create_bucket()
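    # Optional sanity check: read back the rules just applied (assumes the
    # SDK's get_bucket_lifecycle; prints None if no configuration is set)
    print(oss.client.get_bucket_lifecycle(oss.bucket_name))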