diff --git a/crates/ecstore/src/erasure_coding/encode.rs b/crates/ecstore/src/erasure_coding/encode.rs index 766c96ca..e19690c1 100644 --- a/crates/ecstore/src/erasure_coding/encode.rs +++ b/crates/ecstore/src/erasure_coding/encode.rs @@ -149,6 +149,12 @@ impl Erasure { break; } Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + // Check if the inner error is a checksum mismatch - if so, propagate it + if let Some(inner) = e.get_ref() { + if rustfs_rio::is_checksum_mismatch(inner) { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())); + } + } break; } Err(e) => { diff --git a/crates/ecstore/src/error.rs b/crates/ecstore/src/error.rs index 01d57fbd..410faa72 100644 --- a/crates/ecstore/src/error.rs +++ b/crates/ecstore/src/error.rs @@ -194,6 +194,12 @@ pub enum StorageError { #[error("Precondition failed")] PreconditionFailed, + #[error("Not modified")] + NotModified, + + #[error("Invalid part number: {0}")] + InvalidPartNumber(usize), + #[error("Invalid range specified: {0}")] InvalidRangeSpec(String), } @@ -427,6 +433,8 @@ impl Clone for StorageError { StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()), StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()), StorageError::PreconditionFailed => StorageError::PreconditionFailed, + StorageError::NotModified => StorageError::NotModified, + StorageError::InvalidPartNumber(a) => StorageError::InvalidPartNumber(*a), StorageError::InvalidRangeSpec(a) => StorageError::InvalidRangeSpec(a.clone()), } } @@ -496,6 +504,8 @@ impl StorageError { StorageError::PreconditionFailed => 0x3B, StorageError::EntityTooSmall(_, _, _) => 0x3C, StorageError::InvalidRangeSpec(_) => 0x3D, + StorageError::NotModified => 0x3E, + StorageError::InvalidPartNumber(_) => 0x3F, } } @@ -566,6 +576,8 @@ impl StorageError { 0x3B => Some(StorageError::PreconditionFailed), 0x3C => 
Some(StorageError::EntityTooSmall(Default::default(), Default::default(), Default::default())), 0x3D => Some(StorageError::InvalidRangeSpec(Default::default())), + 0x3E => Some(StorageError::NotModified), + 0x3F => Some(StorageError::InvalidPartNumber(Default::default())), _ => None, } } @@ -679,6 +691,10 @@ pub fn is_err_data_movement_overwrite(err: &Error) -> bool { matches!(err, &StorageError::DataMovementOverwriteErr(_, _, _)) } +pub fn is_err_io(err: &Error) -> bool { + matches!(err, &StorageError::Io(_)) +} + pub fn is_all_not_found(errs: &[Option]) -> bool { for err in errs.iter() { if let Some(err) = err { diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index fbaa19c0..3097a9e2 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -767,6 +767,12 @@ impl ECStore { def_pool = pinfo.clone(); has_def_pool = true; + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-deletes.html + if is_err_object_not_found(err) { + if let Err(err) = opts.precondition_check(&pinfo.object_info) { + return Err(err.clone()); + } + } if !is_err_object_not_found(err) && !is_err_version_not_found(err) { return Err(err.clone()); @@ -1392,6 +1398,7 @@ impl StorageAPI for ECStore { let (info, _) = self.get_latest_object_info_with_idx(bucket, object.as_str(), opts).await?; + opts.precondition_check(&info)?; Ok(info) } diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index dbad3911..90c8fd96 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -356,6 +356,8 @@ impl HTTPRangeSpec { pub struct HTTPPreconditions { pub if_match: Option, pub if_none_match: Option, + pub if_modified_since: Option, + pub if_unmodified_since: Option, } #[derive(Debug, Default, Clone)] @@ -456,6 +458,76 @@ impl ObjectOptions { ..Default::default() } } + + pub fn precondition_check(&self, obj_info: &ObjectInfo) -> Result<()> { + let has_valid_mod_time = obj_info.mod_time.is_some_and(|t| t 
!= OffsetDateTime::UNIX_EPOCH); + + if let Some(part_number) = self.part_number { + if part_number > 1 && !obj_info.parts.is_empty() { + let part_found = obj_info.parts.iter().any(|pi| pi.number == part_number); + if !part_found { + return Err(Error::InvalidPartNumber(part_number)); + } + } + } + + if let Some(pre) = &self.http_preconditions { + if let Some(if_none_match) = &pre.if_none_match { + if let Some(etag) = &obj_info.etag { + if is_etag_equal(etag, if_none_match) { + return Err(Error::NotModified); + } + } + } + + if has_valid_mod_time { + if let Some(if_modified_since) = &pre.if_modified_since { + if let Some(mod_time) = &obj_info.mod_time { + if !is_modified_since(mod_time, if_modified_since) { + return Err(Error::NotModified); + } + } + } + } + + if let Some(if_match) = &pre.if_match { + if let Some(etag) = &obj_info.etag { + if !is_etag_equal(etag, if_match) { + return Err(Error::PreconditionFailed); + } + } else { + return Err(Error::PreconditionFailed); + } + } + if has_valid_mod_time && pre.if_match.is_none() { + if let Some(if_unmodified_since) = &pre.if_unmodified_since { + if let Some(mod_time) = &obj_info.mod_time { + if is_modified_since(mod_time, if_unmodified_since) { + return Err(Error::PreconditionFailed); + } + } + } + } + } + + Ok(()) + } +} + +fn is_etag_equal(etag1: &str, etag2: &str) -> bool { + let e1 = etag1.trim_matches('"'); + let e2 = etag2.trim_matches('"'); + // Handle wildcard "*" - matches any ETag (per HTTP/1.1 RFC 7232) + if e2 == "*" { + return true; + } + e1 == e2 +} + +fn is_modified_since(mod_time: &OffsetDateTime, given_time: &OffsetDateTime) -> bool { + let mod_secs = mod_time.unix_timestamp(); + let given_secs = given_time.unix_timestamp(); + mod_secs > given_secs } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/crates/rio/src/hash_reader.rs b/crates/rio/src/hash_reader.rs index 15b4a49d..4d672e3c 100644 --- a/crates/rio/src/hash_reader.rs +++ b/crates/rio/src/hash_reader.rs @@ -499,17 +499,18 @@ impl 
AsyncRead for HashReader { let content_hash = hasher.finalize(); if content_hash != expected_content_hash.raw { + let expected_hex = hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower); + let actual_hex = hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower); error!( "Content hash mismatch, type={:?}, encoded={:?}, expected={:?}, actual={:?}", - expected_content_hash.checksum_type, - expected_content_hash.encoded, - hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower), - hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower) + expected_content_hash.checksum_type, expected_content_hash.encoded, expected_hex, actual_hex ); - return Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Content hash mismatch", - ))); + // Use ChecksumMismatch error so that API layer can return BadDigest + let checksum_err = crate::errors::ChecksumMismatch { + want: expected_hex, + got: actual_hex, + }; + return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, checksum_err))); } } diff --git a/crates/utils/src/io.rs b/crates/utils/src/io.rs index 42dc5ac2..94777b8d 100644 --- a/crates/utils/src/io.rs +++ b/crates/utils/src/io.rs @@ -41,6 +41,11 @@ pub async fn read_full(mut reader: R, mut bu if total == 0 { return Err(e); } + // If the error is InvalidData (e.g., checksum mismatch), preserve it + // instead of wrapping it as UnexpectedEof, so proper error handling can occur + if e.kind() == std::io::ErrorKind::InvalidData { + return Err(e); + } return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, format!("read {total} bytes, error: {e}"), diff --git a/rustfs/src/error.rs b/rustfs/src/error.rs index daeeacb3..5b4fe128 100644 --- a/rustfs/src/error.rs +++ b/rustfs/src/error.rs @@ -192,6 +192,21 @@ impl From for S3Error { impl From for ApiError { fn from(err: StorageError) -> Self { + // Special handling for Io errors that may contain 
ChecksumMismatch + if let StorageError::Io(ref io_err) = err { + if let Some(inner) = io_err.get_ref() { + if inner.downcast_ref::().is_some() + || inner.downcast_ref::().is_some() + { + return ApiError { + code: S3ErrorCode::BadDigest, + message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + } + } + let code = match &err { StorageError::NotImplemented => S3ErrorCode::NotImplemented, StorageError::InvalidArgument(_, _, _) => S3ErrorCode::InvalidArgument, @@ -239,6 +254,23 @@ impl From for ApiError { impl From for ApiError { fn from(err: std::io::Error) -> Self { + // Check if the error is a ChecksumMismatch (BadDigest) + if let Some(inner) = err.get_ref() { + if inner.downcast_ref::().is_some() { + return ApiError { + code: S3ErrorCode::BadDigest, + message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + if inner.downcast_ref::().is_some() { + return ApiError { + code: S3ErrorCode::BadDigest, + message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest), + source: Some(Box::new(err)), + }; + } + } ApiError { code: S3ErrorCode::InternalError, message: err.to_string(), diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 03e37c5a..8828fec2 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1418,12 +1418,17 @@ impl S3 for FS { ..Default::default() }; - let opts = ObjectOptions { - version_id: object.version_id.map(|v| v.to_string()), - versioned: version_cfg.prefix_enabled(&object.object_name), - version_suspended: version_cfg.suspended(), - ..Default::default() - }; + let metadata = extract_metadata(&req.headers); + + let opts: ObjectOptions = del_opts( + &bucket, + &object.object_name, + object.version_id.map(|f| f.to_string()), + &req.headers, + metadata, + ) + .await + .map_err(ApiError::from)?; let mut goi = ObjectInfo::default(); let mut gerr = None; @@ -1684,10 +1689,6 @@ impl S3 for FS { version_id, 
part_number, range, - if_none_match, - if_match, - if_modified_since, - if_unmodified_since, .. } = req.input.clone(); @@ -1880,35 +1881,6 @@ impl S3 for FS { let info = reader.object_info; - if let Some(match_etag) = if_none_match { - if info.etag.as_ref().is_some_and(|etag| etag == match_etag.as_str()) { - return Err(S3Error::new(S3ErrorCode::NotModified)); - } - } - - if let Some(modified_since) = if_modified_since { - // obj_time < givenTime + 1s - if info.mod_time.is_some_and(|mod_time| { - let give_time: OffsetDateTime = modified_since.into(); - mod_time < give_time.add(time::Duration::seconds(1)) - }) { - return Err(S3Error::new(S3ErrorCode::NotModified)); - } - } - - if let Some(match_etag) = if_match { - if info.etag.as_ref().is_some_and(|etag| etag != match_etag.as_str()) { - return Err(S3Error::new(S3ErrorCode::PreconditionFailed)); - } - } else if let Some(unmodified_since) = if_unmodified_since { - if info.mod_time.is_some_and(|mod_time| { - let give_time: OffsetDateTime = unmodified_since.into(); - mod_time > give_time.add(time::Duration::seconds(1)) - }) { - return Err(S3Error::new(S3ErrorCode::PreconditionFailed)); - } - } - debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot"); for part in &info.parts { debug!( @@ -4194,6 +4166,13 @@ impl S3 for FS { .. } = req.input.clone(); + if tagging.tag_set.len() > 10 { + // TODO: enforce this limit (this check is currently a no-op). Amazon S3 allows at most 10 tags per object. 
+ // Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html + // Reference: https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_PutObjectTagging.html + // https://github.com/minio/mint/blob/master/run/core/aws-sdk-go-v2/main.go#L1647 + } + let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; diff --git a/rustfs/src/storage/options.rs b/rustfs/src/storage/options.rs index f2075a0f..aacd6089 100644 --- a/rustfs/src/storage/options.rs +++ b/rustfs/src/storage/options.rs @@ -93,6 +93,8 @@ pub async fn del_opts( .map(|v| v.to_str().unwrap() == "true") .unwrap_or_default(); + fill_conditional_writes_opts_from_header(headers, &mut opts)?; + Ok(opts) } @@ -133,6 +135,8 @@ pub async fn get_opts( opts.version_suspended = version_suspended; opts.versioned = versioned; + fill_conditional_writes_opts_from_header(headers, &mut opts)?; + Ok(opts) }