From c2d782bed16c963f37a294d97f527ec70712325a Mon Sep 17 00:00:00 2001 From: zzhpro <56196563+zzhpro@users.noreply.github.com> Date: Tue, 26 Aug 2025 09:35:24 +0800 Subject: [PATCH] feat: support conditional writes (#409) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: support conditional writes * refactor: avoid using unwrap * fix: obtain lock before check in CompleteMultiPartUpload * refactor: do not obtain a lock when getting object meta * fix: avoid using unwrap and modifying incoming arguments * test: add e2e tests for conditional writes --------- Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com> Co-authored-by: 安正超 Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com> --- .../src/reliant/conditional_writes.rs | 347 ++++++++++++++++++ crates/e2e_test/src/reliant/mod.rs | 1 + crates/ecstore/src/error.rs | 6 + crates/ecstore/src/set_disk.rs | 157 +++++++- crates/ecstore/src/store_api.rs | 7 + rustfs/src/error.rs | 1 + rustfs/src/storage/ecfs.rs | 3 +- rustfs/src/storage/options.rs | 37 +- 8 files changed, 556 insertions(+), 3 deletions(-) create mode 100644 crates/e2e_test/src/reliant/conditional_writes.rs diff --git a/crates/e2e_test/src/reliant/conditional_writes.rs b/crates/e2e_test/src/reliant/conditional_writes.rs new file mode 100644 index 00000000..df870f06 --- /dev/null +++ b/crates/e2e_test/src/reliant/conditional_writes.rs @@ -0,0 +1,347 @@ +#![cfg(test)] + +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::Client; +use aws_sdk_s3::config::{Credentials, Region}; +use aws_sdk_s3::error::SdkError; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; +use bytes::Bytes; +use serial_test::serial; +use std::error::Error; + +const ENDPOINT: &str = "http://localhost:9000"; +const ACCESS_KEY: &str = "rustfsadmin"; +const SECRET_KEY: &str = "rustfsadmin"; +const BUCKET: &str = "api-test"; + +async fn create_aws_s3_client() -> Result> { + let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1")); + let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region_provider) + .credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static")) + .endpoint_url(ENDPOINT) + .load() + .await; + + let client = Client::from_conf( + aws_sdk_s3::Config::from(&shared_config) + .to_builder() + .force_path_style(true) + .build(), + ); + Ok(client) +} + +/// Setup test bucket, creating it if it doesn't exist +async fn setup_test_bucket(client: &Client) -> Result<(), Box> { + match client.create_bucket().bucket(BUCKET).send().await { + Ok(_) => {} + Err(SdkError::ServiceError(e)) => { + let e = e.into_err(); + let error_code = e.meta().code().unwrap_or(""); + if !error_code.eq("BucketAlreadyExists") { + return Err(e.into()); + } + } + Err(e) => { + return Err(e.into()); + } + } + Ok(()) +} + +/// Generate test data of specified size +fn generate_test_data(size: usize) -> Vec { + let pattern = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + let mut data = Vec::with_capacity(size); + for i in 0..size { + data.push(pattern[i % pattern.len()]); + } + data +} + +/// Upload an object and return its ETag +async fn upload_object_with_metadata(client: &Client, bucket: &str, key: &str, data: &[u8]) -> Result> { + let response = client + .put_object() + .bucket(bucket) + .key(key) + .body(Bytes::from(data.to_vec()).into()) + .send() + .await?; + + let etag = response.e_tag().unwrap_or("").to_string(); + Ok(etag) +} + +/// Cleanup test objects from bucket +async fn cleanup_objects(client: &Client, bucket: &str, keys: &[&str]) { + for key in keys { + let _ = client.delete_object().bucket(bucket).key(*key).send().await; + } +} + +/// Generate unique test object key +fn generate_test_key(prefix: &str) -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); + format!("{prefix}-{timestamp}") +} + +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_conditional_put_okay() -> Result<(), Box> { + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + let test_key = generate_test_key("conditional-put-ok"); + let initial_data = generate_test_data(1024); // 1KB test data + let updated_data = generate_test_data(2048); // 2KB updated data + + // Upload initial object and get its ETag + let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?; + + // Test 1: PUT with matching If-Match condition (should succeed) + let response1 = client + .put_object() + .bucket(BUCKET) + .key(&test_key) + .body(Bytes::from(updated_data.clone()).into()) + .if_match(&initial_etag) + .send() + .await; + assert!(response1.is_ok(), "PUT with matching If-Match should succeed"); + + // Test 2: PUT with non-matching If-None-Match condition (should succeed) + let fake_etag = "\"fake-etag-12345\""; + let response2 = client + .put_object() + .bucket(BUCKET) + .key(&test_key) + .body(Bytes::from(updated_data.clone()).into()) + .if_none_match(fake_etag) + .send() + .await; + assert!(response2.is_ok(), "PUT with non-matching If-None-Match should succeed"); + + // Cleanup + cleanup_objects(&client, BUCKET, &[&test_key]).await; + + Ok(()) +} + +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_conditional_put_failed() -> Result<(), Box> { + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + let test_key = generate_test_key("conditional-put-failed"); + let initial_data = generate_test_data(1024); + let updated_data = generate_test_data(2048); + + // Upload initial object and get its ETag + let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?; + + // Test 1: PUT with non-matching If-Match condition (should fail with 412) + let fake_etag = "\"fake-etag-should-not-match\""; + let response1 = client + .put_object() + .bucket(BUCKET) + .key(&test_key) + .body(Bytes::from(updated_data.clone()).into()) + .if_match(fake_etag) + .send() + .await; + + assert!(response1.is_err(), "PUT with non-matching If-Match should fail"); + if let Err(e) = response1 { + if let SdkError::ServiceError(e) = e { + let e = e.into_err(); + let error_code = e.meta().code().unwrap_or(""); + assert_eq!("PreconditionFailed", error_code); + } else { + panic!("Unexpected error: {e:?}"); + } + } + + // Test 2: PUT with matching If-None-Match condition (should fail with 412) + let response2 = client + .put_object() + .bucket(BUCKET) + .key(&test_key) + .body(Bytes::from(updated_data.clone()).into()) + .if_none_match(&initial_etag) + .send() + .await; + + assert!(response2.is_err(), "PUT with matching If-None-Match should fail"); + if let Err(e) = response2 { + if let SdkError::ServiceError(e) = e { + let e = e.into_err(); + let error_code = e.meta().code().unwrap_or(""); + assert_eq!("PreconditionFailed", error_code); + } else { + panic!("Unexpected error: {e:?}"); + } + } + + // Cleanup - only need to clean up the initial object since failed PUTs shouldn't create objects + cleanup_objects(&client, BUCKET, &[&test_key]).await; + + Ok(()) +} + +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_conditional_put_when_object_does_not_exist() -> Result<(), Box> { + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + let key = "some_key"; + cleanup_objects(&client, BUCKET, &[key]).await; + + // When the object does not exist, the If-Match condition should always fail + let response1 = client + .put_object() + .bucket(BUCKET) + .key(key) + .body(Bytes::from(generate_test_data(1024)).into()) + .if_match("*") + .send() + .await; + assert!(response1.is_err()); + if let Err(e) = response1 { + if let SdkError::ServiceError(e) = e { + let e = e.into_err(); + let error_code = e.meta().code().unwrap_or(""); + assert_eq!("NoSuchKey", error_code); + } else { + panic!("Unexpected error: {e:?}"); + } + } + + // When the object does not exist, the If-None-Match condition should be able to succeed + let response2 = client + .put_object() + .bucket(BUCKET) + .key(key) + .body(Bytes::from(generate_test_data(1024)).into()) + .if_none_match("*") + .send() + .await; + assert!(response2.is_ok()); + + cleanup_objects(&client, BUCKET, &[key]).await; + Ok(()) +} + +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_conditional_multi_part_upload() -> Result<(), Box> { + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + let test_key = generate_test_key("multipart-upload-ok"); + let test_data = generate_test_data(1024); + let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &test_data).await?; + + let part_size = 5 * 1024 * 1024; // 5MB per part (minimum for multipart) + let num_parts = 3; + let mut parts = Vec::new(); + + // Initiate multipart upload + let initiate_response = client.create_multipart_upload().bucket(BUCKET).key(&test_key).send().await?; + + let upload_id = initiate_response + .upload_id() + .ok_or(std::io::Error::other("No upload ID returned"))?; + + // Upload parts + for part_number in 1..=num_parts { + let part_data = generate_test_data(part_size); + + let upload_part_response = client + .upload_part() + .bucket(BUCKET) + .key(&test_key) + .upload_id(upload_id) + .part_number(part_number) + .body(Bytes::from(part_data).into()) + .send() + .await?; + + let part_etag = upload_part_response + .e_tag() + .ok_or(std::io::Error::other("Do not have etag"))? + .to_string(); + + let completed_part = CompletedPart::builder().part_number(part_number).e_tag(part_etag).build(); + + parts.push(completed_part); + } + + // Complete multipart upload + let completed_upload = CompletedMultipartUpload::builder().set_parts(Some(parts)).build(); + + // Test 1: Multipart upload with wildcard If-None-Match, should fail + let complete_response = client + .complete_multipart_upload() + .bucket(BUCKET) + .key(&test_key) + .upload_id(upload_id) + .multipart_upload(completed_upload.clone()) + .if_none_match("*") + .send() + .await; + + assert!(complete_response.is_err()); + + // Test 2: Multipart upload with matching If-None-Match, should fail + let complete_response = client + .complete_multipart_upload() + .bucket(BUCKET) + .key(&test_key) + .upload_id(upload_id) + .multipart_upload(completed_upload.clone()) + .if_none_match(initial_etag.clone()) + .send() + .await; + + assert!(complete_response.is_err()); + + // Test 3: Multipart upload with unmatching If-Match, should fail + let complete_response = client + .complete_multipart_upload() + .bucket(BUCKET) + .key(&test_key) + .upload_id(upload_id) + .multipart_upload(completed_upload.clone()) + .if_match("\"abcdef\"") + .send() + .await; + + assert!(complete_response.is_err()); + + // Test 4: Multipart upload with matching If-Match, should succeed + let complete_response = client + .complete_multipart_upload() + .bucket(BUCKET) + .key(&test_key) + .upload_id(upload_id) + .multipart_upload(completed_upload.clone()) + .if_match(initial_etag) + .send() + .await; + + assert!(complete_response.is_ok()); + + // Cleanup + cleanup_objects(&client, BUCKET, &[&test_key]).await; + + Ok(()) +} diff --git a/crates/e2e_test/src/reliant/mod.rs b/crates/e2e_test/src/reliant/mod.rs index b8d05efd..017ecc88 100644 --- a/crates/e2e_test/src/reliant/mod.rs +++ b/crates/e2e_test/src/reliant/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod conditional_writes; mod lifecycle; mod lock; mod node_interact_test; diff --git a/crates/ecstore/src/error.rs b/crates/ecstore/src/error.rs index 43a52476..a6fd0547 100644 --- a/crates/ecstore/src/error.rs +++ b/crates/ecstore/src/error.rs @@ -187,6 +187,9 @@ pub enum StorageError { #[error("Lock error: {0}")] Lock(#[from] rustfs_lock::LockError), + + #[error("Precondition failed")] + PreconditionFailed, } impl StorageError { @@ -416,6 +419,7 @@ impl Clone for StorageError { StorageError::Lock(e) => StorageError::Lock(e.clone()), StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()), StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()), + StorageError::PreconditionFailed => StorageError::PreconditionFailed, } } } @@ -481,6 +485,7 @@ impl StorageError { StorageError::Lock(_) => 0x38, StorageError::InsufficientReadQuorum(_, _) => 0x39, StorageError::InsufficientWriteQuorum(_, _) => 0x3A, + StorageError::PreconditionFailed => 0x3B, } } @@ -548,6 +553,7 @@ impl StorageError { 0x38 => Some(StorageError::Lock(rustfs_lock::LockError::internal("Generic lock error".to_string()))), 0x39 => Some(StorageError::InsufficientReadQuorum(Default::default(), Default::default())), 0x3A => Some(StorageError::InsufficientWriteQuorum(Default::default(), Default::default())), + 0x3B => Some(StorageError::PreconditionFailed), _ => None, } } diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 58dc5ae0..f037db7a 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -61,6 +61,7 @@ use glob::Pattern; use http::HeaderMap; use md5::{Digest as Md5Digest, Md5}; use rand::{Rng, seq::SliceRandom}; +use regex::Regex; use rustfs_common::heal_channel::{DriveState, HealChannelPriority, HealItemType, HealOpts, HealScanMode, send_heal_disk}; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::{ @@ -3218,6 +3219,44 @@ impl SetDisks { obj?; Ok(()) } + + async fn check_write_precondition(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Option { + let mut opts = opts.clone(); + + let http_preconditions = opts.http_preconditions?; + opts.http_preconditions = None; + + // Never claim a lock here, to avoid deadlock + // - If no_lock is false, we must have obtained the lock out side of this function + // - If no_lock is true, we should not obtain locks + opts.no_lock = true; + let oi = self.get_object_info(bucket, object, &opts).await; + + match oi { + Ok(oi) => { + if should_prevent_write(&oi, http_preconditions.if_none_match, http_preconditions.if_match) { + return Some(StorageError::PreconditionFailed); + } + } + + Err(StorageError::VersionNotFound(_, _, _)) + | Err(StorageError::ObjectNotFound(_, _)) + | Err(StorageError::ErasureReadQuorum) => { + // When the object is not found, + // - if If-Match is set, we should return 404 NotFound + // - if If-None-Match is set, we should be able to proceed with the request + if http_preconditions.if_match.is_some() { + return Some(StorageError::ObjectNotFound(bucket.to_string(), object.to_string())); + } + } + + Err(e) => { + return Some(e); + } + } + + None + } } #[async_trait::async_trait] @@ -3335,6 +3374,12 @@ impl ObjectIO for SetDisks { _object_lock_guard = guard_opt; } + if let Some(http_preconditions) = opts.http_preconditions.clone() { + if let Some(err) = self.check_write_precondition(bucket, object, opts).await { + return Err(err); + } + } + let mut user_defined = opts.user_defined.clone(); let sc_parity_drives = { @@ -5123,6 +5168,26 @@ impl StorageAPI for SetDisks { let disks = disks.clone(); // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution); + // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop. + let mut _object_lock_guard: Option = None; + if let Some(http_preconditions) = opts.http_preconditions.clone() { + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _object_lock_guard = guard_opt; + } + + if let Some(err) = self.check_write_precondition(bucket, object, opts).await { + return Err(err); + } + } + let part_path = format!("{}/{}/", upload_id_path, fi.data_dir.unwrap_or(Uuid::nil())); let part_meta_paths = uploaded_parts @@ -5942,13 +6007,45 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String { format!("{:x}-{}", hasher.finalize(), parts.len()) } +pub fn canonicalize_etag(etag: &str) -> String { + let re = Regex::new("\"*?([^\"]*?)\"*?$").unwrap(); + re.replace_all(etag, "$1").to_string() +} + +pub fn e_tag_matches(etag: &str, condition: &str) -> bool { + if condition.trim() == "*" { + return true; + } + canonicalize_etag(etag) == canonicalize_etag(condition) +} + +pub fn should_prevent_write(oi: &ObjectInfo, if_none_match: Option, if_match: Option) -> bool { + match &oi.etag { + Some(etag) => { + if let Some(if_none_match) = if_none_match { + if e_tag_matches(etag, &if_none_match) { + return true; + } + } + if let Some(if_match) = if_match { + if !e_tag_matches(etag, &if_match) { + return true; + } + } + false + } + // If we can't obtain the etag of the object, perevent the write only when we have at least one condition + None => if_none_match.is_some() || if_match.is_some(), + } +} + #[cfg(test)] mod tests { use super::*; use crate::disk::CHECK_PART_UNKNOWN; use crate::disk::CHECK_PART_VOLUME_NOT_FOUND; use crate::disk::error::DiskError; - use crate::store_api::CompletePart; + use crate::store_api::{CompletePart, ObjectInfo}; use rustfs_filemeta::ErasureInfo; use std::collections::HashMap; use time::OffsetDateTime; @@ -6373,4 +6470,62 @@ mod tests { assert_eq!(result2.len(), 3); assert!(result2.iter().all(|d| d.is_none())); } + + #[test] + fn test_etag_matches() { + assert!(e_tag_matches("abc", "abc")); + assert!(e_tag_matches("\"abc\"", "abc")); + assert!(e_tag_matches("\"abc\"", "*")); + } + + #[test] + fn test_should_prevent_write() { + let oi = ObjectInfo { + etag: Some("abc".to_string()), + ..Default::default() + }; + let if_none_match = Some("abc".to_string()); + let if_match = None; + assert!(should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = Some("*".to_string()); + let if_match = None; + assert!(should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = None; + let if_match = Some("def".to_string()); + assert!(should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = None; + let if_match = Some("*".to_string()); + assert!(!should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = Some("def".to_string()); + let if_match = None; + assert!(!should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = Some("def".to_string()); + let if_match = Some("*".to_string()); + assert!(!should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = Some("def".to_string()); + let if_match = Some("\"abc\"".to_string()); + assert!(!should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = Some("*".to_string()); + let if_match = Some("\"abc\"".to_string()); + assert!(should_prevent_write(&oi, if_none_match, if_match)); + + let oi = ObjectInfo { + etag: None, + ..Default::default() + }; + let if_none_match = Some("*".to_string()); + let if_match = Some("\"abc\"".to_string()); + assert!(should_prevent_write(&oi, if_none_match, if_match)); + + let if_none_match = None; + let if_match = None; + assert!(!should_prevent_write(&oi, if_none_match, if_match)); + } } diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index ab299f3d..bfc51d59 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -303,6 +303,12 @@ impl HTTPRangeSpec { } } +#[derive(Debug, Default, Clone)] +pub struct HTTPPreconditions { + pub if_match: Option, + pub if_none_match: Option, +} + #[derive(Debug, Default, Clone)] pub struct ObjectOptions { // Use the maximum parity (N/2), used when saving server configuration files @@ -326,6 +332,7 @@ pub struct ObjectOptions { pub user_defined: HashMap, pub preserve_etag: Option, pub metadata_chg: bool, + pub http_preconditions: Option, pub replication_request: bool, pub delete_marker: bool, diff --git a/rustfs/src/error.rs b/rustfs/src/error.rs index 10e3940d..337903d5 100644 --- a/rustfs/src/error.rs +++ b/rustfs/src/error.rs @@ -81,6 +81,7 @@ impl From for ApiError { StorageError::DataMovementOverwriteErr(_, _, _) => S3ErrorCode::InvalidArgument, StorageError::ObjectExistsAsDirectory(_, _) => S3ErrorCode::InvalidArgument, StorageError::InvalidPart(_, _, _) => S3ErrorCode::InvalidPart, + StorageError::PreconditionFailed => S3ErrorCode::PreconditionFailed, _ => S3ErrorCode::InternalError, }; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 98fa3a63..15779cb6 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -21,6 +21,7 @@ use crate::error::ApiError; use crate::storage::access::ReqInfo; use crate::storage::options::copy_dst_opts; use crate::storage::options::copy_src_opts; +use crate::storage::options::get_complete_multipart_upload_opts; use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range}; use bytes::Bytes; use chrono::DateTime; @@ -1982,7 +1983,7 @@ impl S3 for FS { let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) }; - let opts = &ObjectOptions::default(); + let opts = &get_complete_multipart_upload_opts(&req.headers).map_err(ApiError::from)?; let mut uploaded_parts = Vec::new(); diff --git a/rustfs/src/storage/options.rs b/rustfs/src/storage/options.rs index a1955162..e917facd 100644 --- a/rustfs/src/storage/options.rs +++ b/rustfs/src/storage/options.rs @@ -16,7 +16,8 @@ use http::{HeaderMap, HeaderValue}; use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::error::Result; use rustfs_ecstore::error::StorageError; -use rustfs_ecstore::store_api::{HTTPRangeSpec, ObjectOptions}; + +use rustfs_ecstore::store_api::{HTTPPreconditions, HTTPRangeSpec, ObjectOptions}; use rustfs_utils::path::is_dir_object; use s3s::{S3Result, s3_error}; use std::collections::HashMap; @@ -106,6 +107,32 @@ pub async fn get_opts( Ok(opts) } +fn fill_conditional_writes_opts_from_header(headers: &HeaderMap, opts: &mut ObjectOptions) -> Result<()> { + if headers.contains_key("If-None-Match") || headers.contains_key("If-Match") { + let mut preconditions = HTTPPreconditions::default(); + if let Some(if_none_match) = headers.get("If-None-Match") { + preconditions.if_none_match = Some( + if_none_match + .to_str() + .map_err(|_| StorageError::other("Invalid If-None-Match header"))? + .to_string(), + ); + } + if let Some(if_match) = headers.get("If-Match") { + preconditions.if_match = Some( + if_match + .to_str() + .map_err(|_| StorageError::other("Invalid If-Match header"))? + .to_string(), + ); + } + + opts.http_preconditions = Some(preconditions); + } + + Ok(()) +} + /// Creates options for putting an object in a bucket. pub async fn put_opts( bucket: &str, @@ -142,6 +169,14 @@ pub async fn put_opts( opts.version_suspended = version_suspended; opts.versioned = versioned; + fill_conditional_writes_opts_from_header(headers, &mut opts)?; + + Ok(opts) +} + +pub fn get_complete_multipart_upload_opts(headers: &HeaderMap) -> Result { + let mut opts = ObjectOptions::default(); + fill_conditional_writes_opts_from_header(headers, &mut opts)?; Ok(opts) }