Compare commits


6 Commits

Author SHA1 Message Date
0xdx2
44f3eb7244 Fix: add support for additional AWS S3 storage classes and validation logic (#487)
* Fix: add pagination fields to S3 response

* Fix: add support for additional AWS S3 storage classes and validation logic

* Fix: improve handling of optional fields in S3 response

---------

Co-authored-by: DamonXue <damonxue2@gmail.com>
2025-09-05 09:50:41 +08:00
weisd
01b2623f66 Fix/response (#485)
* fix:list_parts response

* fix:list_objects skip delete_marker
2025-09-03 17:52:31 +08:00
dependabot[bot]
cf4d63795f build(deps): bump crc-fast from 1.4.0 to 1.5.0 in the dependencies group (#481)
Bumps the dependencies group with 1 update: [crc-fast](https://github.com/awesomized/crc-fast-rust).


Updates `crc-fast` from 1.4.0 to 1.5.0
- [Release notes](https://github.com/awesomized/crc-fast-rust/releases)
- [Changelog](https://github.com/awesomized/crc-fast-rust/blob/main/CHANGELOG.md)
- [Commits](https://github.com/awesomized/crc-fast-rust/compare/1.4.0...1.5.0)

---
updated-dependencies:
- dependency-name: crc-fast
  dependency-version: 1.5.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: weisd <im@weisd.in>
2025-09-03 17:30:08 +08:00
WenTao
0efc818635 Fix Windows path separator issue using PathBuf (#482)
* Update mod.rs

The following code uses a separator that is not compatible with Windows:

format!("{}/{}", file_config.path.clone(), rustfs_config::DEFAULT_SINK_FILE_LOG_FILE)


Change it to the following code:


std::path::Path::new(&file_config.path)
    .join(rustfs_config::DEFAULT_SINK_FILE_LOG_FILE)
    .to_string_lossy()
    .to_string()

* Replaced format! macro with PathBuf::join to fix the path separator issue on Windows. Tested on Windows 10 with Rust 1.85.0; paths now correctly use the \ separator.
2025-09-03 15:25:08 +08:00
weisd
c9d26c6e88 Fix/delete version (#484)
* fix:delete_version

* fix:test_lifecycle_expiry_basic

---------

Co-authored-by: likewu <likewu@126.com>
2025-09-03 15:12:58 +08:00
likewu
087df484a3 Fix/ilm (#478)
2025-09-02 18:18:26 +08:00
12 changed files with 248 additions and 47 deletions

Cargo.lock (generated)

@@ -1660,9 +1660,9 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc-fast"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec9f79df9b0383475ae6df8fcf35d4e29528441706385339daf0fe3f4cce040b"
checksum = "61dd931a9d071dc6e36b8735c501601428c5aa5dfd6d2506b935498ef57e0098"
dependencies = [
"crc",
"digest 0.10.7",


@@ -107,7 +107,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.0.1"
byteorder = "1.5.0"
cfg-if = "1.0.3"
crc-fast = "1.4.0"
crc-fast = "1.5.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.46", features = ["derive", "env"] }


@@ -19,7 +19,7 @@ use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
-    store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
+    store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier::TierConfigMgr,
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
@@ -29,8 +29,8 @@ use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tokio::sync::RwLock;
-use tracing::info;
use tracing::warn;
+use tracing::{debug, info};
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
@@ -132,6 +132,22 @@ async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
info!("Created test bucket: {}", bucket_name);
}
+/// Test helper: Create a test lock bucket
+async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
+    (**ecstore)
+        .make_bucket(
+            bucket_name,
+            &MakeBucketOptions {
+                lock_enabled: true,
+                versioning_enabled: true,
+                ..Default::default()
+            },
+        )
+        .await
+        .expect("Failed to create test bucket");
+    info!("Created test bucket: {}", bucket_name);
+}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
@@ -262,7 +278,7 @@ async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bo
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {:?}", oi);
debug!("oi: {:?}", oi);
oi.delete_marker
} else {
panic!("object_is_delete_marker is error");
@@ -339,12 +355,13 @@ async fn test_lifecycle_expiry_basic() {
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (delete_marker)
//let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await;
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object is_delete_marker after lifecycle processing: {check_result}");
-    if !check_result {
+    if check_result {
        println!("❌ Object was not deleted by lifecycle processing");
+    } else {
+        println!("✅ Object was successfully deleted by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
@@ -360,11 +377,9 @@ async fn test_lifecycle_expiry_basic() {
println!("Error getting object info: {e:?}");
}
}
-    } else {
-        println!("✅ Object was successfully deleted by lifecycle processing");
    }
-    assert!(check_result);
+    assert!(!check_result);
println!("✅ Object successfully expired");
// Stop scanner
@@ -384,7 +399,7 @@ async fn test_lifecycle_expiry_deletemarker() {
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_bucket(&ecstore, bucket_name).await;
create_test_lock_bucket(&ecstore, bucket_name).await;
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
// Verify object exists initially
@@ -433,6 +448,7 @@ async fn test_lifecycle_expiry_deletemarker() {
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (deleted)
+    //let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await;
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {check_result}");


@@ -36,6 +36,17 @@ pub fn default_parity_count(drive: usize) -> usize {
pub const RRS: &str = "REDUCED_REDUNDANCY";
pub const STANDARD: &str = "STANDARD";
// AWS S3 Storage Classes
pub const DEEP_ARCHIVE: &str = "DEEP_ARCHIVE";
pub const EXPRESS_ONEZONE: &str = "EXPRESS_ONEZONE";
pub const GLACIER: &str = "GLACIER";
pub const GLACIER_IR: &str = "GLACIER_IR";
pub const INTELLIGENT_TIERING: &str = "INTELLIGENT_TIERING";
pub const ONEZONE_IA: &str = "ONEZONE_IA";
pub const OUTPOSTS: &str = "OUTPOSTS";
pub const SNOW: &str = "SNOW";
pub const STANDARD_IA: &str = "STANDARD_IA";
// Standard constants for config info storage class
pub const CLASS_STANDARD: &str = "standard";
pub const CLASS_RRS: &str = "rrs";
@@ -115,6 +126,15 @@ impl Config {
None
}
}
+            // All these storage classes use standard parity configuration
+            STANDARD | DEEP_ARCHIVE | EXPRESS_ONEZONE | GLACIER | GLACIER_IR | INTELLIGENT_TIERING | ONEZONE_IA | OUTPOSTS
+            | SNOW | STANDARD_IA => {
+                if self.initialized {
+                    Some(self.standard.parity)
+                } else {
+                    None
+                }
+            }
_ => {
if self.initialized {
Some(self.standard.parity)


@@ -4778,10 +4778,18 @@ impl StorageAPI for SetDisks {
let part_number_marker = part_number_marker.unwrap_or_default();
+        // Extract storage class from metadata, default to STANDARD if not found
+        let storage_class = fi
+            .metadata
+            .get(rustfs_filemeta::headers::AMZ_STORAGE_CLASS)
+            .cloned()
+            .unwrap_or_else(|| storageclass::STANDARD.to_string());
let mut ret = ListPartsInfo {
bucket: bucket.to_owned(),
object: object.to_owned(),
upload_id: upload_id.to_owned(),
+            storage_class,
max_parts,
part_number_marker,
user_defined: fi.metadata.clone(),
@@ -6039,6 +6047,40 @@ pub fn should_prevent_write(oi: &ObjectInfo, if_none_match: Option<String>, if_m
}
}
+/// Validates if the given storage class is supported
+pub fn is_valid_storage_class(storage_class: &str) -> bool {
+    matches!(
+        storage_class,
+        storageclass::STANDARD
+            | storageclass::RRS
+            | storageclass::DEEP_ARCHIVE
+            | storageclass::EXPRESS_ONEZONE
+            | storageclass::GLACIER
+            | storageclass::GLACIER_IR
+            | storageclass::INTELLIGENT_TIERING
+            | storageclass::ONEZONE_IA
+            | storageclass::OUTPOSTS
+            | storageclass::SNOW
+            | storageclass::STANDARD_IA
+    )
+}
+
+/// Returns true if the storage class is a cold storage tier that requires special handling
+pub fn is_cold_storage_class(storage_class: &str) -> bool {
+    matches!(
+        storage_class,
+        storageclass::DEEP_ARCHIVE | storageclass::GLACIER | storageclass::GLACIER_IR
+    )
+}
+
+/// Returns true if the storage class is an infrequent access tier
+pub fn is_infrequent_access_class(storage_class: &str) -> bool {
+    matches!(
+        storage_class,
+        storageclass::ONEZONE_IA | storageclass::STANDARD_IA | storageclass::INTELLIGENT_TIERING
+    )
+}
#[cfg(test)]
mod tests {
use super::*;
@@ -6528,4 +6570,53 @@ mod tests {
let if_match = None;
assert!(!should_prevent_write(&oi, if_none_match, if_match));
}
+    #[test]
+    fn test_is_valid_storage_class() {
+        // Test valid storage classes
+        assert!(is_valid_storage_class(storageclass::STANDARD));
+        assert!(is_valid_storage_class(storageclass::RRS));
+        assert!(is_valid_storage_class(storageclass::DEEP_ARCHIVE));
+        assert!(is_valid_storage_class(storageclass::EXPRESS_ONEZONE));
+        assert!(is_valid_storage_class(storageclass::GLACIER));
+        assert!(is_valid_storage_class(storageclass::GLACIER_IR));
+        assert!(is_valid_storage_class(storageclass::INTELLIGENT_TIERING));
+        assert!(is_valid_storage_class(storageclass::ONEZONE_IA));
+        assert!(is_valid_storage_class(storageclass::OUTPOSTS));
+        assert!(is_valid_storage_class(storageclass::SNOW));
+        assert!(is_valid_storage_class(storageclass::STANDARD_IA));
+
+        // Test invalid storage classes
+        assert!(!is_valid_storage_class("INVALID"));
+        assert!(!is_valid_storage_class(""));
+        assert!(!is_valid_storage_class("standard")); // lowercase
+    }
+
+    #[test]
+    fn test_is_cold_storage_class() {
+        // Test cold storage classes
+        assert!(is_cold_storage_class(storageclass::DEEP_ARCHIVE));
+        assert!(is_cold_storage_class(storageclass::GLACIER));
+        assert!(is_cold_storage_class(storageclass::GLACIER_IR));
+
+        // Test non-cold storage classes
+        assert!(!is_cold_storage_class(storageclass::STANDARD));
+        assert!(!is_cold_storage_class(storageclass::RRS));
+        assert!(!is_cold_storage_class(storageclass::STANDARD_IA));
+        assert!(!is_cold_storage_class(storageclass::EXPRESS_ONEZONE));
+    }
+
+    #[test]
+    fn test_is_infrequent_access_class() {
+        // Test infrequent access classes
+        assert!(is_infrequent_access_class(storageclass::ONEZONE_IA));
+        assert!(is_infrequent_access_class(storageclass::STANDARD_IA));
+        assert!(is_infrequent_access_class(storageclass::INTELLIGENT_TIERING));
+
+        // Test frequent access classes
+        assert!(!is_infrequent_access_class(storageclass::STANDARD));
+        assert!(!is_infrequent_access_class(storageclass::RRS));
+        assert!(!is_infrequent_access_class(storageclass::DEEP_ARCHIVE));
+        assert!(!is_infrequent_access_class(storageclass::EXPRESS_ONEZONE));
+    }
}
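
Taken together, the three new predicates partition the accepted class set. A small illustrative helper (hypothetical, not part of this patch) shows the intended grouping:

// Hypothetical routing helper built on the predicates added above.
fn tier_of(class: &str) -> &'static str {
    if !is_valid_storage_class(class) {
        "unknown"
    } else if is_cold_storage_class(class) {
        "cold" // DEEP_ARCHIVE, GLACIER, GLACIER_IR
    } else if is_infrequent_access_class(class) {
        "infrequent" // ONEZONE_IA, STANDARD_IA, INTELLIGENT_TIERING
    } else {
        "hot" // STANDARD, RRS, EXPRESS_ONEZONE, OUTPOSTS, SNOW
    }
}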


@@ -1003,7 +1003,7 @@ async fn gather_results(
}
}
-    if !opts.incl_deleted && entry.is_object() && entry.is_latest_delete_marker() && entry.is_object_dir() {
+    if !opts.incl_deleted && entry.is_object() && entry.is_latest_delete_marker() && !entry.is_object_dir() {
continue;
}
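
For clarity on this one-line fix: the old conjunct entry.is_object_dir() made the guard fire only for directory entries, so latest-version delete markers on regular objects leaked into listings even when incl_deleted was false. A condensed sketch of the corrected predicate (free-function form, names illustrative):

// Skip an entry iff deleted entries were not requested, the entry is an
// object whose latest version is a delete marker, and it is not an object dir.
fn should_skip(incl_deleted: bool, is_object: bool, latest_is_delete_marker: bool, is_object_dir: bool) -> bool {
    !incl_deleted && is_object && latest_is_delete_marker && !is_object_dir
}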


@@ -112,6 +112,39 @@ impl FileMeta {
Ok((&buf[8..], major, minor))
}
+    // Returns (meta, inline_data)
+    pub fn is_indexed_meta(buf: &[u8]) -> Result<(&[u8], &[u8])> {
+        let (buf, major, minor) = Self::check_xl2_v1(buf)?;
+        if major != 1 || minor < 3 {
+            return Ok((&[], &[]));
+        }
+
+        let (mut size_buf, buf) = buf.split_at(5);
+        // Get meta data, buf = crc + data
+        let bin_len = rmp::decode::read_bin_len(&mut size_buf)?;
+        if buf.len() < bin_len as usize {
+            return Ok((&[], &[]));
+        }
+
+        let (meta, buf) = buf.split_at(bin_len as usize);
+        if buf.len() < 5 {
+            return Err(Error::other("insufficient data for CRC"));
+        }
+
+        let (mut crc_buf, inline_data) = buf.split_at(5);
+        // crc check
+        let crc = rmp::decode::read_u32(&mut crc_buf)?;
+        let meta_crc = xxh64::xxh64(meta, XXHASH_SEED) as u32;
+        if crc != meta_crc {
+            return Err(Error::other("xl file crc check failed"));
+        }
+
+        Ok((meta, inline_data))
+    }
// Fixed u32
pub fn read_bytes_header(buf: &[u8]) -> Result<(u32, &[u8])> {
let (mut size_buf, _) = buf.split_at(5);
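
In the framing that is_indexed_meta walks, each 5-byte prefix is a msgpack-typed value: one marker byte plus a four-byte big-endian payload (0xc6/bin32 for the meta length, 0xce/u32 for the CRC). A dependency-free sketch of the writer side of this layout (illustrative only, not the real encoder):

// Illustrative layout: [0xc6 len:u32 BE] meta [0xce crc:u32 BE] inline data...
fn frame(meta: &[u8], crc: u32) -> Vec<u8> {
    let mut out = Vec::with_capacity(meta.len() + 10);
    out.push(0xc6); // msgpack bin32 marker, fixed 5-byte form
    out.extend_from_slice(&(meta.len() as u32).to_be_bytes());
    out.extend_from_slice(meta);
    out.push(0xce); // msgpack u32 marker
    out.extend_from_slice(&crc.to_be_bytes());
    out
}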
@@ -289,6 +322,7 @@ impl FileMeta {
+        let offset = wr.len();
// xl header
rmp::encode::write_uint8(&mut wr, XL_HEADER_VERSION)?;
rmp::encode::write_uint8(&mut wr, XL_META_VERSION)?;
@@ -578,45 +612,61 @@ impl FileMeta {
        }
    }
+        let mut found_index = None;
        for (i, version) in self.versions.iter().enumerate() {
-            if version.header.version_type != VersionType::Object || version.header.version_id != fi.version_id {
-                continue;
-            }
-            let mut ver = self.get_idx(i)?;
-            if fi.expire_restored {
-                ver.object.as_mut().unwrap().remove_restore_hdrs();
-                let _ = self.set_idx(i, ver.clone());
-            } else if fi.transition_status == TRANSITION_COMPLETE {
-                ver.object.as_mut().unwrap().set_transition(fi);
-                ver.object.as_mut().unwrap().reset_inline_data();
-                self.set_idx(i, ver.clone())?;
-                return Ok(None);
-            } else {
-                let vers = self.versions[i + 1..].to_vec();
-                self.versions.extend(vers.iter().cloned());
-                let (free_version, to_free) = ver.object.as_ref().unwrap().init_free_version(fi);
-                if to_free {
-                    self.add_version_filemata(free_version)?;
-                }
+            if version.header.version_type == VersionType::Object && version.header.version_id == fi.version_id {
+                found_index = Some(i);
+                break;
+            }
        }
+        let Some(i) = found_index else {
+            if fi.deleted {
+                self.add_version_filemata(ventry)?;
+            }
-            if self.shared_data_dir_count(ver.object.as_ref().unwrap().version_id, ver.object.as_ref().unwrap().data_dir) > 0 {
-                return Ok(None);
-            }
-            return Ok(ver.object.as_ref().unwrap().data_dir);
+            return Err(Error::FileVersionNotFound);
+        };
+        let mut ver = self.get_idx(i)?;
+        let Some(obj) = &mut ver.object else {
+            if fi.deleted {
+                self.add_version_filemata(ventry)?;
+                return Ok(None);
+            }
+            return Err(Error::FileVersionNotFound);
+        };
+        let obj_version_id = obj.version_id;
+        let obj_data_dir = obj.data_dir;
+        if fi.expire_restored {
+            obj.remove_restore_hdrs();
+            self.set_idx(i, ver)?;
+        } else if fi.transition_status == TRANSITION_COMPLETE {
+            obj.set_transition(fi);
+            obj.reset_inline_data();
+            self.set_idx(i, ver)?;
+        } else {
+            self.versions.remove(i);
+            let (free_version, to_free) = obj.init_free_version(fi);
+            if to_free {
+                self.add_version_filemata(free_version)?;
+            }
+        }
        if fi.deleted {
            self.add_version_filemata(ventry)?;
        }
+        if self.shared_data_dir_count(obj_version_id, obj_data_dir) > 0 {
+            return Ok(None);
+        }
-        Err(Error::FileVersionNotFound)
+        Ok(obj_data_dir)
}
pub fn into_fileinfo(
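
The reshaped delete_version flow above is the standard find-then-mutate pattern: scan immutably to locate the index, let the borrow end, then mutate. A minimal standalone sketch of the pattern (types illustrative, not the real FileMeta structures):

struct Version {
    id: u64,
    data_dir: Option<u64>,
}

fn delete_version(versions: &mut Vec<Version>, id: u64) -> Option<Option<u64>> {
    // Phase 1: locate the entry without holding a long-lived borrow.
    let found_index = versions.iter().position(|v| v.id == id)?;
    // Phase 2: mutate now that the iterator borrow has ended.
    let removed = versions.remove(found_index);
    Some(removed.data_dir)
}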


@@ -112,8 +112,8 @@ impl MetaCacheEntry {
return false;
}
-        match FileMeta::check_xl2_v1(&self.metadata) {
-            Ok((meta, _, _)) => {
+        match FileMeta::is_indexed_meta(&self.metadata) {
+            Ok((meta, _inline_data)) => {
if !meta.is_empty() {
return FileMeta::is_latest_delete_marker(meta);
}


@@ -79,7 +79,10 @@ pub async fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
SinkConfig::File(file_config) => {
tracing::debug!("FileSink: Using path: {}", file_config.path);
match file::FileSink::new(
format!("{}/{}", file_config.path.clone(), rustfs_config::DEFAULT_SINK_FILE_LOG_FILE),
std::path::Path::new(&file_config.path)
.join(rustfs_config::DEFAULT_SINK_FILE_LOG_FILE)
.to_string_lossy()
.to_string(),
file_config
.buffer_size
.unwrap_or(rustfs_config::observability::DEFAULT_SINKS_FILE_BUFFER_SIZE),
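
Why Path::join fixes the Windows issue: join inserts the platform's separator, while the old format! call hard-coded '/'. A minimal std-only sketch (the directory and file names are hypothetical):

use std::path::Path;

fn main() {
    let dir = "logs"; // hypothetical sink directory
    // Joins with '\' on Windows and '/' elsewhere; format!("{}/{}", ...)
    // always produces '/' regardless of platform.
    let log_path = Path::new(dir).join("app.log").to_string_lossy().to_string();
    println!("{log_path}");
}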


@@ -227,7 +227,7 @@ impl Operation for NotificationTarget {
let port = url
.port_or_known_default()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing port"))?;
let addr = format!("{}:{}", host, port);
let addr = format!("{host}:{port}");
// First, try to parse as SocketAddr (IP:port)
if addr.parse::<SocketAddr>().is_err() {
// If not an IP:port, try DNS resolution


@@ -144,9 +144,9 @@ pub async fn start_http_server(
for domain in &opt.server_domains {
domain_sets.insert(domain.to_string());
if let Some((host, _)) = domain.split_once(':') {
domain_sets.insert(format!("{}:{}", host, server_port));
domain_sets.insert(format!("{host}:{server_port}"));
} else {
domain_sets.insert(format!("{}:{}", domain, server_port));
domain_sets.insert(format!("{domain}:{server_port}"));
}
}


@@ -57,8 +57,8 @@ use rustfs_ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
-use rustfs_ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
+use rustfs_ecstore::set_disk::{DEFAULT_READ_BUFFER_SIZE, is_valid_storage_class};
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
@@ -1385,8 +1385,7 @@ impl S3 for FS {
let input = req.input;
if let Some(ref storage_class) = input.storage_class {
-            let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
-            if !is_valid {
+            if !is_valid_storage_class(storage_class.as_str()) {
return Err(s3_error!(InvalidStorageClass));
}
}
@@ -1530,9 +1529,17 @@ impl S3 for FS {
key,
tagging,
version_id,
+            storage_class,
..
} = req.input.clone();
+        // Validate storage class if provided
+        if let Some(ref storage_class) = storage_class {
+            if !is_valid_storage_class(storage_class.as_str()) {
+                return Err(s3_error!(InvalidStorageClass));
+            }
+        }
// mc cp step 3
// debug!("create_multipart_upload meta {:?}", &metadata);
@@ -1895,6 +1902,20 @@ impl S3 for FS {
})
.collect(),
),
+            owner: Some(RUSTFS_OWNER.to_owned()),
+            initiator: Some(Initiator {
+                id: RUSTFS_OWNER.id.clone(),
+                display_name: RUSTFS_OWNER.display_name.clone(),
+            }),
+            is_truncated: Some(res.is_truncated),
+            next_part_number_marker: res.next_part_number_marker.try_into().ok(),
+            max_parts: res.max_parts.try_into().ok(),
+            part_number_marker: res.part_number_marker.try_into().ok(),
+            storage_class: if res.storage_class.is_empty() {
+                None
+            } else {
+                Some(res.storage_class.into())
+            },
..Default::default()
};
Ok(S3Response::new(output))