mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-16 17:20:33 +00:00
feat: enhance error handling and add precondition checks for object o… (#1008)
This commit is contained in:
@@ -149,6 +149,12 @@ impl Erasure {
|
||||
break;
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
// Check if the inner error is a checksum mismatch - if so, propagate it
|
||||
if let Some(inner) = e.get_ref() {
|
||||
if rustfs_rio::is_checksum_mismatch(inner) {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -194,6 +194,12 @@ pub enum StorageError {
|
||||
#[error("Precondition failed")]
|
||||
PreconditionFailed,
|
||||
|
||||
#[error("Not modified")]
|
||||
NotModified,
|
||||
|
||||
#[error("Invalid part number: {0}")]
|
||||
InvalidPartNumber(usize),
|
||||
|
||||
#[error("Invalid range specified: {0}")]
|
||||
InvalidRangeSpec(String),
|
||||
}
|
||||
@@ -427,6 +433,8 @@ impl Clone for StorageError {
|
||||
StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()),
|
||||
StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()),
|
||||
StorageError::PreconditionFailed => StorageError::PreconditionFailed,
|
||||
StorageError::NotModified => StorageError::NotModified,
|
||||
StorageError::InvalidPartNumber(a) => StorageError::InvalidPartNumber(*a),
|
||||
StorageError::InvalidRangeSpec(a) => StorageError::InvalidRangeSpec(a.clone()),
|
||||
}
|
||||
}
|
||||
@@ -496,6 +504,8 @@ impl StorageError {
|
||||
StorageError::PreconditionFailed => 0x3B,
|
||||
StorageError::EntityTooSmall(_, _, _) => 0x3C,
|
||||
StorageError::InvalidRangeSpec(_) => 0x3D,
|
||||
StorageError::NotModified => 0x3E,
|
||||
StorageError::InvalidPartNumber(_) => 0x3F,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -566,6 +576,8 @@ impl StorageError {
|
||||
0x3B => Some(StorageError::PreconditionFailed),
|
||||
0x3C => Some(StorageError::EntityTooSmall(Default::default(), Default::default(), Default::default())),
|
||||
0x3D => Some(StorageError::InvalidRangeSpec(Default::default())),
|
||||
0x3E => Some(StorageError::NotModified),
|
||||
0x3F => Some(StorageError::InvalidPartNumber(Default::default())),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -679,6 +691,10 @@ pub fn is_err_data_movement_overwrite(err: &Error) -> bool {
|
||||
matches!(err, &StorageError::DataMovementOverwriteErr(_, _, _))
|
||||
}
|
||||
|
||||
pub fn is_err_io(err: &Error) -> bool {
|
||||
matches!(err, &StorageError::Io(_))
|
||||
}
|
||||
|
||||
pub fn is_all_not_found(errs: &[Option<Error>]) -> bool {
|
||||
for err in errs.iter() {
|
||||
if let Some(err) = err {
|
||||
|
||||
@@ -767,6 +767,12 @@ impl ECStore {
|
||||
|
||||
def_pool = pinfo.clone();
|
||||
has_def_pool = true;
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-deletes.html
|
||||
if is_err_object_not_found(err) {
|
||||
if let Err(err) = opts.precondition_check(&pinfo.object_info) {
|
||||
return Err(err.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !is_err_object_not_found(err) && !is_err_version_not_found(err) {
|
||||
return Err(err.clone());
|
||||
@@ -1392,6 +1398,7 @@ impl StorageAPI for ECStore {
|
||||
|
||||
let (info, _) = self.get_latest_object_info_with_idx(bucket, object.as_str(), opts).await?;
|
||||
|
||||
opts.precondition_check(&info)?;
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
|
||||
@@ -356,6 +356,8 @@ impl HTTPRangeSpec {
|
||||
pub struct HTTPPreconditions {
|
||||
pub if_match: Option<String>,
|
||||
pub if_none_match: Option<String>,
|
||||
pub if_modified_since: Option<OffsetDateTime>,
|
||||
pub if_unmodified_since: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
@@ -456,6 +458,76 @@ impl ObjectOptions {
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn precondition_check(&self, obj_info: &ObjectInfo) -> Result<()> {
|
||||
let has_valid_mod_time = obj_info.mod_time.is_some_and(|t| t != OffsetDateTime::UNIX_EPOCH);
|
||||
|
||||
if let Some(part_number) = self.part_number {
|
||||
if part_number > 1 && !obj_info.parts.is_empty() {
|
||||
let part_found = obj_info.parts.iter().any(|pi| pi.number == part_number);
|
||||
if !part_found {
|
||||
return Err(Error::InvalidPartNumber(part_number));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(pre) = &self.http_preconditions {
|
||||
if let Some(if_none_match) = &pre.if_none_match {
|
||||
if let Some(etag) = &obj_info.etag {
|
||||
if is_etag_equal(etag, if_none_match) {
|
||||
return Err(Error::NotModified);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if has_valid_mod_time {
|
||||
if let Some(if_modified_since) = &pre.if_modified_since {
|
||||
if let Some(mod_time) = &obj_info.mod_time {
|
||||
if !is_modified_since(mod_time, if_modified_since) {
|
||||
return Err(Error::NotModified);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(if_match) = &pre.if_match {
|
||||
if let Some(etag) = &obj_info.etag {
|
||||
if !is_etag_equal(etag, if_match) {
|
||||
return Err(Error::PreconditionFailed);
|
||||
}
|
||||
} else {
|
||||
return Err(Error::PreconditionFailed);
|
||||
}
|
||||
}
|
||||
if has_valid_mod_time && pre.if_match.is_none() {
|
||||
if let Some(if_unmodified_since) = &pre.if_unmodified_since {
|
||||
if let Some(mod_time) = &obj_info.mod_time {
|
||||
if is_modified_since(mod_time, if_unmodified_since) {
|
||||
return Err(Error::PreconditionFailed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn is_etag_equal(etag1: &str, etag2: &str) -> bool {
|
||||
let e1 = etag1.trim_matches('"');
|
||||
let e2 = etag2.trim_matches('"');
|
||||
// Handle wildcard "*" - matches any ETag (per HTTP/1.1 RFC 7232)
|
||||
if e2 == "*" {
|
||||
return true;
|
||||
}
|
||||
e1 == e2
|
||||
}
|
||||
|
||||
fn is_modified_since(mod_time: &OffsetDateTime, given_time: &OffsetDateTime) -> bool {
|
||||
let mod_secs = mod_time.unix_timestamp();
|
||||
let given_secs = given_time.unix_timestamp();
|
||||
mod_secs > given_secs
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
|
||||
@@ -499,17 +499,18 @@ impl AsyncRead for HashReader {
|
||||
let content_hash = hasher.finalize();
|
||||
|
||||
if content_hash != expected_content_hash.raw {
|
||||
let expected_hex = hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower);
|
||||
let actual_hex = hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower);
|
||||
error!(
|
||||
"Content hash mismatch, type={:?}, encoded={:?}, expected={:?}, actual={:?}",
|
||||
expected_content_hash.checksum_type,
|
||||
expected_content_hash.encoded,
|
||||
hex_simd::encode_to_string(&expected_content_hash.raw, hex_simd::AsciiCase::Lower),
|
||||
hex_simd::encode_to_string(content_hash, hex_simd::AsciiCase::Lower)
|
||||
expected_content_hash.checksum_type, expected_content_hash.encoded, expected_hex, actual_hex
|
||||
);
|
||||
return Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"Content hash mismatch",
|
||||
)));
|
||||
// Use ChecksumMismatch error so that API layer can return BadDigest
|
||||
let checksum_err = crate::errors::ChecksumMismatch {
|
||||
want: expected_hex,
|
||||
got: actual_hex,
|
||||
};
|
||||
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, checksum_err)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,11 @@ pub async fn read_full<R: AsyncRead + Send + Sync + Unpin>(mut reader: R, mut bu
|
||||
if total == 0 {
|
||||
return Err(e);
|
||||
}
|
||||
// If the error is InvalidData (e.g., checksum mismatch), preserve it
|
||||
// instead of wrapping it as UnexpectedEof, so proper error handling can occur
|
||||
if e.kind() == std::io::ErrorKind::InvalidData {
|
||||
return Err(e);
|
||||
}
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
format!("read {total} bytes, error: {e}"),
|
||||
|
||||
@@ -192,6 +192,21 @@ impl From<ApiError> for S3Error {
|
||||
|
||||
impl From<StorageError> for ApiError {
|
||||
fn from(err: StorageError) -> Self {
|
||||
// Special handling for Io errors that may contain ChecksumMismatch
|
||||
if let StorageError::Io(ref io_err) = err {
|
||||
if let Some(inner) = io_err.get_ref() {
|
||||
if inner.downcast_ref::<rustfs_rio::ChecksumMismatch>().is_some()
|
||||
|| inner.downcast_ref::<rustfs_rio::BadDigest>().is_some()
|
||||
{
|
||||
return ApiError {
|
||||
code: S3ErrorCode::BadDigest,
|
||||
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
|
||||
source: Some(Box::new(err)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let code = match &err {
|
||||
StorageError::NotImplemented => S3ErrorCode::NotImplemented,
|
||||
StorageError::InvalidArgument(_, _, _) => S3ErrorCode::InvalidArgument,
|
||||
@@ -239,6 +254,23 @@ impl From<StorageError> for ApiError {
|
||||
|
||||
impl From<std::io::Error> for ApiError {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
// Check if the error is a ChecksumMismatch (BadDigest)
|
||||
if let Some(inner) = err.get_ref() {
|
||||
if inner.downcast_ref::<rustfs_rio::ChecksumMismatch>().is_some() {
|
||||
return ApiError {
|
||||
code: S3ErrorCode::BadDigest,
|
||||
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
|
||||
source: Some(Box::new(err)),
|
||||
};
|
||||
}
|
||||
if inner.downcast_ref::<rustfs_rio::BadDigest>().is_some() {
|
||||
return ApiError {
|
||||
code: S3ErrorCode::BadDigest,
|
||||
message: ApiError::error_code_to_message(&S3ErrorCode::BadDigest),
|
||||
source: Some(Box::new(err)),
|
||||
};
|
||||
}
|
||||
}
|
||||
ApiError {
|
||||
code: S3ErrorCode::InternalError,
|
||||
message: err.to_string(),
|
||||
|
||||
@@ -1418,12 +1418,17 @@ impl S3 for FS {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let opts = ObjectOptions {
|
||||
version_id: object.version_id.map(|v| v.to_string()),
|
||||
versioned: version_cfg.prefix_enabled(&object.object_name),
|
||||
version_suspended: version_cfg.suspended(),
|
||||
..Default::default()
|
||||
};
|
||||
let metadata = extract_metadata(&req.headers);
|
||||
|
||||
let opts: ObjectOptions = del_opts(
|
||||
&bucket,
|
||||
&object.object_name,
|
||||
object.version_id.map(|f| f.to_string()),
|
||||
&req.headers,
|
||||
metadata,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let mut goi = ObjectInfo::default();
|
||||
let mut gerr = None;
|
||||
@@ -1684,10 +1689,6 @@ impl S3 for FS {
|
||||
version_id,
|
||||
part_number,
|
||||
range,
|
||||
if_none_match,
|
||||
if_match,
|
||||
if_modified_since,
|
||||
if_unmodified_since,
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
@@ -1880,35 +1881,6 @@ impl S3 for FS {
|
||||
|
||||
let info = reader.object_info;
|
||||
|
||||
if let Some(match_etag) = if_none_match {
|
||||
if info.etag.as_ref().is_some_and(|etag| etag == match_etag.as_str()) {
|
||||
return Err(S3Error::new(S3ErrorCode::NotModified));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(modified_since) = if_modified_since {
|
||||
// obj_time < givenTime + 1s
|
||||
if info.mod_time.is_some_and(|mod_time| {
|
||||
let give_time: OffsetDateTime = modified_since.into();
|
||||
mod_time < give_time.add(time::Duration::seconds(1))
|
||||
}) {
|
||||
return Err(S3Error::new(S3ErrorCode::NotModified));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(match_etag) = if_match {
|
||||
if info.etag.as_ref().is_some_and(|etag| etag != match_etag.as_str()) {
|
||||
return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
|
||||
}
|
||||
} else if let Some(unmodified_since) = if_unmodified_since {
|
||||
if info.mod_time.is_some_and(|mod_time| {
|
||||
let give_time: OffsetDateTime = unmodified_since.into();
|
||||
mod_time > give_time.add(time::Duration::seconds(1))
|
||||
}) {
|
||||
return Err(S3Error::new(S3ErrorCode::PreconditionFailed));
|
||||
}
|
||||
}
|
||||
|
||||
debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot");
|
||||
for part in &info.parts {
|
||||
debug!(
|
||||
@@ -4194,6 +4166,13 @@ impl S3 for FS {
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
if tagging.tag_set.len() > 10 {
|
||||
// TOTO: Note that Amazon S3 limits the maximum number of tags to 10 tags per object.
|
||||
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
|
||||
// Reference: https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_PutObjectTagging.html
|
||||
// https://github.com/minio/mint/blob/master/run/core/aws-sdk-go-v2/main.go#L1647
|
||||
}
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
@@ -93,6 +93,8 @@ pub async fn del_opts(
|
||||
.map(|v| v.to_str().unwrap() == "true")
|
||||
.unwrap_or_default();
|
||||
|
||||
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
@@ -133,6 +135,8 @@ pub async fn get_opts(
|
||||
opts.version_suspended = version_suspended;
|
||||
opts.versioned = versioned;
|
||||
|
||||
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user