feat: support conditional writes (#409)

* feat: support conditional writes

* refactor: avoid using unwrap

* fix: obtain lock before check in CompleteMultiPartUpload

* refactor: do not obtain a lock when getting object meta

* fix: avoid using unwrap and modifying incoming arguments

* test: add e2e tests for conditional writes

---------

Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
Author: zzhpro
Date: 2025-08-26 09:35:24 +08:00
Committed by: GitHub
Parent: e00f5be746
Commit: c2d782bed1
8 changed files with 556 additions and 3 deletions
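
For context, the e2e tests below drive the feature through aws-sdk-s3. A minimal client-side sketch of the create-if-absent idiom this commit enables (hypothetical helper, not part of the diff; assumes a client configured as in create_aws_s3_client below):

// Create-if-absent: the PUT succeeds only when no object currently exists at `key`.
async fn put_if_absent(client: &aws_sdk_s3::Client, bucket: &str, key: &str, data: Vec<u8>) -> bool {
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(bytes::Bytes::from(data).into())
        .if_none_match("*") // server rejects with 412 PreconditionFailed if the key already exists
        .send()
        .await
        .is_ok() // any error (including the 412 when another writer won the race) yields false
}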

View File

@@ -0,0 +1,347 @@
#![cfg(test)]
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use bytes::Bytes;
use serial_test::serial;
use std::error::Error;
const ENDPOINT: &str = "http://localhost:9000";
const ACCESS_KEY: &str = "rustfsadmin";
const SECRET_KEY: &str = "rustfsadmin";
const BUCKET: &str = "api-test";
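/// Build an S3 client against the local RustFS endpoint, using static credentials
/// and path-style addressing.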
async fn create_aws_s3_client() -> Result<Client, Box<dyn Error>> {
let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1"));
let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region_provider)
.credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static"))
.endpoint_url(ENDPOINT)
.load()
.await;
let client = Client::from_conf(
aws_sdk_s3::Config::from(&shared_config)
.to_builder()
.force_path_style(true)
.build(),
);
Ok(client)
}
/// Set up the test bucket, creating it if it doesn't exist
async fn setup_test_bucket(client: &Client) -> Result<(), Box<dyn Error>> {
match client.create_bucket().bucket(BUCKET).send().await {
Ok(_) => {}
Err(SdkError::ServiceError(e)) => {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
if error_code != "BucketAlreadyExists" {
return Err(e.into());
}
}
Err(e) => {
return Err(e.into());
}
}
Ok(())
}
/// Generate test data of specified size
fn generate_test_data(size: usize) -> Vec<u8> {
let pattern = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
let mut data = Vec::with_capacity(size);
for i in 0..size {
data.push(pattern[i % pattern.len()]);
}
data
}
/// Upload an object and return its ETag
async fn upload_object_with_metadata(client: &Client, bucket: &str, key: &str, data: &[u8]) -> Result<String, Box<dyn Error>> {
let response = client
.put_object()
.bucket(bucket)
.key(key)
.body(Bytes::from(data.to_vec()).into())
.send()
.await?;
let etag = response.e_tag().unwrap_or("").to_string();
Ok(etag)
}
/// Cleanup test objects from bucket
async fn cleanup_objects(client: &Client, bucket: &str, keys: &[&str]) {
for key in keys {
let _ = client.delete_object().bucket(bucket).key(*key).send().await;
}
}
/// Generate unique test object key
fn generate_test_key(prefix: &str) -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
format!("{prefix}-{timestamp}")
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_okay() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("conditional-put-ok");
let initial_data = generate_test_data(1024); // 1KB test data
let updated_data = generate_test_data(2048); // 2KB updated data
// Upload initial object and get its ETag
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?;
// Test 1: PUT with matching If-Match condition (should succeed)
let response1 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_match(&initial_etag)
.send()
.await;
assert!(response1.is_ok(), "PUT with matching If-Match should succeed");
// Test 2: PUT with non-matching If-None-Match condition (should succeed)
let fake_etag = "\"fake-etag-12345\"";
let response2 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_none_match(fake_etag)
.send()
.await;
assert!(response2.is_ok(), "PUT with non-matching If-None-Match should succeed");
// Cleanup
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_failed() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("conditional-put-failed");
let initial_data = generate_test_data(1024);
let updated_data = generate_test_data(2048);
// Upload initial object and get its ETag
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &initial_data).await?;
// Test 1: PUT with non-matching If-Match condition (should fail with 412)
let fake_etag = "\"fake-etag-should-not-match\"";
let response1 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_match(fake_etag)
.send()
.await;
assert!(response1.is_err(), "PUT with non-matching If-Match should fail");
if let Err(e) = response1 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("PreconditionFailed", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// Test 2: PUT with matching If-None-Match condition (should fail with 412)
let response2 = client
.put_object()
.bucket(BUCKET)
.key(&test_key)
.body(Bytes::from(updated_data.clone()).into())
.if_none_match(&initial_etag)
.send()
.await;
assert!(response2.is_err(), "PUT with matching If-None-Match should fail");
if let Err(e) = response2 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("PreconditionFailed", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// Cleanup: only the initial object needs removal, since failed PUTs shouldn't create objects
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_put_when_object_does_not_exist() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let key = "some_key";
cleanup_objects(&client, BUCKET, &[key]).await;
// When the object does not exist, the If-Match condition should always fail
let response1 = client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from(generate_test_data(1024)).into())
.if_match("*")
.send()
.await;
assert!(response1.is_err());
if let Err(e) = response1 {
if let SdkError::ServiceError(e) = e {
let e = e.into_err();
let error_code = e.meta().code().unwrap_or("");
assert_eq!("NoSuchKey", error_code);
} else {
panic!("Unexpected error: {e:?}");
}
}
// When the object does not exist, the If-None-Match condition should be able to succeed
let response2 = client
.put_object()
.bucket(BUCKET)
.key(key)
.body(Bytes::from(generate_test_data(1024)).into())
.if_none_match("*")
.send()
.await;
assert!(response2.is_ok());
cleanup_objects(&client, BUCKET, &[key]).await;
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_conditional_multi_part_upload() -> Result<(), Box<dyn std::error::Error>> {
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
let test_key = generate_test_key("multipart-upload-ok");
let test_data = generate_test_data(1024);
let initial_etag = upload_object_with_metadata(&client, BUCKET, &test_key, &test_data).await?;
let part_size = 5 * 1024 * 1024; // 5MB per part (minimum for multipart)
let num_parts = 3;
let mut parts = Vec::new();
// Initiate multipart upload
let initiate_response = client.create_multipart_upload().bucket(BUCKET).key(&test_key).send().await?;
let upload_id = initiate_response
.upload_id()
.ok_or(std::io::Error::other("No upload ID returned"))?;
// Upload parts
for part_number in 1..=num_parts {
let part_data = generate_test_data(part_size);
let upload_part_response = client
.upload_part()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.part_number(part_number)
.body(Bytes::from(part_data).into())
.send()
.await?;
let part_etag = upload_part_response
.e_tag()
.ok_or(std::io::Error::other("Do not have etag"))?
.to_string();
let completed_part = CompletedPart::builder().part_number(part_number).e_tag(part_etag).build();
parts.push(completed_part);
}
// Complete multipart upload
let completed_upload = CompletedMultipartUpload::builder().set_parts(Some(parts)).build();
// Test 1: Multipart upload with wildcard If-None-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_none_match("*")
.send()
.await;
assert!(complete_response.is_err());
// Test 2: Multipart upload with matching If-None-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_none_match(initial_etag.clone())
.send()
.await;
assert!(complete_response.is_err());
// Test 3: Multipart upload with non-matching If-Match, should fail
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_match("\"abcdef\"")
.send()
.await;
assert!(complete_response.is_err());
// Test 4: Multipart upload with matching If-Match, should succeed
let complete_response = client
.complete_multipart_upload()
.bucket(BUCKET)
.key(&test_key)
.upload_id(upload_id)
.multipart_upload(completed_upload.clone())
.if_match(initial_etag)
.send()
.await;
assert!(complete_response.is_ok());
// Cleanup
cleanup_objects(&client, BUCKET, &[&test_key]).await;
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod conditional_writes;
mod lifecycle;
mod lock;
mod node_interact_test;

View File

@@ -187,6 +187,9 @@ pub enum StorageError {
#[error("Lock error: {0}")]
Lock(#[from] rustfs_lock::LockError),
#[error("Precondition failed")]
PreconditionFailed,
}
impl StorageError {
@@ -416,6 +419,7 @@ impl Clone for StorageError {
StorageError::Lock(e) => StorageError::Lock(e.clone()),
StorageError::InsufficientReadQuorum(a, b) => StorageError::InsufficientReadQuorum(a.clone(), b.clone()),
StorageError::InsufficientWriteQuorum(a, b) => StorageError::InsufficientWriteQuorum(a.clone(), b.clone()),
StorageError::PreconditionFailed => StorageError::PreconditionFailed,
}
}
}
@@ -481,6 +485,7 @@ impl StorageError {
StorageError::Lock(_) => 0x38,
StorageError::InsufficientReadQuorum(_, _) => 0x39,
StorageError::InsufficientWriteQuorum(_, _) => 0x3A,
StorageError::PreconditionFailed => 0x3B,
}
}
@@ -548,6 +553,7 @@ impl StorageError {
0x38 => Some(StorageError::Lock(rustfs_lock::LockError::internal("Generic lock error".to_string()))),
0x39 => Some(StorageError::InsufficientReadQuorum(Default::default(), Default::default())),
0x3A => Some(StorageError::InsufficientWriteQuorum(Default::default(), Default::default())),
0x3B => Some(StorageError::PreconditionFailed),
_ => None,
}
}

View File

@@ -61,6 +61,7 @@ use glob::Pattern;
use http::HeaderMap;
use md5::{Digest as Md5Digest, Md5};
use rand::{Rng, seq::SliceRandom};
use regex::Regex;
use rustfs_common::heal_channel::{DriveState, HealChannelPriority, HealItemType, HealOpts, HealScanMode, send_heal_disk};
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{
@@ -3218,6 +3219,44 @@ impl SetDisks {
obj?;
Ok(())
}
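/// Evaluate the If-Match / If-None-Match preconditions carried in `opts` against the
/// current object metadata. Returns the error to surface when the write must be
/// rejected, or `None` when the write may proceed.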
async fn check_write_precondition(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Option<StorageError> {
let mut opts = opts.clone();
let http_preconditions = opts.http_preconditions?;
opts.http_preconditions = None;
// Never acquire a lock here, to avoid deadlock:
// - If no_lock is false, the caller must already hold the lock outside this function
// - If no_lock is true, we should not take locks at all
opts.no_lock = true;
let oi = self.get_object_info(bucket, object, &opts).await;
match oi {
Ok(oi) => {
if should_prevent_write(&oi, http_preconditions.if_none_match, http_preconditions.if_match) {
return Some(StorageError::PreconditionFailed);
}
}
Err(StorageError::VersionNotFound(_, _, _))
| Err(StorageError::ObjectNotFound(_, _))
| Err(StorageError::ErasureReadQuorum) => {
// When the object is not found,
// - if If-Match is set, we should return 404 NotFound
// - if If-None-Match is set, we should be able to proceed with the request
if http_preconditions.if_match.is_some() {
return Some(StorageError::ObjectNotFound(bucket.to_string(), object.to_string()));
}
}
Err(e) => {
return Some(e);
}
}
None
}
}
#[async_trait::async_trait]
@@ -3335,6 +3374,12 @@ impl ObjectIO for SetDisks {
_object_lock_guard = guard_opt;
}
if opts.http_preconditions.is_some() {
if let Some(err) = self.check_write_precondition(bucket, object, opts).await {
return Err(err);
}
}
let mut user_defined = opts.user_defined.clone();
let sc_parity_drives = {
@@ -5123,6 +5168,26 @@ impl StorageAPI for SetDisks {
let disks = disks.clone();
// let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let mut _object_lock_guard: Option<rustfs_lock::LockGuard> = None;
if opts.http_preconditions.is_some() {
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_object_lock_guard = guard_opt;
}
if let Some(err) = self.check_write_precondition(bucket, object, opts).await {
return Err(err);
}
}
let part_path = format!("{}/{}/", upload_id_path, fi.data_dir.unwrap_or(Uuid::nil()));
let part_meta_paths = uploaded_parts
@@ -5942,13 +6007,45 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
format!("{:x}-{}", hasher.finalize(), parts.len())
}
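/// Strip surrounding double quotes from an ETag (e.g. `"abc"` becomes `abc`) so that
/// quoted and unquoted forms compare equal. The pattern is a constant, so the
/// `unwrap` cannot fail at runtime.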
pub fn canonicalize_etag(etag: &str) -> String {
let re = Regex::new("\"*?([^\"]*?)\"*?$").unwrap();
re.replace_all(etag, "$1").to_string()
}
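/// Compare an object's ETag against a precondition value; `"*"` matches any ETag.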
pub fn e_tag_matches(etag: &str, condition: &str) -> bool {
if condition.trim() == "*" {
return true;
}
canonicalize_etag(etag) == canonicalize_etag(condition)
}
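/// Decide whether a conditional write must be rejected: true when `If-None-Match`
/// matches the current ETag, or when `If-Match` is present and does not match it.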
pub fn should_prevent_write(oi: &ObjectInfo, if_none_match: Option<String>, if_match: Option<String>) -> bool {
match &oi.etag {
Some(etag) => {
if let Some(if_none_match) = if_none_match {
if e_tag_matches(etag, &if_none_match) {
return true;
}
}
if let Some(if_match) = if_match {
if !e_tag_matches(etag, &if_match) {
return true;
}
}
false
}
// If we can't obtain the etag of the object, prevent the write only when at least one condition is present
None => if_none_match.is_some() || if_match.is_some(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::disk::CHECK_PART_UNKNOWN;
use crate::disk::CHECK_PART_VOLUME_NOT_FOUND;
use crate::disk::error::DiskError;
-use crate::store_api::CompletePart;
+use crate::store_api::{CompletePart, ObjectInfo};
use rustfs_filemeta::ErasureInfo;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -6373,4 +6470,62 @@ mod tests {
assert_eq!(result2.len(), 3);
assert!(result2.iter().all(|d| d.is_none()));
}
#[test]
fn test_etag_matches() {
assert!(e_tag_matches("abc", "abc"));
assert!(e_tag_matches("\"abc\"", "abc"));
assert!(e_tag_matches("\"abc\"", "*"));
}
#[test]
fn test_should_prevent_write() {
let oi = ObjectInfo {
etag: Some("abc".to_string()),
..Default::default()
};
let if_none_match = Some("abc".to_string());
let if_match = None;
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("*".to_string());
let if_match = None;
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = Some("def".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = Some("*".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = None;
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = Some("*".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("def".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(!should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = Some("*".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let oi = ObjectInfo {
etag: None,
..Default::default()
};
let if_none_match = Some("*".to_string());
let if_match = Some("\"abc\"".to_string());
assert!(should_prevent_write(&oi, if_none_match, if_match));
let if_none_match = None;
let if_match = None;
assert!(!should_prevent_write(&oi, if_none_match, if_match));
}
}

View File

@@ -303,6 +303,12 @@ impl HTTPRangeSpec {
}
}
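/// Write preconditions parsed from the `If-Match` / `If-None-Match` request headers.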
#[derive(Debug, Default, Clone)]
pub struct HTTPPreconditions {
pub if_match: Option<String>,
pub if_none_match: Option<String>,
}
#[derive(Debug, Default, Clone)]
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
@@ -326,6 +332,7 @@ pub struct ObjectOptions {
pub user_defined: HashMap<String, String>,
pub preserve_etag: Option<String>,
pub metadata_chg: bool,
pub http_preconditions: Option<HTTPPreconditions>,
pub replication_request: bool,
pub delete_marker: bool,

View File

@@ -81,6 +81,7 @@ impl From<StorageError> for ApiError {
StorageError::DataMovementOverwriteErr(_, _, _) => S3ErrorCode::InvalidArgument,
StorageError::ObjectExistsAsDirectory(_, _) => S3ErrorCode::InvalidArgument,
StorageError::InvalidPart(_, _, _) => S3ErrorCode::InvalidPart,
StorageError::PreconditionFailed => S3ErrorCode::PreconditionFailed,
_ => S3ErrorCode::InternalError,
};

View File

@@ -21,6 +21,7 @@ use crate::error::ApiError;
use crate::storage::access::ReqInfo;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::get_complete_multipart_upload_opts;
use crate::storage::options::{extract_metadata_from_mime_with_object_name, get_opts, parse_copy_source_range};
use bytes::Bytes;
use chrono::DateTime;
@@ -1982,7 +1983,7 @@ impl S3 for FS {
let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
-let opts = &ObjectOptions::default();
+let opts = &get_complete_multipart_upload_opts(&req.headers).map_err(ApiError::from)?;
let mut uploaded_parts = Vec::new();

View File

@@ -16,7 +16,8 @@ use http::{HeaderMap, HeaderValue};
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::error::Result;
use rustfs_ecstore::error::StorageError;
-use rustfs_ecstore::store_api::{HTTPRangeSpec, ObjectOptions};
+use rustfs_ecstore::store_api::{HTTPPreconditions, HTTPRangeSpec, ObjectOptions};
use rustfs_utils::path::is_dir_object;
use s3s::{S3Result, s3_error};
use std::collections::HashMap;
@@ -106,6 +107,32 @@ pub async fn get_opts(
Ok(opts)
}
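/// Copy the `If-Match` / `If-None-Match` headers, when present, into
/// `opts.http_preconditions`, rejecting values that cannot be read as strings.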
fn fill_conditional_writes_opts_from_header(headers: &HeaderMap<HeaderValue>, opts: &mut ObjectOptions) -> Result<()> {
if headers.contains_key("If-None-Match") || headers.contains_key("If-Match") {
let mut preconditions = HTTPPreconditions::default();
if let Some(if_none_match) = headers.get("If-None-Match") {
preconditions.if_none_match = Some(
if_none_match
.to_str()
.map_err(|_| StorageError::other("Invalid If-None-Match header"))?
.to_string(),
);
}
if let Some(if_match) = headers.get("If-Match") {
preconditions.if_match = Some(
if_match
.to_str()
.map_err(|_| StorageError::other("Invalid If-Match header"))?
.to_string(),
);
}
opts.http_preconditions = Some(preconditions);
}
Ok(())
}
/// Creates options for putting an object in a bucket.
pub async fn put_opts(
bucket: &str,
@@ -142,6 +169,14 @@ pub async fn put_opts(
opts.version_suspended = version_suspended;
opts.versioned = versioned;
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}
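/// Build the `ObjectOptions` for CompleteMultipartUpload; only the conditional-write
/// preconditions are taken from the request headers.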
pub fn get_complete_multipart_upload_opts(headers: &HeaderMap<HeaderValue>) -> Result<ObjectOptions> {
let mut opts = ObjectOptions::default();
fill_conditional_writes_opts_from_header(headers, &mut opts)?;
Ok(opts)
}