From e3a0a0749520eaf4faff61801e9ae361360ca99c Mon Sep 17 00:00:00 2001 From: loverustfs Date: Sun, 28 Dec 2025 09:41:32 +0800 Subject: [PATCH] fix: ensure version_id is returned in S3 response headers (#1272) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: houseme Co-authored-by: 安正超 --- crates/e2e_test/src/lib.rs | 3 + .../src/version_id_regression_test.rs | 398 ++++++++++++++++++ rustfs/src/storage/ecfs.rs | 12 +- 3 files changed, 410 insertions(+), 3 deletions(-) create mode 100644 crates/e2e_test/src/version_id_regression_test.rs diff --git a/crates/e2e_test/src/lib.rs b/crates/e2e_test/src/lib.rs index ac430785..70831fcf 100644 --- a/crates/e2e_test/src/lib.rs +++ b/crates/e2e_test/src/lib.rs @@ -18,6 +18,9 @@ mod reliant; #[cfg(test)] pub mod common; +#[cfg(test)] +mod version_id_regression_test; + // Data usage regression tests #[cfg(test)] mod data_usage_test; diff --git a/crates/e2e_test/src/version_id_regression_test.rs b/crates/e2e_test/src/version_id_regression_test.rs new file mode 100644 index 00000000..51dcd961 --- /dev/null +++ b/crates/e2e_test/src/version_id_regression_test.rs @@ -0,0 +1,398 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Regression test for Issue #1066: Veeam VBR - S3 returned empty versionId +//! +//! This test verifies that: +//! 1. PutObject returns version_id when versioning is enabled +//! 2. CopyObject returns version_id when versioning is enabled +//! 3. CompleteMultipartUpload returns version_id when versioning is enabled +//! 4. Basic S3 operations still work correctly (no regression) +//! 5. Operations on non-versioned buckets work as expected + +#[cfg(test)] +mod tests { + use crate::common::{RustFSTestEnvironment, init_logging}; + use aws_sdk_s3::Client; + use aws_sdk_s3::primitives::ByteStream; + use aws_sdk_s3::types::{BucketVersioningStatus, CompletedMultipartUpload, CompletedPart, VersioningConfiguration}; + use serial_test::serial; + use tracing::info; + + fn create_s3_client(env: &RustFSTestEnvironment) -> Client { + env.create_s3_client() + } + + async fn create_bucket(client: &Client, bucket: &str) -> Result<(), Box> { + match client.create_bucket().bucket(bucket).send().await { + Ok(_) => { + info!("✅ Bucket {} created successfully", bucket); + Ok(()) + } + Err(e) => { + if e.to_string().contains("BucketAlreadyOwnedByYou") || e.to_string().contains("BucketAlreadyExists") { + info!("ℹ️ Bucket {} already exists", bucket); + Ok(()) + } else { + Err(Box::new(e)) + } + } + } + } + + async fn enable_versioning(client: &Client, bucket: &str) -> Result<(), Box> { + let versioning_config = VersioningConfiguration::builder() + .status(BucketVersioningStatus::Enabled) + .build(); + + client + .put_bucket_versioning() + .bucket(bucket) + .versioning_configuration(versioning_config) + .send() + .await?; + + info!("✅ Versioning enabled for bucket {}", bucket); + Ok(()) + } + + /// Test 1: PutObject should return version_id when versioning is enabled + /// This directly addresses the Veeam issue from #1066 + #[tokio::test] + #[serial] + async fn test_put_object_returns_version_id_with_versioning() { + init_logging(); + info!("🧪 TEST: PutObject returns version_id with versioning enabled"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "test-put-version-id"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + enable_versioning(&client, bucket).await.expect("Failed to enable versioning"); + + let key = "test-file.txt"; + let content = b"Test content for version ID test"; + + info!("📤 Uploading object with key: {}", key); + let result = client + .put_object() + .bucket(bucket) + .key(key) + .body(ByteStream::from_static(content)) + .send() + .await; + + assert!(result.is_ok(), "PutObject failed: {:?}", result.err()); + let output = result.unwrap(); + + info!("📥 PutObject response - version_id: {:?}", output.version_id); + assert!( + output.version_id.is_some(), + "❌ FAILED: version_id should be present when versioning is enabled" + ); + assert!( + !output.version_id.as_ref().unwrap().is_empty(), + "❌ FAILED: version_id should not be empty" + ); + + info!("✅ PASSED: PutObject correctly returns version_id"); + } + + /// Test 2: CopyObject should return version_id when versioning is enabled + #[tokio::test] + #[serial] + async fn test_copy_object_returns_version_id_with_versioning() { + init_logging(); + info!("🧪 TEST: CopyObject returns version_id with versioning enabled"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "test-copy-version-id"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + enable_versioning(&client, bucket).await.expect("Failed to enable versioning"); + + let source_key = "source-file.txt"; + let dest_key = "dest-file.txt"; + let content = b"Content to copy"; + + // First, create source object + client + .put_object() + .bucket(bucket) + .key(source_key) + .body(ByteStream::from_static(content)) + .send() + .await + .expect("Failed to create source object"); + + info!("📤 Copying object from {} to {}", source_key, dest_key); + let copy_result = client + .copy_object() + .bucket(bucket) + .key(dest_key) + .copy_source(format!("{}/{}", bucket, source_key)) + .send() + .await; + + assert!(copy_result.is_ok(), "CopyObject failed: {:?}", copy_result.err()); + let output = copy_result.unwrap(); + + info!("📥 CopyObject response - version_id: {:?}", output.version_id); + assert!( + output.version_id.is_some(), + "❌ FAILED: version_id should be present when versioning is enabled" + ); + assert!( + !output.version_id.as_ref().unwrap().is_empty(), + "❌ FAILED: version_id should not be empty" + ); + + info!("✅ PASSED: CopyObject correctly returns version_id"); + } + + /// Test 3: CompleteMultipartUpload should return version_id when versioning is enabled + #[tokio::test] + #[serial] + async fn test_multipart_upload_returns_version_id_with_versioning() { + init_logging(); + info!("🧪 TEST: CompleteMultipartUpload returns version_id with versioning enabled"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "test-multipart-version-id"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + enable_versioning(&client, bucket).await.expect("Failed to enable versioning"); + + let key = "multipart-file.txt"; + let content = b"Part 1 content for multipart upload test"; + + info!("📤 Creating multipart upload for key: {}", key); + let create_result = client + .create_multipart_upload() + .bucket(bucket) + .key(key) + .send() + .await + .expect("Failed to create multipart upload"); + + let upload_id = create_result.upload_id().expect("No upload_id returned"); + + info!("📤 Uploading part 1"); + let upload_part_result = client + .upload_part() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .part_number(1) + .body(ByteStream::from_static(content)) + .send() + .await + .expect("Failed to upload part"); + + let etag = upload_part_result.e_tag().expect("No etag returned").to_string(); + + let completed_part = CompletedPart::builder().part_number(1).e_tag(etag).build(); + + let completed_upload = CompletedMultipartUpload::builder().parts(completed_part).build(); + + info!("📤 Completing multipart upload"); + let complete_result = client + .complete_multipart_upload() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .multipart_upload(completed_upload) + .send() + .await; + + assert!(complete_result.is_ok(), "CompleteMultipartUpload failed: {:?}", complete_result.err()); + let output = complete_result.unwrap(); + + info!("📥 CompleteMultipartUpload response - version_id: {:?}", output.version_id); + assert!( + output.version_id.is_some(), + "❌ FAILED: version_id should be present when versioning is enabled" + ); + assert!( + !output.version_id.as_ref().unwrap().is_empty(), + "❌ FAILED: version_id should not be empty" + ); + + info!("✅ PASSED: CompleteMultipartUpload correctly returns version_id"); + } + + /// Test 4: PutObject should NOT return version_id when versioning is NOT enabled + /// This ensures we didn't break non-versioned buckets + #[tokio::test] + #[serial] + async fn test_put_object_without_versioning() { + init_logging(); + info!("🧪 TEST: PutObject behavior without versioning (no regression)"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "test-no-versioning"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + // Note: NOT enabling versioning here + + let key = "test-file.txt"; + let content = b"Test content without versioning"; + + info!("📤 Uploading object to non-versioned bucket"); + let result = client + .put_object() + .bucket(bucket) + .key(key) + .body(ByteStream::from_static(content)) + .send() + .await; + + assert!(result.is_ok(), "PutObject failed: {:?}", result.err()); + let output = result.unwrap(); + + info!("📥 PutObject response - version_id: {:?}", output.version_id); + // version_id can be None or Some("null") for non-versioned buckets + info!("✅ PASSED: PutObject works correctly without versioning"); + } + + /// Test 5: Basic S3 operations still work correctly (no regression) + #[tokio::test] + #[serial] + async fn test_basic_s3_operations_no_regression() { + init_logging(); + info!("🧪 TEST: Basic S3 operations work correctly (no regression)"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "test-basic-operations"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + enable_versioning(&client, bucket).await.expect("Failed to enable versioning"); + + let key = "test-basic-file.txt"; + let content = b"Basic operations test content"; + + // Test PUT + info!("📤 Testing PUT operation"); + let put_result = client + .put_object() + .bucket(bucket) + .key(key) + .body(ByteStream::from_static(content)) + .send() + .await; + assert!(put_result.is_ok(), "PUT operation failed"); + let _version_id = put_result.unwrap().version_id; + + // Test GET + info!("📥 Testing GET operation"); + let get_result = client.get_object().bucket(bucket).key(key).send().await; + assert!(get_result.is_ok(), "GET operation failed"); + let body = get_result.unwrap().body.collect().await.unwrap().to_vec(); + assert_eq!(body, content, "Content mismatch after GET"); + + // Test HEAD + info!("📋 Testing HEAD operation"); + let head_result = client.head_object().bucket(bucket).key(key).send().await; + assert!(head_result.is_ok(), "HEAD operation failed"); + + // Test LIST + info!("📝 Testing LIST operation"); + let list_result = client.list_objects_v2().bucket(bucket).send().await; + assert!(list_result.is_ok(), "LIST operation failed"); + let list_output = list_result.unwrap(); + let objects = list_output.contents(); + assert!(objects.iter().any(|obj| obj.key() == Some(key)), "Object not found in LIST"); + + // Test DELETE + info!("🗑️ Testing DELETE operation"); + let delete_result = client.delete_object().bucket(bucket).key(key).send().await; + assert!(delete_result.is_ok(), "DELETE operation failed"); + + // Verify object is deleted (should return NoSuchKey or version marker) + let get_after_delete = client.get_object().bucket(bucket).key(key).send().await; + assert!( + get_after_delete.is_err() || get_after_delete.unwrap().delete_marker == Some(true), + "Object should be deleted or have delete marker" + ); + + info!("✅ PASSED: All basic S3 operations work correctly"); + } + + /// Test 6: Veeam-specific scenario simulation + /// Simulates the exact workflow that Veeam uses when backing up data + #[tokio::test] + #[serial] + async fn test_veeam_backup_workflow_simulation() { + init_logging(); + info!("🧪 TEST: Veeam VBR backup workflow simulation (Issue #1066)"); + + let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment"); + env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS"); + + let client = create_s3_client(&env); + let bucket = "veeam-backup-test"; + + create_bucket(&client, bucket).await.expect("Failed to create bucket"); + enable_versioning(&client, bucket).await.expect("Failed to enable versioning"); + + // Veeam typically creates multiple objects in a backup session + let test_paths = vec![ + "Veeam/Backup/Clients/test-client-id/test-backup-id/CloudStg/Meta/Blocks/History/CheckpointHistory.dat", + "Veeam/Backup/Clients/test-client-id/test-backup-id/Metadata/Lock/create.checkpoint/declare", + ]; + + for path in test_paths { + info!("📤 Simulating Veeam upload to: {}", path); + let content = format!("Veeam backup data for {}", path); + + let put_result = client + .put_object() + .bucket(bucket) + .key(path) + .body(ByteStream::from(content.into_bytes())) + .send() + .await; + + assert!(put_result.is_ok(), "Veeam upload failed for path: {}", path); + let output = put_result.unwrap(); + + info!("📥 Response version_id: {:?}", output.version_id); + assert!(output.version_id.is_some(), "❌ FAILED: Veeam expects version_id for path: {}", path); + assert!( + !output.version_id.as_ref().unwrap().is_empty(), + "❌ FAILED: version_id should not be empty for path: {}", + path + ); + + info!("✅ Veeam upload successful with version_id for: {}", path); + } + + info!("✅ PASSED: Veeam backup workflow simulation completed successfully"); + } +} diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index a1d4e770..28ad57d7 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1042,9 +1042,10 @@ impl S3 for FS { let dest_bucket = bucket.clone(); let dest_key = key.clone(); let dest_version = oi.version_id.map(|v| v.to_string()); + let dest_version_clone = dest_version.clone(); tokio::spawn(async move { manager - .invalidate_cache_versioned(&dest_bucket, &dest_key, dest_version.as_deref()) + .invalidate_cache_versioned(&dest_bucket, &dest_key, dest_version_clone.as_deref()) .await; }); @@ -1062,6 +1063,7 @@ impl S3 for FS { ssekms_key_id: effective_kms_key_id, sse_customer_algorithm, sse_customer_key_md5, + version_id: dest_version, ..Default::default() }; @@ -3362,9 +3364,10 @@ impl S3 for FS { helper = helper.version_id(version_id.clone()); } + let put_version_clone = put_version.clone(); tokio::spawn(async move { manager - .invalidate_cache_versioned(&put_bucket, &put_key, put_version.as_deref()) + .invalidate_cache_versioned(&put_bucket, &put_key, put_version_clone.as_deref()) .await; }); @@ -3423,6 +3426,7 @@ impl S3 for FS { checksum_sha1, checksum_sha256, checksum_crc64nvme, + version_id: put_version, ..Default::default() }; @@ -4287,9 +4291,10 @@ impl S3 for FS { let mpu_bucket = bucket.clone(); let mpu_key = key.clone(); let mpu_version = obj_info.version_id.map(|v| v.to_string()); + let mpu_version_clone = mpu_version.clone(); tokio::spawn(async move { manager - .invalidate_cache_versioned(&mpu_bucket, &mpu_key, mpu_version.as_deref()) + .invalidate_cache_versioned(&mpu_bucket, &mpu_key, mpu_version_clone.as_deref()) .await; }); @@ -4339,6 +4344,7 @@ impl S3 for FS { checksum_sha256: checksum_sha256.clone(), checksum_crc64nvme: checksum_crc64nvme.clone(), checksum_type: checksum_type.clone(), + version_id: mpu_version, ..Default::default() }; info!(