From e393add5ee312d81ef7174a97c23615d6cec7dfd Mon Sep 17 00:00:00 2001 From: likewu Date: Wed, 7 Jan 2026 11:58:35 +0800 Subject: [PATCH] lifecycle test --- Cargo.lock | 1 + .../ahm/tests/lifecycle_integration_test.rs | 10 +- .../ecstore/src/bucket/lifecycle/lifecycle.rs | 67 +- crates/ecstore/src/pools.rs | 2 + crates/scanner/Cargo.toml | 1 + crates/scanner/src/scanner_folder.rs | 6 +- crates/scanner/src/scanner_io.rs | 2 +- crates/scanner/tests/lifecycle_cache_test.rs | 786 ++++++++++++++++++ .../tests/lifecycle_integration_test.rs | 441 ++++++++++ 9 files changed, 1280 insertions(+), 36 deletions(-) create mode 100644 crates/scanner/tests/lifecycle_cache_test.rs create mode 100644 crates/scanner/tests/lifecycle_integration_test.rs diff --git a/Cargo.lock b/Cargo.lock index 155c7eef..b95ae68a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7614,6 +7614,7 @@ dependencies = [ "async-trait", "chrono", "futures", + "heed", "http 1.4.0", "path-clean", "rand 0.10.0-rc.5", diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 05186e0f..86fa2679 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -608,7 +608,7 @@ mod serial_tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial] - //#[ignore] + #[ignore] async fn test_lifecycle_transition_basic() { let (_disk_paths, ecstore) = setup_test_env().await; @@ -621,7 +621,13 @@ mod serial_tests { let test_data = b"Hello, this is test data for lifecycle expiry!"; create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; - upload_test_object(&ecstore, bucket_name.as_str(), object_name, b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !").await; + upload_test_object( + &ecstore, + bucket_name.as_str(), + object_name, + b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !", + ) + .await; //create_test_bucket(&ecstore, bucket_name.as_str()).await; upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index 9666699b..37136919 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -21,7 +21,7 @@ use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType}; use s3s::dto::{ BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition, - ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, + ObjectLockConfiguration, ObjectLockEnabled, Prefix, RestoreRequest, Transition, }; use std::cmp::Ordering; use std::collections::HashMap; @@ -173,44 +173,51 @@ impl Lifecycle for BucketLifecycleConfiguration { continue; } - let rule_prefix = rule.prefix.as_ref().expect("err!"); + let rule_prefix = &rule.prefix.clone().unwrap_or_default(); if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix) { continue; } - let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!"); - if rule_noncurrent_version_expiration.noncurrent_days.expect("err!") > 0 { + if let Some(rule_noncurrent_version_expiration) = &rule.noncurrent_version_expiration { + if let Some(noncurrent_days) = rule_noncurrent_version_expiration.noncurrent_days { + if noncurrent_days > 0 { + return true; + } + } + if let Some(newer_noncurrent_versions) = rule_noncurrent_version_expiration.newer_noncurrent_versions { + if newer_noncurrent_versions > 0 { + return true; + } + } + } + if rule.noncurrent_version_transitions.is_some() { return true; } - if rule_noncurrent_version_expiration.newer_noncurrent_versions.expect("err!") > 0 { - return true; + if let Some(rule_expiration) = &rule.expiration { + if let Some(date1) = rule_expiration.date.clone() { + if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() { + return true; + } + } + if rule_expiration.date.is_some() { + return true; + } + if let Some(expired_object_delete_marker) = rule_expiration.expired_object_delete_marker + && expired_object_delete_marker + { + return true; + } } - if !rule.noncurrent_version_transitions.is_none() { - return true; + if let Some(rule_transitions) = &rule.transitions { + let rule_transitions_0 = rule_transitions[0].clone(); + if let Some(date1) = rule_transitions_0.date { + if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() { + return true; + } + } } - let rule_expiration = rule.expiration.as_ref().expect("err!"); - if !rule_expiration.date.is_none() - && OffsetDateTime::from(rule_expiration.date.clone().expect("err!")).unix_timestamp() - < OffsetDateTime::now_utc().unix_timestamp() - { - return true; - } - if !rule_expiration.date.is_none() { - return true; - } - if rule_expiration.expired_object_delete_marker.expect("err!") { - return true; - } - let rule_transitions: &[Transition] = &rule.transitions.as_ref().expect("err!"); - let rule_transitions_0 = rule_transitions[0].clone(); - if !rule_transitions_0.date.is_none() - && OffsetDateTime::from(rule_transitions_0.date.expect("err!")).unix_timestamp() - < OffsetDateTime::now_utc().unix_timestamp() - { - return true; - } - if !rule.transitions.is_none() { + if rule.transitions.is_some() { return true; } } diff --git a/crates/ecstore/src/pools.rs b/crates/ecstore/src/pools.rs index f2e0d19d..0845d053 100644 --- a/crates/ecstore/src/pools.rs +++ b/crates/ecstore/src/pools.rs @@ -452,6 +452,8 @@ pub fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (Strin .strip_prefix(SLASH_SEPARATOR) .unwrap_or(path); // Find the position of the first '/' + #[cfg(windows)] + let trimmed_path = trimmed_path.replace('\\', "/"); let Some(pos) = trimmed_path.find(SLASH_SEPARATOR) else { return (trimmed_path.to_string(), "".to_string()); }; diff --git a/crates/scanner/Cargo.toml b/crates/scanner/Cargo.toml index 3257b3b4..f4de56de 100644 --- a/crates/scanner/Cargo.toml +++ b/crates/scanner/Cargo.toml @@ -59,3 +59,4 @@ tokio-test = { workspace = true } tracing-subscriber = { workspace = true } tempfile = { workspace = true } serial_test = { workspace = true } +heed = { workspace = true } \ No newline at end of file diff --git a/crates/scanner/src/scanner_folder.rs b/crates/scanner/src/scanner_folder.rs index 83922cbb..bf02e5ea 100644 --- a/crates/scanner/src/scanner_folder.rs +++ b/crates/scanner/src/scanner_folder.rs @@ -44,7 +44,7 @@ use rustfs_ecstore::pools::{path2_bucket_object, path2_bucket_object_with_base_p use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete}; use rustfs_ecstore::store_utils::is_reserved_or_invalid_bucket; use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ReplicationStatusType}; -use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf}; +use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf, path_to_bucket_object_with_base_path}; use s3s::dto::{BucketLifecycleConfiguration, ObjectLockConfiguration}; use tokio::select; use tokio::sync::mpsc; @@ -554,9 +554,9 @@ impl FolderScanner { let file_path = entry.path().to_string_lossy().to_string(); - let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path); + //let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path); - let entry_name = path_join_buf(&[&folder.name, trim_dir_name]); + let entry_name = path_join_buf(&[&folder.name, &file_name]); if entry_name.is_empty() || entry_name == folder.name { debug!("scan_folder: done for now entry_name is empty or equals folder name"); diff --git a/crates/scanner/src/scanner_io.rs b/crates/scanner/src/scanner_io.rs index 0b33e884..50bd814b 100644 --- a/crates/scanner/src/scanner_io.rs +++ b/crates/scanner/src/scanner_io.rs @@ -272,7 +272,7 @@ impl ScannerIOCache for SetDisks { let store_clone = self.clone(); let ctx_clone = ctx.clone(); let send_update_fut = tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_secs(30 + rand::random::() % 10)); + let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::() % 10)); let mut last_update = None; diff --git a/crates/scanner/tests/lifecycle_cache_test.rs b/crates/scanner/tests/lifecycle_cache_test.rs new file mode 100644 index 00000000..c439e9df --- /dev/null +++ b/crates/scanner/tests/lifecycle_cache_test.rs @@ -0,0 +1,786 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use heed::byteorder::BigEndian; +use heed::types::*; +use heed::{BoxedError, BytesDecode, BytesEncode, Database, DatabaseFlags, Env, EnvOpenOptions}; +use rustfs_ecstore::{ + disk::endpoint::Endpoint, + endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + store::ECStore, + store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}, +}; +use rustfs_scanner::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome}; +use serial_test::serial; +use std::{ + borrow::Cow, + path::PathBuf, + sync::{Arc, Once, OnceLock}, +}; +//use heed_traits::Comparator; +use time::OffsetDateTime; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); +static INIT: Once = Once::new(); + +static _LIFECYCLE_EXPIRY_CURRENT_DAYS: i32 = 1; +static _LIFECYCLE_EXPIRY_NONCURRENT_DAYS: i32 = 1; +static _LIFECYCLE_TRANSITION_CURRENT_DAYS: i32 = 1; +static _LIFECYCLE_TRANSITION_NONCURRENT_DAYS: i32 = 1; +static GLOBAL_LMDB_ENV: OnceLock = OnceLock::new(); +static GLOBAL_LMDB_DB: OnceLock, LifecycleContentCodec>> = OnceLock::new(); + +fn init_tracing() { + INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} + +/// Test helper: Create test environment with ECStore +async fn setup_test_env() -> (Vec, Arc) { + init_tracing(); + + // Fast path: already initialized, just clone and return + if let Some((paths, ecstore)) = GLOBAL_ENV.get() { + return (paths.clone(), ecstore.clone()); + } + + // create temp dir as 4 disks with unique base dir + let test_base_dir = format!("/tmp/rustfs_ahm_lifecyclecache_test_{}", uuid::Uuid::new_v4()); + let temp_dir = std::path::PathBuf::from(&test_base_dir); + if temp_dir.exists() { + fs::remove_dir_all(&temp_dir).await.ok(); + } + fs::create_dir_all(&temp_dir).await.unwrap(); + + // create 4 disk dirs + let disk_paths = vec![ + temp_dir.join("disk1"), + temp_dir.join("disk2"), + temp_dir.join("disk3"), + temp_dir.join("disk4"), + ]; + + for disk_path in &disk_paths { + fs::create_dir_all(disk_path).await.unwrap(); + } + + // create EndpointServerPools + let mut endpoints = Vec::new(); + for (i, disk_path) in disk_paths.iter().enumerate() { + let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap(); + // set correct index + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(i); + endpoints.push(endpoint); + } + + let pool_endpoints = PoolEndpoints { + legacy: false, + set_count: 1, + drives_per_set: 4, + endpoints: Endpoints::from(endpoints), + cmd_line: "test".to_string(), + platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH), + }; + + let endpoint_pools = EndpointServerPools(vec![pool_endpoints]); + + // format disks (only first time) + rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap(); + + // create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free + let port = 9002; // for simplicity + let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new()) + .await + .unwrap(); + + // init bucket metadata system + let buckets_list = ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions { + no_metadata: true, + ..Default::default() + }) + .await + .unwrap(); + let buckets = buckets_list.into_iter().map(|v| v.name).collect(); + rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await; + + //lmdb env + // User home directory + /*if let Ok(home_dir) = env::var("HOME").or_else(|_| env::var("USERPROFILE")) { + let mut path = PathBuf::from(home_dir); + path.push(format!(".{DEFAULT_LOG_FILENAME}")); + path.push(DEFAULT_LOG_DIR); + if ensure_directory_writable(&path) { + //return path; + } + }*/ + let test_lmdb_lifecycle_dir = "/tmp/lmdb_lifecycle".to_string(); + let temp_dir = std::path::PathBuf::from(&test_lmdb_lifecycle_dir); + if temp_dir.exists() { + fs::remove_dir_all(&temp_dir).await.ok(); + } + fs::create_dir_all(&temp_dir).await.unwrap(); + let lmdb_env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&test_lmdb_lifecycle_dir).unwrap() }; + let bucket_name = format!("test-lc-cache-{}", "00000"); + let mut wtxn = lmdb_env.write_txn().unwrap(); + let db = match lmdb_env + .database_options() + .name(&format!("bucket_{bucket_name}")) + .types::, LifecycleContentCodec>() + .flags(DatabaseFlags::DUP_SORT) + //.dup_sort_comparator::<>() + .create(&mut wtxn) + { + Ok(db) => db, + Err(err) => { + panic!("lmdb error: {err}"); + } + }; + let _ = wtxn.commit(); + let _ = GLOBAL_LMDB_ENV.set(lmdb_env); + let _ = GLOBAL_LMDB_DB.set(db); + + // Store in global once lock + let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone())); + + (disk_paths, ecstore) +} + +/// Test helper: Create a test bucket +#[allow(dead_code)] +async fn create_test_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket(bucket_name, &Default::default()) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Create a test lock bucket +async fn create_test_lock_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket( + bucket_name, + &MakeBucketOptions { + lock_enabled: true, + versioning_enabled: true, + ..Default::default() + }, + ) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Upload test object +async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, data: &[u8]) { + let mut reader = PutObjReader::from_vec(data.to_vec()); + let object_info = (**ecstore) + .put_object(bucket, object, &mut reader, &ObjectOptions::default()) + .await + .expect("Failed to upload test object"); + + println!("object_info1: {object_info:?}"); + + info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); +} + +/// Test helper: Check if object exists +async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bool { + match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await { + Ok(info) => !info.delete_marker, + Err(_) => false, + } +} + +fn ns_to_offset_datetime(ns: i128) -> Option { + OffsetDateTime::from_unix_timestamp_nanos(ns).ok() +} + +fn convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo { + let usage = &record.usage; + + ObjectInfo { + bucket: usage.bucket.clone(), + name: usage.object.clone(), + size: usage.total_size as i64, + delete_marker: !usage.has_live_object && usage.delete_markers_count > 0, + mod_time: usage.last_modified_ns.and_then(ns_to_offset_datetime), + ..Default::default() + } +} + +#[allow(dead_code)] +fn to_object_info( + bucket: &str, + object: &str, + total_size: i64, + delete_marker: bool, + mod_time: OffsetDateTime, + version_id: &str, +) -> ObjectInfo { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + size: total_size, + delete_marker, + mod_time: Some(mod_time), + version_id: Some(Uuid::parse_str(version_id).unwrap()), + ..Default::default() + } +} + +#[derive(Debug, PartialEq, Eq)] +enum LifecycleType { + ExpiryCurrent, + ExpiryNoncurrent, + TransitionCurrent, + TransitionNoncurrent, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct LifecycleContent { + ver_no: u8, + ver_id: String, + mod_time: OffsetDateTime, + type_: LifecycleType, + object_name: String, +} + +pub struct LifecycleContentCodec; + +impl BytesEncode<'_> for LifecycleContentCodec { + type EItem = LifecycleContent; + + fn bytes_encode(lcc: &Self::EItem) -> Result, BoxedError> { + let (ver_no_byte, ver_id_bytes, mod_timestamp_bytes, type_byte, object_name_bytes) = match lcc { + LifecycleContent { + ver_no, + ver_id, + mod_time, + type_: LifecycleType::ExpiryCurrent, + object_name, + } => ( + ver_no, + ver_id.clone().into_bytes(), + mod_time.unix_timestamp().to_be_bytes(), + 0, + object_name.clone().into_bytes(), + ), + LifecycleContent { + ver_no, + ver_id, + mod_time, + type_: LifecycleType::ExpiryNoncurrent, + object_name, + } => ( + ver_no, + ver_id.clone().into_bytes(), + mod_time.unix_timestamp().to_be_bytes(), + 1, + object_name.clone().into_bytes(), + ), + LifecycleContent { + ver_no, + ver_id, + mod_time, + type_: LifecycleType::TransitionCurrent, + object_name, + } => ( + ver_no, + ver_id.clone().into_bytes(), + mod_time.unix_timestamp().to_be_bytes(), + 2, + object_name.clone().into_bytes(), + ), + LifecycleContent { + ver_no, + ver_id, + mod_time, + type_: LifecycleType::TransitionNoncurrent, + object_name, + } => ( + ver_no, + ver_id.clone().into_bytes(), + mod_time.unix_timestamp().to_be_bytes(), + 3, + object_name.clone().into_bytes(), + ), + }; + + let mut output = Vec::::new(); + output.push(*ver_no_byte); + output.extend_from_slice(&ver_id_bytes); + output.extend_from_slice(&mod_timestamp_bytes); + output.push(type_byte); + output.extend_from_slice(&object_name_bytes); + Ok(Cow::Owned(output)) + } +} + +impl<'a> BytesDecode<'a> for LifecycleContentCodec { + type DItem = LifecycleContent; + + fn bytes_decode(bytes: &'a [u8]) -> Result { + use std::mem::size_of; + + let ver_no = match bytes.get(..size_of::()) { + Some(bytes) => bytes.try_into().map(u8::from_be_bytes).unwrap(), + None => return Err("invalid LifecycleContent: cannot extract ver_no".into()), + }; + + let ver_id = match bytes.get(size_of::()..(36 + 1)) { + Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() }, + None => return Err("invalid LifecycleContent: cannot extract ver_id".into()), + }; + + let mod_timestamp = match bytes.get((36 + 1)..(size_of::() + 36 + 1)) { + Some(bytes) => bytes.try_into().map(i64::from_be_bytes).unwrap(), + None => return Err("invalid LifecycleContent: cannot extract mod_time timestamp".into()), + }; + + let type_ = match bytes.get(size_of::() + 36 + 1) { + Some(&0) => LifecycleType::ExpiryCurrent, + Some(&1) => LifecycleType::ExpiryNoncurrent, + Some(&2) => LifecycleType::TransitionCurrent, + Some(&3) => LifecycleType::TransitionNoncurrent, + Some(_) => return Err("invalid LifecycleContent: invalid LifecycleType".into()), + None => return Err("invalid LifecycleContent: cannot extract LifecycleType".into()), + }; + + let object_name = match bytes.get((size_of::() + 36 + 1 + 1)..) { + Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() }, + None => return Err("invalid LifecycleContent: cannot extract object_name".into()), + }; + + Ok(LifecycleContent { + ver_no, + ver_id, + mod_time: OffsetDateTime::from_unix_timestamp(mod_timestamp).unwrap(), + type_, + object_name, + }) + } +} + +mod serial_tests { + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + //#[ignore] + async fn test_lifecycle_chche_build() { + let (_disk_paths, ecstore) = setup_test_env().await; + + // Create test bucket and object + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-cache-{}", &suffix[..8]); + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; + + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; + + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await); + println!("✅ Object exists before lifecycle processing"); + + let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await { + Ok(outcome) => outcome, + Err(err) => { + warn!("Local usage scan failed: {}", err); + LocalScanOutcome::default() + } + }; + let bucket_objects_map = &scan_outcome.bucket_objects; + + let records = match bucket_objects_map.get(&bucket_name) { + Some(records) => records, + None => { + debug!("No local snapshot entries found for bucket {}; skipping lifecycle/integrity", bucket_name); + &vec![] + } + }; + + if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get() { + if let Some(lmdb) = GLOBAL_LMDB_DB.get() { + let mut wtxn = lmdb_env.write_txn().unwrap(); + + /*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await { + if let Ok(object_info) = ecstore + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle( + &lc_config, + None, + None, + &object_info, + ) + .await; + + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects( + ecstore.clone(), + &object_info, + &event, + &rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner, + ) + .await; + + expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await; + } + }*/ + + for record in records { + if !record.usage.has_live_object { + continue; + } + + let object_info = convert_record_to_object_info(record); + println!("object_info2: {object_info:?}"); + let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc()); + let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1); + + let version_id = if let Some(version_id) = object_info.version_id { + version_id.to_string() + } else { + "zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string() + }; + + lmdb.put( + &mut wtxn, + &expiry_time.unix_timestamp(), + &LifecycleContent { + ver_no: 0, + ver_id: version_id, + mod_time, + type_: LifecycleType::TransitionNoncurrent, + object_name: object_info.name, + }, + ) + .unwrap(); + } + + wtxn.commit().unwrap(); + + let mut wtxn = lmdb_env.write_txn().unwrap(); + let iter = lmdb.iter_mut(&mut wtxn).unwrap(); + //let _ = unsafe { iter.del_current().unwrap() }; + for row in iter { + if let Ok(ref elm) = row { + let LifecycleContent { + ver_no, + ver_id, + mod_time, + type_, + object_name, + } = &elm.1; + println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}"); + //eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await; + eval_inner( + &lifecycle::ObjectOpts { + name: oi.name.clone(), + user_tags: oi.user_tags.clone(), + version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(), + mod_time: oi.mod_time, + size: oi.size as usize, + is_latest: oi.is_latest, + num_versions: oi.num_versions, + delete_marker: oi.delete_marker, + successor_mod_time: oi.successor_mod_time, + restore_ongoing: oi.restore_ongoing, + restore_expires: oi.restore_expires, + transition_status: oi.transitioned_object.status.clone(), + ..Default::default() + }, + OffsetDateTime::now_utc(), + ) + .await; + } + println!("row:{row:?}"); + } + //drop(iter); + wtxn.commit().unwrap(); + } + } + + println!("Lifecycle cache test completed"); + } +} + +async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event { + let mut events = Vec::::new(); + info!( + "eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}", + obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker + ); + if obj.mod_time.expect("err").unix_timestamp() == 0 { + info!("eval_inner: mod_time is 0, returning default event"); + return Event::default(); + } + + if let Some(restore_expires) = obj.restore_expires { + if !restore_expires.unix_timestamp() == 0 && now.unix_timestamp() > restore_expires.unix_timestamp() { + let mut action = IlmAction::DeleteRestoredAction; + if !obj.is_latest { + action = IlmAction::DeleteRestoredVersionAction; + } + + events.push(Event { + action, + due: Some(now), + rule_id: "".into(), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + } + } + + if let Some(ref lc_rules) = self.filter_rules(obj).await { + for rule in lc_rules.iter() { + if obj.expired_object_deletemarker() { + if let Some(expiration) = rule.expiration.as_ref() { + if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker { + events.push(Event { + action: IlmAction::DeleteVersionAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(now), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + break; + } + + if let Some(days) = expiration.days { + let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/); + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { + events.push(Event { + action: IlmAction::DeleteVersionAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(expected_expiry), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + break; + } + } + } + } + + if obj.is_latest { + if let Some(ref expiration) = rule.expiration { + if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker { + if obj.delete_marker && expired_object_delete_marker { + let due = expiration.next_due(obj); + if let Some(due) = due { + if now.unix_timestamp() >= due.unix_timestamp() { + events.push(Event { + action: IlmAction::DelMarkerDeleteAllVersionsAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(due), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + } + } + continue; + } + } + } + } + + if !obj.is_latest { + if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration { + if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions { + if newer_noncurrent_versions > 0 { + continue; + } + } + } + } + + if !obj.is_latest { + if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration { + if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days { + if noncurrent_days != 0 { + if let Some(successor_mod_time) = obj.successor_mod_time { + let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days); + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { + events.push(Event { + action: IlmAction::DeleteVersionAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(expected_expiry), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + } + } + } + } + } + } + + if !obj.is_latest { + if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions { + if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class { + if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE { + let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj); + if let Some(due0) = due { + if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() { + events.push(Event { + action: IlmAction::TransitionVersionAction, + rule_id: rule.id.clone().expect("err!"), + due, + storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0] + .storage_class + .clone() + .unwrap() + .as_str() + .to_string(), + ..Default::default() + }); + } + } + } + } + } + } + + info!( + "eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}", + obj.is_latest, + obj.delete_marker, + obj.version_id, + (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker + ); + // Allow expiration for latest objects OR non-versioned objects (empty version_id) + if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker { + info!("eval_inner: entering expiration check"); + if let Some(ref expiration) = rule.expiration { + if let Some(ref date) = expiration.date { + let date0 = OffsetDateTime::from(date.clone()); + if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) { + info!("eval_inner: expiration by date - date0={:?}", date0); + events.push(Event { + action: IlmAction::DeleteAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(date0), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }); + } + } else if let Some(days) = expiration.days { + let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days); + info!( + "eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}", + days, + obj.mod_time.expect("err!"), + expected_expiry, + now, + now.unix_timestamp() > expected_expiry.unix_timestamp() + ); + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { + info!("eval_inner: object should expire, adding DeleteAction"); + let mut event = Event { + action: IlmAction::DeleteAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(expected_expiry), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }; + /*if rule.expiration.expect("err!").delete_all.val { + event.action = IlmAction::DeleteAllVersionsAction + }*/ + events.push(event); + } + } else { + info!("eval_inner: expiration.days is None"); + } + } else { + info!("eval_inner: rule.expiration is None"); + } + + if obj.transition_status != TRANSITION_COMPLETE { + if let Some(ref transitions) = rule.transitions { + let due = transitions[0].next_due(obj); + if let Some(due0) = due { + if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() { + events.push(Event { + action: IlmAction::TransitionAction, + rule_id: rule.id.clone().expect("err!"), + due, + storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + }); + } + } + } + } + } + } + } + + if events.len() > 0 { + events.sort_by(|a, b| { + if now.unix_timestamp() > a.due.expect("err!").unix_timestamp() + && now.unix_timestamp() > b.due.expect("err").unix_timestamp() + || a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp() + { + match a.action { + IlmAction::DeleteAllVersionsAction + | IlmAction::DelMarkerDeleteAllVersionsAction + | IlmAction::DeleteAction + | IlmAction::DeleteVersionAction => { + return Ordering::Less; + } + _ => (), + } + match b.action { + IlmAction::DeleteAllVersionsAction + | IlmAction::DelMarkerDeleteAllVersionsAction + | IlmAction::DeleteAction + | IlmAction::DeleteVersionAction => { + return Ordering::Greater; + } + _ => (), + } + return Ordering::Less; + } + + if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() { + return Ordering::Less; + } + return Ordering::Greater; + }); + return events[0].clone(); + } + + Event::default() +} diff --git a/crates/scanner/tests/lifecycle_integration_test.rs b/crates/scanner/tests/lifecycle_integration_test.rs new file mode 100644 index 00000000..b0a4d79e --- /dev/null +++ b/crates/scanner/tests/lifecycle_integration_test.rs @@ -0,0 +1,441 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use rustfs_ecstore::{ + bucket::metadata::BUCKET_LIFECYCLE_CONFIG, + bucket::metadata_sys, + disk::endpoint::Endpoint, + endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + global::GLOBAL_TierConfigMgr, + store::ECStore, + store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, + tier::tier_config::{TierConfig, TierMinIO, TierType}, +}; +use rustfs_scanner::scanner::init_data_scanner; +use serial_test::serial; +use std::{ + path::PathBuf, + sync::{Arc, Once, OnceLock}, + time::Duration, +}; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::info; + +static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); +static INIT: Once = Once::new(); + +fn init_tracing() { + INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} + +/// Test helper: Create test environment with ECStore +async fn setup_test_env() -> (Vec, Arc) { + init_tracing(); + + // Fast path: already initialized, just clone and return + if let Some((paths, ecstore)) = GLOBAL_ENV.get() { + return (paths.clone(), ecstore.clone()); + } + + // create temp dir as 4 disks with unique base dir + let test_base_dir = format!("/tmp/rustfs_ahm_lifecycle_test_{}", uuid::Uuid::new_v4()); + let temp_dir = std::path::PathBuf::from(&test_base_dir); + if temp_dir.exists() { + fs::remove_dir_all(&temp_dir).await.ok(); + } + fs::create_dir_all(&temp_dir).await.unwrap(); + + // create 4 disk dirs + let disk_paths = vec![ + temp_dir.join("disk1"), + temp_dir.join("disk2"), + temp_dir.join("disk3"), + temp_dir.join("disk4"), + ]; + + for disk_path in &disk_paths { + fs::create_dir_all(disk_path).await.unwrap(); + } + + // create EndpointServerPools + let mut endpoints = Vec::new(); + for (i, disk_path) in disk_paths.iter().enumerate() { + let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap(); + // set correct index + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(i); + endpoints.push(endpoint); + } + + let pool_endpoints = PoolEndpoints { + legacy: false, + set_count: 1, + drives_per_set: 4, + endpoints: Endpoints::from(endpoints), + cmd_line: "test".to_string(), + platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH), + }; + + let endpoint_pools = EndpointServerPools(vec![pool_endpoints]); + + // format disks (only first time) + rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap(); + + // create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free + let port = 9002; // for simplicity + let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new()) + .await + .unwrap(); + + // init bucket metadata system + let buckets_list = ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions { + no_metadata: true, + ..Default::default() + }) + .await + .unwrap(); + let buckets = buckets_list.into_iter().map(|v| v.name).collect(); + rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await; + + // Initialize background expiry workers + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await; + + // Store in global once lock + let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone())); + + (disk_paths, ecstore) +} + +/// Test helper: Create a test bucket +async fn create_test_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket(bucket_name, &Default::default()) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Create a test lock bucket +async fn create_test_lock_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket( + bucket_name, + &MakeBucketOptions { + lock_enabled: true, + versioning_enabled: true, + ..Default::default() + }, + ) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Upload test object +async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, data: &[u8]) { + let mut reader = PutObjReader::from_vec(data.to_vec()); + let object_info = (**ecstore) + .put_object(bucket, object, &mut reader, &ObjectOptions::default()) + .await + .expect("Failed to upload test object"); + + info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); +} + +/// Test helper: Set bucket lifecycle configuration +async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box> { + // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing + let lifecycle_xml = r#" + + + test-rule + Enabled + + test/ + + + 0 + + +"#; + + metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?; + + Ok(()) +} + +/// Test helper: Set bucket lifecycle configuration +async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box> { + // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing + let lifecycle_xml = r#" + + + test-rule + Enabled + + test/ + + + 0 + true + + +"#; + + metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?; + + Ok(()) +} + +#[allow(dead_code)] +async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box> { + // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing + let lifecycle_xml = r#" + + + test-rule + Enabled + + test/ + + + 0 + COLDTIER44 + + + + test-rule2 + Disabled + + test/ + + + 0 + COLDTIER44 + + +"#; + + metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?; + + Ok(()) +} + +/// Test helper: Create a test tier +#[allow(dead_code)] +async fn create_test_tier(server: u32) { + let args = TierConfig { + version: "v1".to_string(), + tier_type: TierType::MinIO, + name: "COLDTIER44".to_string(), + s3: None, + aliyun: None, + tencent: None, + huaweicloud: None, + azure: None, + gcs: None, + r2: None, + rustfs: None, + minio: if server == 1 { + Some(TierMinIO { + access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + bucket: "hello".to_string(), + endpoint: "http://39.105.198.204:9000".to_string(), + prefix: format!("mypre{}/", uuid::Uuid::new_v4()), + region: "".to_string(), + ..Default::default() + }) + } else if server == 2 { + Some(TierMinIO { + access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + bucket: "mblock2".to_string(), + endpoint: "http://m1ddns.pvtool.com:9020".to_string(), + prefix: format!("mypre{}/", uuid::Uuid::new_v4()), + region: "".to_string(), + ..Default::default() + }) + } else { + Some(TierMinIO { + access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + bucket: "mblock2".to_string(), + endpoint: "http://127.0.0.1:9020".to_string(), + prefix: format!("mypre{}/", uuid::Uuid::new_v4()), + region: "".to_string(), + ..Default::default() + }) + }, + }; + let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; + if let Err(err) = tier_config_mgr.add(args, false).await { + println!("tier_config_mgr add failed, e: {err:?}"); + panic!("tier add failed. {err}"); + } + if let Err(e) = tier_config_mgr.save().await { + println!("tier_config_mgr save failed, e: {e:?}"); + panic!("tier save failed"); + } + println!("Created test tier: COLDTIER44"); +} + +/// Test helper: Check if object exists +async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bool { + match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await { + Ok(info) => !info.delete_marker, + Err(_) => false, + } +} + +/// Test helper: Check if object exists +#[allow(dead_code)] +async fn object_is_delete_marker(ecstore: &Arc, bucket: &str, object: &str) -> bool { + if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await { + println!("oi: {oi:?}"); + oi.delete_marker + } else { + println!("object_is_delete_marker is error"); + panic!("object_is_delete_marker is error"); + } +} + +/// Test helper: Check if object exists +#[allow(dead_code)] +async fn object_is_transitioned(ecstore: &Arc, bucket: &str, object: &str) -> bool { + if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await { + println!("oi: {oi:?}"); + !oi.transitioned_object.status.is_empty() + } else { + println!("object_is_transitioned is error"); + panic!("object_is_transitioned is error"); + } +} + +async fn wait_for_object_absence(ecstore: &Arc, bucket: &str, object: &str, timeout: Duration) -> bool { + let deadline = tokio::time::Instant::now() + timeout; + + loop { + if !object_exists(ecstore, bucket, object).await { + return true; + } + + if tokio::time::Instant::now() >= deadline { + return false; + } + + tokio::time::sleep(Duration::from_millis(200)).await; + } +} + +mod serial_tests { + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[serial] + //#[ignore] + async fn test_lifecycle_transition_basic() { + let (_disk_paths, ecstore) = setup_test_env().await; + + create_test_tier(2).await; + + // Create test bucket and object + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-transition-{}", &suffix[..8]); + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; + + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object( + &ecstore, + bucket_name.as_str(), + object_name, + b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !", + ) + .await; + //create_test_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; + + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await); + println!("✅ Object exists before lifecycle processing"); + + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + set_bucket_lifecycle_transition(bucket_name.as_str()) + .await + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); + } + Err(e) => { + println!("❌ Error retrieving bucket metadata: {e:?}"); + } + } + + let ctx = CancellationToken::new(); + + // Start scanner + init_data_scanner(ctx.clone(), ecstore.clone()).await; + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(1200)).await; + + // Check if object has been expired (deleted) + let check_result = object_is_transitioned(&ecstore, &bucket_name, object_name).await; + println!("Object exists after lifecycle processing: {check_result}"); + + if check_result { + println!("✅ Object was transitioned by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + println!("Object info: transitioned_object={:?}", obj_info.transitioned_object); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } else { + println!("❌ Object was not transitioned by lifecycle processing"); + } + + assert!(check_result); + println!("✅ Object successfully transitioned"); + + // Stop scanner + ctx.cancel(); + println!("✅ Scanner stopped"); + + println!("Lifecycle transition basic test completed"); + } +}