Compare commits

...

1 Commit

Author SHA1 Message Date
安正超
639bf0c233 Revert "feat(append): implement object append operations with state tracking (#599)" (#646)
This reverts commit 4f73760a45.
2025-10-12 23:47:51 +08:00
19 changed files with 240 additions and 4163 deletions

2
Cargo.lock generated
View File

@@ -2923,7 +2923,6 @@ dependencies = [
"chrono",
"flatbuffers",
"futures",
"http 1.3.1",
"md5",
"rand 0.9.2",
"reqwest",
@@ -6645,7 +6644,6 @@ dependencies = [
"rustfs-utils",
"s3s",
"serde",
"serde_json",
"thiserror 2.0.17",
"time",
"tokio",

View File

@@ -49,5 +49,4 @@ uuid = { workspace = true }
base64 = { workspace = true }
rand = { workspace = true }
chrono = { workspace = true }
http.workspace = true
md5 = { workspace = true }

View File

@@ -13,16 +13,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! 分片上传加密功能的分步测试用例
//!
//! 这个测试套件将验证分片上传加密功能的每一个步骤:
//! 1. 测试基础的单分片加密(验证加密基础逻辑)
//! 2. 测试多分片上传(验证分片拼接逻辑)
//! 3. 测试加密元数据的保存和读取
//! 4. 测试完整的分片上传加密流程
use super::common::LocalKMSTestEnvironment;
use crate::common::{TEST_BUCKET, init_logging};
use serial_test::serial;
use tracing::{debug, info};
/// 步骤1测试基础单文件加密功能确保SSE-S3在非分片场景下正常工作
#[tokio::test]
#[serial]
async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 step1: test basic single file encryption");
info!("🧪 步骤1测试基础单文件加密功能");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -31,11 +40,11 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
let s3_client = kms_env.base_env.create_s3_client();
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
// test small file encryption (should inline store)
// 测试小文件加密(应该会内联存储)
let test_data = b"Hello, this is a small test file for SSE-S3!";
let object_key = "test-single-file-encrypted";
info!("📤 step1: upload small file ({}) with SSE-S3 encryption", test_data.len());
info!("📤 上传小文件({}字节启用SSE-S3加密", test_data.len());
let put_response = s3_client
.put_object()
.bucket(TEST_BUCKET)
@@ -45,41 +54,41 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
.send()
.await?;
debug!("PUT response ETag: {:?}", put_response.e_tag());
debug!("PUT response SSE: {:?}", put_response.server_side_encryption());
debug!("PUT响应ETag: {:?}", put_response.e_tag());
debug!("PUT响应SSE: {:?}", put_response.server_side_encryption());
// verify PUT response contains correct encryption header
// 验证PUT响应包含正确的加密头
assert_eq!(
put_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
info!("📥 step1: download file and verify encryption status");
info!("📥 下载文件并验证加密状态");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("GET response SSE: {:?}", get_response.server_side_encryption());
debug!("GET响应SSE: {:?}", get_response.server_side_encryption());
// verify GET response contains correct encryption header
// 验证GET响应包含正确的加密头
assert_eq!(
get_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
// verify data integrity
// 验证数据完整性
let downloaded_data = get_response.body.collect().await?.into_bytes();
assert_eq!(&downloaded_data[..], test_data);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("step1: basic single file encryption works as expected");
info!("步骤1通过基础单文件加密功能正常");
Ok(())
}
/// test basic multipart upload without encryption
/// 步骤2测试不加密的分片上传确保分片上传基础功能正常
#[tokio::test]
#[serial]
async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 step2: test basic multipart upload without encryption");
info!("🧪 步骤2测试不加密的分片上传");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -93,16 +102,12 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
let total_parts = 2;
let total_size = part_size * total_parts;
// generate test data (with clear pattern for easy verification)
// 生成测试数据(有明显的模式便于验证)
let test_data: Vec<u8> = (0..total_size).map(|i| (i % 256) as u8).collect();
info!(
"🚀 step2: start multipart upload (no encryption) with {} parts, each {}MB",
total_parts,
part_size / (1024 * 1024)
);
info!("🚀 开始分片上传(无加密):{} parts每个 {}MB", total_parts, part_size / (1024 * 1024));
// step1: create multipart upload
// 步骤1创建分片上传
let create_multipart_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
@@ -111,16 +116,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.await?;
let upload_id = create_multipart_output.upload_id().unwrap();
info!("📋 step2: create multipart upload, ID: {}", upload_id);
info!("📋 创建分片上传,ID: {}", upload_id);
// step2: upload each part
// 步骤2上传各个分片
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
let end = std::cmp::min(start + part_size, total_size);
let part_data = &test_data[start..end];
info!("📤 step2: upload part {} ({} bytes)", part_number, part_data.len());
info!("📤 上传分片 {} ({} bytes)", part_number, part_data.len());
let upload_part_output = s3_client
.upload_part()
@@ -140,15 +145,15 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.build(),
);
debug!("step2: part {} uploaded, ETag: {}", part_number, etag);
debug!("分片 {} 上传完成,ETag: {}", part_number, etag);
}
// step3: complete multipart upload
// 步骤3完成分片上传
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
info!("🔗 step2: complete multipart upload");
info!("🔗 完成分片上传");
let complete_output = s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
@@ -158,16 +163,10 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.send()
.await?;
debug!("step2: multipart upload completed, ETag: {:?}", complete_output.e_tag());
debug!("完成分片上传,ETag: {:?}", complete_output.e_tag());
// step4: verify multipart upload completed successfully
assert_eq!(
complete_output.e_tag().unwrap().to_string(),
format!("\"{}-{}-{}\"", object_key, upload_id, total_parts)
);
// verify data integrity
info!("📥 step2: download file and verify data integrity");
// 步骤4下载并验证
info!("📥 下载文件并验证数据完整性");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
let downloaded_data = get_response.body.collect().await?.into_bytes();
@@ -175,16 +174,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
assert_eq!(&downloaded_data[..], &test_data[..]);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("step2: basic multipart upload without encryption works as expected");
info!("步骤2通过不加密的分片上传功能正常");
Ok(())
}
/// test multipart upload with SSE-S3 encryption
/// 步骤3测试分片上传 + SSE-S3加密重点测试
#[tokio::test]
#[serial]
async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 step3: test multipart upload with SSE-S3 encryption");
info!("🧪 步骤3测试分片上传 + SSE-S3加密");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -198,16 +197,16 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
let total_parts = 2;
let total_size = part_size * total_parts;
// generate test data (with clear pattern for easy verification)
// 生成测试数据
let test_data: Vec<u8> = (0..total_size).map(|i| ((i / 1000) % 256) as u8).collect();
info!(
"🔐 step3: start multipart upload with SSE-S3 encryption: {} parts, each {}MB",
"🔐 开始分片上传SSE-S3加密{} parts,每个 {}MB",
total_parts,
part_size / (1024 * 1024)
);
// step1: create multipart upload and enable SSE-S3
// 步骤1创建分片上传并启用SSE-S3
let create_multipart_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
@@ -217,24 +216,24 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
.await?;
let upload_id = create_multipart_output.upload_id().unwrap();
info!("📋 step3: create multipart upload with SSE-S3 encryption, ID: {}", upload_id);
info!("📋 创建加密分片上传,ID: {}", upload_id);
// step2: verify CreateMultipartUpload response (SSE-S3 header should be included)
// 验证CreateMultipartUpload响应如果有SSE头的话
if let Some(sse) = create_multipart_output.server_side_encryption() {
debug!("CreateMultipartUpload response contains SSE header: {:?}", sse);
debug!("CreateMultipartUpload包含SSE响应: {:?}", sse);
assert_eq!(sse, &aws_sdk_s3::types::ServerSideEncryption::Aes256);
} else {
debug!("CreateMultipartUpload response does not contain SSE header (some implementations may return empty string)");
debug!("CreateMultipartUpload不包含SSE响应头某些实现中正常");
}
// step2: upload each part
// 步骤2上传各个分片
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
let end = std::cmp::min(start + part_size, total_size);
let part_data = &test_data[start..end];
info!("🔐 step3: upload encrypted part {} ({} bytes)", part_number, part_data.len());
info!("🔐 上传加密分片 {} ({} bytes)", part_number, part_data.len());
let upload_part_output = s3_client
.upload_part()
@@ -254,15 +253,15 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
.build(),
);
debug!("step3: part {} uploaded, ETag: {}", part_number, etag);
debug!("加密分片 {} 上传完成,ETag: {}", part_number, etag);
}
// step3: complete multipart upload
// 步骤3完成分片上传
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
info!("🔗 step3: complete multipart upload with SSE-S3 encryption");
info!("🔗 完成加密分片上传");
let complete_output = s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
@@ -272,46 +271,43 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
.send()
.await?;
debug!(
"step3: complete multipart upload with SSE-S3 encryption, ETag: {:?}",
complete_output.e_tag()
);
debug!("完成加密分片上传ETag: {:?}", complete_output.e_tag());
// step4: HEAD request to check metadata
info!("📋 step4: check object metadata");
// 步骤4HEAD请求检查元数据
info!("📋 检查对象元数据");
let head_response = s3_client.head_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("HEAD response SSE: {:?}", head_response.server_side_encryption());
debug!("HEAD response metadata: {:?}", head_response.metadata());
debug!("HEAD响应 SSE: {:?}", head_response.server_side_encryption());
debug!("HEAD响应 元数据: {:?}", head_response.metadata());
// step5: GET request to download and verify
info!("📥 step5: download encrypted file and verify");
// 步骤5GET请求下载并验证
info!("📥 下载加密文件并验证");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("GET response SSE: {:?}", get_response.server_side_encryption());
debug!("GET响应 SSE: {:?}", get_response.server_side_encryption());
// step5: verify GET response contains SSE-S3 encryption header
// 🎯 关键验证GET响应必须包含SSE-S3加密头
assert_eq!(
get_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
// step5: verify downloaded data matches original test data
// 验证数据完整性
let downloaded_data = get_response.body.collect().await?.into_bytes();
assert_eq!(downloaded_data.len(), total_size);
assert_eq!(&downloaded_data[..], &test_data[..]);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("step3: multipart upload with SSE-S3 encryption function is normal");
info!("步骤3通过分片上传 + SSE-S3加密功能正常");
Ok(())
}
/// step4: test larger multipart upload with encryption (streaming encryption)
/// 步骤4测试更大的分片上传测试流式加密
#[tokio::test]
#[serial]
async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 step4: test larger multipart upload with encryption (streaming encryption)");
info!("🧪 步骤4测试大文件分片上传加密");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -326,13 +322,13 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
let total_size = part_size * total_parts;
info!(
"🗂️ step4: generate large test data: {} parts, each {}MB, total {}MB",
"🗂️ 生成大文件测试数据:{} parts每个 {}MB总计 {}MB",
total_parts,
part_size / (1024 * 1024),
total_size / (1024 * 1024)
);
// step4: generate large test data (using complex pattern for verification)
// 生成大文件测试数据(使用复杂模式便于验证)
let test_data: Vec<u8> = (0..total_size)
.map(|i| {
let part_num = i / part_size;
@@ -341,9 +337,9 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
})
.collect();
info!("🔐 step4: start large multipart upload with encryption (SSE-S3)");
info!("🔐 开始大文件分片上传(SSE-S3加密)");
// step4: create multipart upload
// 创建分片上传
let create_multipart_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
@@ -353,9 +349,9 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
.await?;
let upload_id = create_multipart_output.upload_id().unwrap();
info!("📋 step4: create multipart upload with encryption (SSE-S3), ID: {}", upload_id);
info!("📋 创建大文件加密分片上传,ID: {}", upload_id);
// step4: upload parts
// 上传各个分片
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
@@ -363,7 +359,7 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
let part_data = &test_data[start..end];
info!(
"🔐 step4: upload part {} ({:.2}MB)",
"🔐 上传大文件加密分片 {} ({:.2}MB)",
part_number,
part_data.len() as f64 / (1024.0 * 1024.0)
);
@@ -386,15 +382,15 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
.build(),
);
debug!("step4: upload part {} completed, ETag: {}", part_number, etag);
debug!("大文件加密分片 {} 上传完成,ETag: {}", part_number, etag);
}
// step4: complete multipart upload
// 完成分片上传
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
info!("🔗 step4: complete multipart upload with encryption (SSE-S3)");
info!("🔗 完成大文件加密分片上传");
let complete_output = s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
@@ -404,46 +400,40 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
.send()
.await?;
debug!(
"step4: complete multipart upload with encryption (SSE-S3), ETag: {:?}",
complete_output.e_tag()
);
debug!("完成大文件加密分片上传ETag: {:?}", complete_output.e_tag());
// step4: download and verify
info!("📥 step4: download and verify large multipart upload with encryption (SSE-S3)");
// 下载并验证
info!("📥 下载大文件并验证");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
// step4: verify encryption header
// 验证加密头
assert_eq!(
get_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
// step4: verify data integrity
// 验证数据完整性
let downloaded_data = get_response.body.collect().await?.into_bytes();
assert_eq!(downloaded_data.len(), total_size);
// step4: verify data matches original test data
// 逐字节验证数据(对于大文件更严格)
for (i, (&actual, &expected)) in downloaded_data.iter().zip(test_data.iter()).enumerate() {
if actual != expected {
panic!(
"step4: large multipart upload with encryption (SSE-S3) data mismatch at byte {}: actual={}, expected={}",
i, actual, expected
);
panic!("大文件数据在第{i}字节不匹配: 实际={actual}, 期待={expected}");
}
}
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("step4: large multipart upload with encryption (SSE-S3) functionality normal");
info!("步骤4通过大文件分片上传加密功能正常");
Ok(())
}
/// step5: test all encryption types multipart upload
/// 步骤5测试所有加密类型的分片上传
#[tokio::test]
#[serial]
async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 step5: test all encryption types multipart upload");
info!("🧪 步骤5测试所有加密类型的分片上传");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -456,8 +446,8 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
let total_parts = 2;
let total_size = part_size * total_parts;
// step5: test SSE-KMS multipart upload
info!("🔐 step5: test SSE-KMS multipart upload");
// 测试SSE-KMS
info!("🔐 测试 SSE-KMS 分片上传");
test_multipart_encryption_type(
&s3_client,
TEST_BUCKET,
@@ -469,8 +459,8 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
)
.await?;
// step5: test SSE-C multipart upload
info!("🔐 step5: test SSE-C multipart upload");
// 测试SSE-C
info!("🔐 测试 SSE-C 分片上传");
test_multipart_encryption_type(
&s3_client,
TEST_BUCKET,
@@ -483,7 +473,7 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
.await?;
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("step5: all encryption types multipart upload functionality normal");
info!("步骤5通过所有加密类型的分片上传功能正常");
Ok(())
}
@@ -493,7 +483,7 @@ enum EncryptionType {
SSEC,
}
/// step5: test specific encryption type multipart upload
/// 辅助函数:测试特定加密类型的分片上传
async fn test_multipart_encryption_type(
s3_client: &aws_sdk_s3::Client,
bucket: &str,
@@ -503,10 +493,10 @@ async fn test_multipart_encryption_type(
total_parts: usize,
encryption_type: EncryptionType,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// step5: generate test data
// 生成测试数据
let test_data: Vec<u8> = (0..total_size).map(|i| ((i * 7) % 256) as u8).collect();
// step5: prepare SSE-C key and MD5 (if needed)
// 准备SSE-C所需的密钥如果需要
let (sse_c_key, sse_c_md5) = if matches!(encryption_type, EncryptionType::SSEC) {
let key = "01234567890123456789012345678901";
let key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, key);
@@ -516,10 +506,9 @@ async fn test_multipart_encryption_type(
(None, None)
};
// step5: create multipart upload
info!("🔗 step5: create multipart upload with encryption {:?}", encryption_type);
info!("📋 创建分片上传 - {:?}", encryption_type);
// step5: create multipart upload request
// 创建分片上传
let mut create_request = s3_client.create_multipart_upload().bucket(bucket).key(object_key);
create_request = match encryption_type {
@@ -533,6 +522,7 @@ async fn test_multipart_encryption_type(
let create_multipart_output = create_request.send().await?;
let upload_id = create_multipart_output.upload_id().unwrap();
// 上传分片
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
@@ -547,7 +537,7 @@ async fn test_multipart_encryption_type(
.part_number(part_number as i32)
.body(aws_sdk_s3::primitives::ByteStream::from(part_data.to_vec()));
// step5: include SSE-C key and MD5 in each UploadPart request (if needed)
// SSE-C需要在每个UploadPart请求中包含密钥
if matches!(encryption_type, EncryptionType::SSEC) {
upload_request = upload_request
.sse_customer_algorithm("AES256")
@@ -564,11 +554,10 @@ async fn test_multipart_encryption_type(
.build(),
);
// step5: complete multipart upload request
debug!("🔗 step5: complete multipart upload part {} with etag {}", part_number, etag);
debug!("{:?} 分片 {} 上传完成", encryption_type, part_number);
}
// step5: complete multipart upload
// 完成分片上传
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
@@ -582,12 +571,10 @@ async fn test_multipart_encryption_type(
.send()
.await?;
// step5: download and verify multipart upload
info!("🔗 step5: download and verify multipart upload with encryption {:?}", encryption_type);
// 下载并验证
let mut get_request = s3_client.get_object().bucket(bucket).key(object_key);
// step5: include SSE-C key and MD5 in each GET request (if needed)
// SSE-C需要在GET请求中包含密钥
if matches!(encryption_type, EncryptionType::SSEC) {
get_request = get_request
.sse_customer_algorithm("AES256")
@@ -597,7 +584,7 @@ async fn test_multipart_encryption_type(
let get_response = get_request.send().await?;
// step5: verify encryption headers
// 验证加密头
match encryption_type {
EncryptionType::SSEKMS => {
assert_eq!(
@@ -610,15 +597,11 @@ async fn test_multipart_encryption_type(
}
}
// step5: verify data integrity
// 验证数据完整性
let downloaded_data = get_response.body.collect().await?.into_bytes();
assert_eq!(downloaded_data.len(), total_size);
assert_eq!(&downloaded_data[..], &test_data[..]);
// step5: verify data integrity
info!(
"✅ step5: verify data integrity for multipart upload with encryption {:?}",
encryption_type
);
info!("✅ {:?} 分片上传测试通过", encryption_type);
Ok(())
}

File diff suppressed because it is too large Load Diff

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod append;
mod conditional_writes;
mod lifecycle;
mod lock;

View File

@@ -167,19 +167,8 @@ async fn write_data_blocks<W>(
where
W: tokio::io::AsyncWrite + Send + Sync + Unpin,
{
let available = get_data_block_len(en_blocks, data_blocks);
if available < length {
let block_sizes: Vec<usize> = en_blocks
.iter()
.take(data_blocks)
.map(|block| block.as_ref().map(|buf| buf.len()).unwrap_or(0))
.collect();
error!(
expected = length,
available,
?block_sizes,
"write_data_blocks get_data_block_len < length"
);
if get_data_block_len(en_blocks, data_blocks) < length {
error!("write_data_blocks get_data_block_len < length");
return Err(io::Error::new(ErrorKind::UnexpectedEof, "Not enough data blocks to write"));
}

View File

@@ -33,7 +33,6 @@ pub mod file_cache;
pub mod global;
pub mod metrics_realtime;
pub mod notification_sys;
pub mod object_append;
pub mod pools;
pub mod rebalance;
pub mod rpc;

View File

@@ -1,725 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::erasure_coding::{Erasure, calc_shard_size};
use crate::error::{Error, StorageError};
use crate::store_api::ObjectInfo;
use rustfs_filemeta::TRANSITION_COMPLETE;
use rustfs_utils::HashAlgorithm;
use rustfs_utils::http::headers::{
AMZ_SERVER_SIDE_ENCRYPTION, AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, AMZ_SERVER_SIDE_ENCRYPTION_KMS_CONTEXT, AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID,
RESERVED_METADATA_PREFIX_LOWER,
};
use std::collections::HashSet;
/// Ensure the target object can accept append writes under current state.
///
/// Append semantics are only defined for plain, locally stored objects, so
/// compressed, encrypted, and tiered/transitioned objects are all rejected
/// with an `InvalidArgument` error naming the bucket/object pair.
pub fn validate_append_preconditions(bucket: &str, object: &str, info: &ObjectInfo) -> Result<(), Error> {
    // Uniform error constructor for every "not supported" rejection below.
    let unsupported =
        |reason: &str| StorageError::InvalidArgument(bucket.to_string(), object.to_string(), reason.to_string());

    // Compressed payloads cannot be extended in place.
    if info.is_compressed() {
        return Err(unsupported("append is not supported for compressed objects"));
    }

    // Presence of any SSE-related metadata key (in its original or
    // lowercased form) marks the object as encrypted; appending plaintext
    // would corrupt the ciphertext stream.
    let encryption_markers = [
        AMZ_SERVER_SIDE_ENCRYPTION,
        AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID,
        AMZ_SERVER_SIDE_ENCRYPTION_KMS_CONTEXT,
        AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
        AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
        AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
    ];
    let is_encrypted = encryption_markers
        .iter()
        .any(|header| info.user_defined.contains_key(*header) || info.user_defined.contains_key(&header.to_ascii_lowercase()));
    if is_encrypted {
        return Err(unsupported("append is not supported for encrypted objects"));
    }

    // Objects already moved (or assigned) to a remote tier are immutable here.
    let is_transitioned = info.transitioned_object.status == TRANSITION_COMPLETE || !info.transitioned_object.tier.is_empty();
    if is_transitioned {
        return Err(unsupported("append is not supported for transitioned objects"));
    }

    Ok(())
}
/// Validate that the requested append position matches the current object length.
///
/// Callers provide the offset they believe the object ends at; a mismatch
/// means the object changed underneath them and the append must be rejected.
pub fn validate_append_position(bucket: &str, object: &str, info: &ObjectInfo, expected_position: i64) -> Result<(), Error> {
    // The only valid append offset is exactly the current size.
    if expected_position == info.size {
        return Ok(());
    }
    Err(StorageError::InvalidArgument(
        bucket.to_string(),
        object.to_string(),
        format!("append position mismatch: provided {}, expected {}", expected_position, info.size),
    ))
}
/// Borrowed inputs for a single inline-append operation (see [`append_inline_data`]).
pub struct InlineAppendContext<'a> {
    /// Existing inline (bitrot-encoded) buffer, if the object currently has one.
    pub existing_inline: Option<&'a [u8]>,
    /// Already-decoded plain payload, when the caller has it; takes precedence
    /// over decoding `existing_inline`.
    pub existing_plain: Option<&'a [u8]>,
    /// Current logical object size in bytes.
    pub existing_size: i64,
    /// Bytes to append after the existing payload.
    pub append_payload: &'a [u8],
    /// Erasure parameters used for shard sizing when re-encoding.
    pub erasure: &'a Erasure,
    /// Preferred checksum algorithm for re-encoding the combined payload.
    pub hash_algorithm: HashAlgorithm,
    /// Whether the object carries part checksums (forces re-encoding even if
    /// the existing buffer decoded as raw bytes).
    pub has_checksums: bool,
}
/// Output of [`append_inline_data`]: the re-encoded buffer plus new object facts.
pub struct InlineAppendResult {
    /// Inline buffer to store (bitrot-encoded, or raw plain bytes when no
    /// checksum encoding was applied).
    pub inline_data: Vec<u8>,
    /// New logical object size (existing + appended bytes).
    pub total_size: i64,
    /// MD5 hex digest of the full combined plain payload.
    pub etag: String,
}
/// Decode inline payload using available checksum algorithms. Returns raw bytes when decoding fails but
/// the inline buffer already contains the plain payload.
///
/// On successful decode the detected algorithm is returned alongside the
/// data; on the raw-bytes fallback the algorithm is [`HashAlgorithm::None`].
pub async fn decode_inline_payload(
    inline: &[u8],
    size: usize,
    erasure: &Erasure,
    preferred: HashAlgorithm,
) -> Result<(Vec<u8>, HashAlgorithm), Error> {
    // Try every known checksum encoding first; bail out early on success.
    let decode_err = match decode_inline_variants(inline, size, erasure, preferred).await {
        Ok(decoded) => return Ok(decoded),
        Err(err) => err,
    };
    // Fallback: the buffer may already hold the plain payload. Only valid
    // when it is at least `size` bytes long; otherwise surface the error.
    if inline.len() < size {
        return Err(decode_err);
    }
    Ok((inline[..size].to_vec(), HashAlgorithm::None))
}
/// Append data to an inline object and return the re-encoded inline buffer.
///
/// Builds the combined plain payload (existing + appended bytes), computes the
/// new size and MD5 etag, then re-encodes through a bitrot writer unless the
/// existing data was stored raw and the object has no checksums (in which case
/// the plain bytes are returned as-is).
pub async fn append_inline_data(ctx: InlineAppendContext<'_>) -> Result<InlineAppendResult, Error> {
    // Pre-size for existing inline buffer + appended bytes to avoid regrowth.
    let mut plain = Vec::with_capacity(ctx.existing_inline.map(|data| data.len()).unwrap_or(0) + ctx.append_payload.len());
    let mut encode_algorithm = ctx.hash_algorithm.clone();
    if let Some(existing_plain) = ctx.existing_plain {
        // Caller already decoded the payload; sanity-check its length.
        if existing_plain.len() != ctx.existing_size as usize {
            return Err(StorageError::other("existing plain payload length mismatch"));
        }
        plain.extend_from_slice(existing_plain);
    } else if ctx.existing_size > 0 {
        // Non-empty object without a decoded payload: the inline buffer is
        // mandatory and must be decoded; remember which algorithm worked so
        // re-encoding uses the same one.
        let inline = ctx
            .existing_inline
            .ok_or_else(|| StorageError::other("inline payload missing"))?;
        let (decoded, detected_algo) =
            decode_inline_payload(inline, ctx.existing_size as usize, ctx.erasure, ctx.hash_algorithm.clone()).await?;
        encode_algorithm = detected_algo;
        plain.extend_from_slice(&decoded);
    } else if let Some(inline) = ctx.existing_inline {
        // Zero-size object with an inline buffer: treat it as raw bytes.
        plain.extend_from_slice(inline);
    }
    plain.extend_from_slice(ctx.append_payload);
    let total_size = plain.len() as i64;
    // ETag is the MD5 of the full combined plain payload.
    let etag = md5_hex(&plain);
    if encode_algorithm == HashAlgorithm::None {
        if ctx.has_checksums {
            // Object tracks part checksums, so force re-encoding with the
            // caller's preferred algorithm.
            encode_algorithm = ctx.hash_algorithm.clone();
        } else {
            // No checksum encoding anywhere: store the plain bytes directly.
            return Ok(InlineAppendResult {
                inline_data: plain,
                total_size,
                etag,
            });
        }
    }
    // Re-encode the combined payload through a bitrot writer sized for the
    // new total; empty path/volume arguments select the inline (in-memory) mode.
    let mut writer = create_bitrot_writer(
        true,
        None,
        "",
        "",
        ctx.erasure.shard_file_size(total_size),
        ctx.erasure.shard_size(),
        encode_algorithm,
    )
    .await
    .map_err(|e| StorageError::other(format!("failed to create inline writer: {e}")))?;
    // Feed the payload in erasure-block-size chunks, matching how the data
    // would be written by the normal upload path.
    let mut remaining = plain.as_slice();
    while !remaining.is_empty() {
        let chunk_len = remaining.len().min(ctx.erasure.block_size);
        writer
            .write(&remaining[..chunk_len])
            .await
            .map_err(|e| StorageError::other(format!("failed to write inline data: {e}")))?;
        remaining = &remaining[chunk_len..];
    }
    writer
        .shutdown()
        .await
        .map_err(|e| StorageError::other(format!("failed to finalize inline writer: {e}")))?;
    let inline_data = writer
        .into_inline_data()
        .ok_or_else(|| StorageError::other("inline writer did not return data"))?;
    Ok(InlineAppendResult {
        inline_data,
        total_size,
        etag,
    })
}
/// MD5 digest of `data`, rendered as a lowercase hex string (used as the etag).
fn md5_hex(data: &[u8]) -> String {
    hex_from_bytes(HashAlgorithm::Md5.hash_encode(data).as_ref())
}
/// Render a byte slice as a lowercase hex string (two chars per byte).
fn hex_from_bytes(bytes: &[u8]) -> String {
    use std::fmt::Write;
    // Fold into a pre-sized String; writing to a String cannot fail.
    bytes.iter().fold(String::with_capacity(bytes.len() * 2), |mut acc, byte| {
        write!(&mut acc, "{byte:02x}").expect("write hex");
        acc
    })
}
async fn decode_inline_variants(
inline: &[u8],
size: usize,
erasure: &Erasure,
preferred: HashAlgorithm,
) -> Result<(Vec<u8>, HashAlgorithm), Error> {
let mut tried = HashSet::new();
let candidates = [preferred, HashAlgorithm::HighwayHash256, HashAlgorithm::HighwayHash256S];
let mut last_err: Option<Error> = None;
for algo in candidates {
if !tried.insert(algo.clone()) {
continue;
}
match decode_inline_with_algo(inline, size, erasure, algo.clone()).await {
Ok(data) => return Ok((data, algo)),
Err(err) => last_err = Some(err),
}
}
Err(last_err.unwrap_or_else(|| StorageError::other("failed to decode inline data")))
}
/// Decode a bitrot-encoded inline buffer into `size` plain bytes using one
/// specific checksum algorithm. Fails if the reader cannot be created, a read
/// errors, or the stream ends before `size` bytes are recovered.
async fn decode_inline_with_algo(inline: &[u8], size: usize, erasure: &Erasure, algo: HashAlgorithm) -> Result<Vec<u8>, Error> {
    // Upper bound on the encoded length: at least the buffer itself, and at
    // least the shard-file size the erasure parameters imply for `size` bytes.
    let total_len = inline
        .len()
        .max(erasure.shard_file_size(size as i64).max(size as i64) as usize);
    // Empty path/volume plus a Some(..) buffer selects the in-memory reader.
    let mut reader = create_bitrot_reader(Some(inline), None, "", "", 0, total_len, erasure.shard_size(), algo)
        .await
        .map_err(|e| StorageError::other(format!("failed to create inline reader: {e}")))?
        .ok_or_else(|| StorageError::other("inline reader unavailable"))?;
    let mut out = Vec::with_capacity(size);
    while out.len() < size {
        let remaining = size - out.len();
        // Read one erasure block's worth of plain data per iteration; the
        // on-wire chunk is the corresponding shard payload (min 1 byte).
        let plain_chunk = remaining.min(erasure.block_size);
        let shard_payload = calc_shard_size(plain_chunk, erasure.data_shards).max(1);
        let mut buf = vec![0u8; shard_payload];
        let read = reader
            .read(&mut buf)
            .await
            .map_err(|e| StorageError::other(format!("failed to read inline data: {e}")))?;
        if read == 0 {
            // EOF before recovering `size` bytes: the buffer is truncated.
            return Err(StorageError::other("incomplete inline data read"));
        }
        // The final chunk may contain shard padding beyond the payload.
        let copy_len = remaining.min(read);
        out.extend_from_slice(&buf[..copy_len]);
    }
    Ok(out)
}
/// Background task to spill inline data to segmented format
pub struct InlineSpillProcessor {
    /// Disk set backing the object; `None` entries are offline/missing disks.
    pub disks: Vec<Option<crate::disk::DiskStore>>,
    /// Minimum number of successful disk writes required for the spill.
    pub write_quorum: usize,
}
impl InlineSpillProcessor {
pub fn new(disks: Vec<Option<crate::disk::DiskStore>>, write_quorum: usize) -> Self {
Self { disks, write_quorum }
}
/// Process a single spill operation from InlinePendingSpill to SegmentedActive
///
/// No-ops (returns `Ok`) when the object is not in `InlinePendingSpill` state
/// or when `epoch` no longer matches the object's append-state epoch (a newer
/// writer won the race). Otherwise decodes the inline payload, erasure-writes
/// it as `part.1` under a fresh data dir, and updates the in-memory metadata.
/// NOTE(review): metadata write-back to disk is still a TODO below, so the
/// updated `fi`/`parts_metadata` are not yet persisted by this function.
pub async fn process_spill(
    &self,
    bucket: &str,
    object: &str,
    mut fi: rustfs_filemeta::FileInfo,
    mut parts_metadata: Vec<rustfs_filemeta::FileInfo>,
    epoch: u64,
) -> Result<(), Error> {
    use rustfs_filemeta::AppendStateKind;
    use tracing::{debug, error, info, warn};
    // Verify we're in the correct state; anything else means the caller is
    // stale and the spill is silently skipped.
    let current_state = fi.get_append_state();
    if current_state.state != AppendStateKind::InlinePendingSpill {
        warn!(
            bucket = bucket,
            object = object,
            current_state = ?current_state.state,
            "Spill processor called on object not in InlinePendingSpill state"
        );
        return Ok(());
    }
    // Check epoch to ensure we're processing the correct version; a mismatch
    // means a concurrent append already advanced the state.
    if current_state.epoch != epoch {
        debug!(
            bucket = bucket,
            object = object,
            current_epoch = current_state.epoch,
            expected_epoch = epoch,
            "Spill operation skipped due to epoch mismatch"
        );
        return Ok(());
    }
    info!(
        bucket = bucket,
        object = object,
        size = fi.size,
        epoch = epoch,
        "Starting inline data spill to segmented format"
    );
    // Extract inline data; spilling without it is impossible by definition.
    let inline_data = fi
        .data
        .clone()
        .ok_or_else(|| StorageError::other("Cannot spill object without inline data"))?;
    // Create erasure encoder matching the object's existing parameters.
    let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
    // Decode inline data to plain data, using the first part's checksum
    // algorithm when available (HighwayHash256 as the fallback).
    let hash_algorithm = fi
        .parts
        .first()
        .map(|part| fi.erasure.get_checksum_info(part.number).algorithm)
        .unwrap_or(HashAlgorithm::HighwayHash256);
    let plain_data = match decode_inline_payload(&inline_data, fi.size as usize, &erasure, hash_algorithm.clone()).await {
        Ok((plain, _detected_algo)) => plain,
        Err(err) => {
            error!(
                bucket = bucket,
                object = object,
                error = ?err,
                "Failed to decode inline data during spill"
            );
            return Err(StorageError::other(format!("Failed to decode inline data for spill: {err}")));
        }
    };
    // Generate data directory for the object
    let data_dir = uuid::Uuid::new_v4();
    // Create temporary directory for the spill operation; the uuid+timestamp
    // root keeps concurrent spills from colliding.
    let tmp_root = format!("{}x{}", uuid::Uuid::new_v4(), time::OffsetDateTime::now_utc().unix_timestamp());
    let tmp_path = format!("{tmp_root}/{}/part.1", data_dir);
    // Encode and write the data to all disks
    match self.write_segmented_data(&plain_data, &tmp_path, &erasure).await {
        Ok(_) => {
            // Move from temp to permanent location
            let final_path = format!("{}/part.1", data_dir);
            if let Err(err) = self.move_temp_to_final(&tmp_path, &final_path).await {
                error!(
                    bucket = bucket,
                    object = object,
                    error = ?err,
                    "Failed to move spilled data to final location"
                );
                // Clean up temp files (best effort; error intentionally ignored)
                let _ = self.cleanup_temp_files(&tmp_path).await;
                return Err(err);
            }
            // Update file metadata: point at the new data dir and drop the
            // inline payload plus its marker key.
            fi.data_dir = Some(data_dir);
            fi.data = None; // Remove inline data
            fi.metadata.remove(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER));
            // Update append state to SegmentedActive and bump the epoch so
            // any in-flight spill for the old epoch becomes a no-op.
            let mut new_state = current_state;
            new_state.state = AppendStateKind::SegmentedActive;
            new_state.epoch = new_state.epoch.saturating_add(1);
            new_state.pending_segments.clear();
            fi.set_append_state(&new_state)
                .map_err(|err| StorageError::other(format!("Failed to update append state after spill: {err}")))?;
            // Update all parts metadata to mirror the new layout.
            for meta in parts_metadata.iter_mut() {
                if !meta.is_valid() {
                    continue;
                }
                meta.data_dir = Some(data_dir);
                meta.data = None;
                meta.metadata = fi.metadata.clone();
                meta.metadata
                    .remove(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER));
            }
            // Write updated metadata back to disks
            // TODO: Implement metadata write-back logic
            // This would typically involve writing the updated FileInfo to all disks
            info!(
                bucket = bucket,
                object = object,
                data_dir = ?data_dir,
                new_epoch = new_state.epoch,
                "Successfully spilled inline data to segmented format"
            );
            Ok(())
        }
        Err(err) => {
            error!(
                bucket = bucket,
                object = object,
                error = ?err,
                "Failed to write segmented data during spill"
            );
            // Clean up temp files (best effort; error intentionally ignored)
            let _ = self.cleanup_temp_files(&tmp_path).await;
            Err(err)
        }
    }
}
async fn write_segmented_data(&self, data: &[u8], tmp_path: &str, _erasure: &Erasure) -> Result<(), Error> {
use tracing::debug;
// TODO: Implement proper erasure encoding and writing to disks
// This is a placeholder implementation
debug!(
data_len = data.len(),
path = tmp_path,
"Writing segmented data (placeholder implementation)"
);
// For now, just return success - full implementation would:
// 1. Create bitrot writers for each disk
// 2. Erasure encode the data
// 3. Write each shard to its corresponding disk
Ok(())
}
async fn move_temp_to_final(&self, tmp_path: &str, final_path: &str) -> Result<(), Error> {
use tracing::debug;
// TODO: Implement moving temp files to final location
debug!(
tmp_path = tmp_path,
final_path = final_path,
"Moving temp files to final location (placeholder)"
);
Ok(())
}
async fn cleanup_temp_files(&self, tmp_path: &str) -> Result<(), Error> {
use tracing::debug;
// TODO: Implement temp file cleanup
debug!(tmp_path = tmp_path, "Cleaning up temp files (placeholder)");
Ok(())
}
}
/// Spawn a detached background task that spills an object's inline data into
/// segmented form via `InlineSpillProcessor::process_spill`.
///
/// The task is fire-and-forget: failures are logged rather than propagated,
/// because no caller awaits the spawned task's handle.
pub fn trigger_spill_process(
    bucket: String,
    object: String,
    fi: rustfs_filemeta::FileInfo,
    parts_metadata: Vec<rustfs_filemeta::FileInfo>,
    epoch: u64,
    disks: Vec<Option<crate::disk::DiskStore>>,
    write_quorum: usize,
) {
    use tracing::error;

    tokio::spawn(async move {
        let spill_worker = InlineSpillProcessor::new(disks, write_quorum);
        match spill_worker.process_spill(&bucket, &object, fi, parts_metadata, epoch).await {
            Ok(()) => {}
            Err(err) => error!(
                bucket = bucket,
                object = object,
                epoch = epoch,
                error = ?err,
                "Background spill process failed"
            ),
        }
    });
}
#[cfg(test)]
mod tests {
    //! Unit tests for append precondition validation and for the inline
    //! append/decode round-trip, using a 1+0 (single data shard, no parity)
    //! erasure layout so payloads survive encode/decode unchanged.
    use super::*;
    use rustfs_utils::HashAlgorithm;

    // Minimal ObjectInfo fixture; individual tests tweak only the fields
    // relevant to the precondition under test.
    fn make_object_info() -> ObjectInfo {
        ObjectInfo {
            bucket: "test-bucket".to_string(),
            name: "obj".to_string(),
            ..Default::default()
        }
    }

    // Compressed objects must be rejected for append.
    #[test]
    fn rejects_compressed_objects() {
        let mut info = make_object_info();
        info.user_defined
            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression"), "zstd".to_string());
        let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
        matches!(err, StorageError::InvalidArgument(..))
            .then_some(())
            .expect("expected invalid argument");
    }

    // SSE-encrypted objects must be rejected for append.
    #[test]
    fn rejects_encrypted_objects() {
        let mut info = make_object_info();
        info.user_defined
            .insert("x-amz-server-side-encryption".to_string(), "AES256".to_string());
        let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
        matches!(err, StorageError::InvalidArgument(..))
            .then_some(())
            .expect("expected invalid argument");
    }

    // Objects already transitioned to another storage tier must be rejected.
    #[test]
    fn rejects_transitioned_objects() {
        let mut info = make_object_info();
        info.transitioned_object.tier = "GLACIER".to_string();
        info.transitioned_object.status = TRANSITION_COMPLETE.to_string();
        let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
        matches!(err, StorageError::InvalidArgument(..))
            .then_some(())
            .expect("expected invalid argument");
    }

    // A plain object with no special metadata is appendable.
    #[test]
    fn accepts_plain_objects() {
        let info = make_object_info();
        validate_append_preconditions("test-bucket", "obj", &info).expect("append should be allowed");
    }

    // The requested append position must equal the current object size.
    #[test]
    fn rejects_position_mismatch() {
        let mut info = make_object_info();
        info.size = 10;
        let err = validate_append_position("test-bucket", "obj", &info, 5).unwrap_err();
        matches!(err, StorageError::InvalidArgument(..))
            .then_some(())
            .expect("expected invalid argument");
    }

    // 1 data shard, 0 parity: encode/decode is effectively an identity
    // transform plus bitrot framing.
    fn make_inline_erasure() -> Erasure {
        Erasure::new(1, 0, 1024)
    }

    // Helper: bitrot-encode `data` into the inline representation, writing in
    // block_size-sized chunks as the production path does.
    async fn encode_inline(data: &[u8], erasure: &Erasure) -> Vec<u8> {
        let mut writer = create_bitrot_writer(
            true,
            None,
            "",
            "",
            erasure.shard_file_size(data.len() as i64),
            erasure.shard_size(),
            HashAlgorithm::HighwayHash256,
        )
        .await
        .unwrap();
        let mut remaining = data;
        while !remaining.is_empty() {
            let chunk_len = remaining.len().min(erasure.block_size);
            writer.write(&remaining[..chunk_len]).await.unwrap();
            remaining = &remaining[chunk_len..];
        }
        writer.shutdown().await.unwrap();
        writer.into_inline_data().unwrap()
    }

    // Helper: bitrot-decode an inline representation back to `size` plain bytes.
    async fn decode_inline(encoded: &[u8], size: usize, erasure: &Erasure) -> Vec<u8> {
        let mut reader =
            create_bitrot_reader(Some(encoded), None, "", "", 0, size, erasure.shard_size(), HashAlgorithm::HighwayHash256)
                .await
                .unwrap()
                .unwrap();
        let mut out = Vec::with_capacity(size);
        while out.len() < size {
            let remaining = size - out.len();
            // max(1) guards against a zero-length read buffer.
            let mut buf = vec![0u8; erasure.block_size.min(remaining.max(1))];
            let read = reader.read(&mut buf).await.unwrap();
            if read == 0 {
                break;
            }
            out.extend_from_slice(&buf[..read.min(remaining)]);
        }
        out
    }

    // Appending to an existing bitrot-encoded payload yields the concatenation,
    // with the ETag computed over the combined plain bytes.
    #[tokio::test]
    async fn append_inline_combines_payloads() {
        let erasure = make_inline_erasure();
        let existing_plain = b"hello";
        let encoded = encode_inline(existing_plain, &erasure).await;
        let ctx = InlineAppendContext {
            existing_inline: Some(&encoded),
            existing_plain: None,
            existing_size: existing_plain.len() as i64,
            append_payload: b" world",
            erasure: &erasure,
            hash_algorithm: HashAlgorithm::HighwayHash256,
            has_checksums: true,
        };
        let result = append_inline_data(ctx).await.expect("inline append to succeed");
        assert_eq!(result.total_size, 11);
        assert_eq!(result.etag, md5_hex(b"hello world"));
        let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
        assert_eq!(decoded, b"hello world");
    }

    // decode_inline_payload must trim shard padding back to the logical size.
    #[tokio::test]
    async fn decode_inline_handles_padded_shards() {
        let erasure = Erasure::new(1, 0, 1024);
        let plain = b"hello";
        let mut padded = vec![0u8; calc_shard_size(plain.len(), erasure.data_shards)];
        padded[..plain.len()].copy_from_slice(plain);
        let mut writer = create_bitrot_writer(
            true,
            None,
            "",
            "",
            erasure.shard_file_size(plain.len() as i64),
            erasure.shard_size(),
            HashAlgorithm::HighwayHash256,
        )
        .await
        .unwrap();
        writer.write(&padded).await.unwrap();
        writer.shutdown().await.unwrap();
        let inline = writer.into_inline_data().unwrap();
        let (decoded, algo) = decode_inline_payload(&inline, plain.len(), &erasure, HashAlgorithm::HighwayHash256)
            .await
            .expect("inline decode should succeed");
        assert_eq!(decoded, plain);
        assert_eq!(algo, HashAlgorithm::HighwayHash256);
    }

    // Appending to an object with no existing payload behaves like a fresh write.
    #[tokio::test]
    async fn append_inline_handles_empty_original() {
        let erasure = make_inline_erasure();
        let ctx = InlineAppendContext {
            existing_inline: None,
            existing_plain: None,
            existing_size: 0,
            append_payload: b"data",
            erasure: &erasure,
            hash_algorithm: HashAlgorithm::HighwayHash256,
            has_checksums: true,
        };
        let result = append_inline_data(ctx).await.expect("inline append to succeed");
        assert_eq!(result.total_size, 4);
        assert_eq!(result.etag, md5_hex(b"data"));
        let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
        assert_eq!(decoded, b"data");
    }

    // With has_checksums = false and raw (non-bitrot) existing bytes, the
    // result stays raw: plain concatenation without bitrot framing.
    #[tokio::test]
    async fn append_inline_without_checksums_uses_raw_bytes() {
        let erasure = Erasure::new(1, 0, 1024);
        let existing = b"hello";
        let ctx = InlineAppendContext {
            existing_inline: Some(existing),
            existing_plain: None,
            existing_size: existing.len() as i64,
            append_payload: b" world",
            erasure: &erasure,
            hash_algorithm: HashAlgorithm::HighwayHash256,
            has_checksums: false,
        };
        let result = append_inline_data(ctx).await.expect("inline append to succeed");
        assert_eq!(result.total_size, 11);
        assert_eq!(result.etag, md5_hex(b"hello world"));
        assert_eq!(result.inline_data, b"hello world");
    }

    // Even with has_checksums = false, bitrot-framed existing data must still
    // be detected and decoded before appending.
    #[tokio::test]
    async fn append_inline_decodes_bitrot_without_checksums() {
        let erasure = Erasure::new(1, 0, 1024);
        let existing_plain = b"hello";
        let encoded = encode_inline(existing_plain, &erasure).await;
        let ctx = InlineAppendContext {
            existing_inline: Some(&encoded),
            existing_plain: None,
            existing_size: existing_plain.len() as i64,
            append_payload: b" world",
            erasure: &erasure,
            hash_algorithm: HashAlgorithm::HighwayHash256,
            has_checksums: false,
        };
        let result = append_inline_data(ctx).await.expect("inline append to succeed");
        assert_eq!(result.total_size, 11);
        assert_eq!(result.etag, md5_hex(b"hello world"));
        let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
        assert_eq!(decoded, b"hello world");
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -602,14 +602,6 @@ impl StorageAPI for Sets {
(del_objects, del_errs)
}
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).complete_append(bucket, object, opts).await
}
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).abort_append(bucket, object, opts).await
}
async fn list_object_parts(
&self,
bucket: &str,

View File

@@ -1709,17 +1709,6 @@ impl StorageAPI for ECStore {
// Ok((del_objects, del_errs))
}
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let object = encode_dir_object(object);
let (pinfo, _) = self.internal_get_pool_info_existing_with_opts(bucket, &object, opts).await?;
self.pools[pinfo.index].complete_append(bucket, &object, opts).await
}
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let object = encode_dir_object(object);
let (pinfo, _) = self.internal_get_pool_info_existing_with_opts(bucket, &object, opts).await?;
self.pools[pinfo.index].abort_append(bucket, &object, opts).await
}
#[tracing::instrument(skip(self))]
async fn list_object_parts(
&self,

View File

@@ -328,8 +328,6 @@ pub struct ObjectOptions {
pub max_parity: bool,
pub mod_time: Option<OffsetDateTime>,
pub part_number: Option<usize>,
pub append_object: bool,
pub append_position: Option<i64>,
pub delete_prefix: bool,
pub delete_prefix_object: bool,
@@ -658,15 +656,6 @@ impl ObjectInfo {
})
.collect();
let append_state = fi.get_append_state();
let pending_length: i64 = append_state.pending_segments.iter().map(|seg| seg.length).sum();
let logical_size = append_state.committed_length.saturating_add(pending_length);
let actual_size_meta = fi
.metadata
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}actual-size"))
.and_then(|o| o.parse::<i64>().ok())
.unwrap_or(logical_size);
ObjectInfo {
bucket: bucket.to_string(),
name,
@@ -676,7 +665,7 @@ impl ObjectInfo {
version_id,
delete_marker: fi.deleted,
mod_time: fi.mod_time,
size: logical_size,
size: fi.size,
parts,
is_latest: fi.is_latest,
user_tags,
@@ -688,7 +677,6 @@ impl ObjectInfo {
inlined,
user_defined: metadata,
transitioned_object,
actual_size: actual_size_meta,
..Default::default()
}
}
@@ -1200,10 +1188,6 @@ pub trait StorageAPI: ObjectIO + Debug {
opts: ObjectOptions,
) -> (Vec<DeletedObject>, Vec<Option<Error>>);
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
// TransitionObject TODO:
// RestoreTransitionedObject TODO:

View File

@@ -30,7 +30,6 @@ crc32fast = { workspace = true }
rmp.workspace = true
rmp-serde.workspace = true
serde.workspace = true
serde_json.workspace = true
time.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
tokio = { workspace = true, features = ["io-util", "macros", "sync"] }

View File

@@ -1,541 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// Metadata key under which the serialized [`AppendState`] JSON is stored.
const APPEND_STATE_META_KEY: &str = "x-rustfs-internal-append-state";
/// Tracks the state of append-enabled objects.
///
/// Persisted as JSON in object metadata under `x-rustfs-internal-append-state`
/// (see [`set_append_state`] / [`get_append_state`]).
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AppendState {
    /// Current lifecycle state of the append workflow.
    pub state: AppendStateKind,
    /// Monotonically increasing counter used for optimistic concurrency control.
    pub epoch: u64,
    /// Logical number of bytes already durable in the committed object body.
    pub committed_length: i64,
    /// Append segments written but not yet consolidated into the object.
    pub pending_segments: Vec<AppendSegment>,
}
/// Represents individual append segments that still need consolidation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AppendSegment {
    /// Logical byte offset of this segment within the object.
    pub offset: i64,
    /// Length of the segment in bytes.
    pub length: i64,
    /// Data directory holding the segment payload, if already written to disk.
    pub data_dir: Option<Uuid>,
    /// ETag of the segment payload, if one was computed.
    pub etag: Option<String>,
    /// Append-state epoch under which this segment was created.
    pub epoch: u64,
}
/// Possible append lifecycle states for an object version.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum AppendStateKind {
    /// Append is not enabled for this object.
    #[default]
    Disabled,
    /// Object data is stored fully inline in metadata.
    Inline,
    /// Inline data exceeded its threshold and is awaiting spill to disk.
    InlinePendingSpill,
    /// Data lives in erasure-coded parts plus pending append segments.
    SegmentedActive,
    /// No pending segments; logical content equals the committed parts.
    SegmentedSealed,
}
/// Persist the provided append state into object metadata.
pub fn set_append_state(metadata: &mut HashMap<String, String>, state: &AppendState) -> Result<()> {
let encoded = serde_json::to_string(state).map_err(Error::other)?;
metadata.insert(APPEND_STATE_META_KEY.to_string(), encoded);
Ok(())
}
/// Drop the internal append-state entry from `metadata`, if present.
pub fn clear_append_state(metadata: &mut HashMap<String, String>) {
    let _ = metadata.remove(APPEND_STATE_META_KEY);
}
/// Load append state stored in metadata, if any.
pub fn get_append_state(metadata: &HashMap<String, String>) -> Result<Option<AppendState>> {
let raw = match metadata.get(APPEND_STATE_META_KEY) {
Some(val) if !val.is_empty() => val,
_ => return Ok(None),
};
let decoded = serde_json::from_str(raw).map_err(Error::other)?;
Ok(Some(decoded))
}
/// Complete append operations by consolidating pending segments and sealing the object
pub fn complete_append_operation(state: &mut AppendState) -> Result<()> {
match state.state {
AppendStateKind::SegmentedActive => {
// Move all pending segments data to main parts and seal
state.committed_length += state.pending_segments.iter().map(|s| s.length).sum::<i64>();
state.pending_segments.clear();
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::Inline => {
// Inline objects are always immediately committed, just seal them
state.state = AppendStateKind::SegmentedSealed; // Transition to sealed
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::InlinePendingSpill => {
// Wait for spill to complete, then seal
// In practice, this might need to trigger the spill completion first
state.state = AppendStateKind::SegmentedSealed;
state.pending_segments.clear();
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::SegmentedSealed | AppendStateKind::Disabled => {
// Already sealed or disabled
Err(Error::other("Cannot complete append on sealed or disabled object"))
}
}
}
/// Abort append operations by discarding pending segments and returning to sealed state
pub fn abort_append_operation(state: &mut AppendState) -> Result<()> {
match state.state {
AppendStateKind::SegmentedActive => {
// Discard all pending segments and seal
state.pending_segments.clear();
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::Inline => {
// Inline data is already committed, just seal
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::InlinePendingSpill => {
// Cancel spill and keep inline data, then seal
state.state = AppendStateKind::SegmentedSealed;
state.pending_segments.clear();
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::SegmentedSealed | AppendStateKind::Disabled => {
// Already sealed or disabled
Err(Error::other("Cannot abort append on sealed or disabled object"))
}
}
}
/// Whether `complete_append_operation` would succeed for this state.
pub fn can_complete_append(state: &AppendState) -> bool {
    match state.state {
        AppendStateKind::Inline | AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive => true,
        AppendStateKind::SegmentedSealed | AppendStateKind::Disabled => false,
    }
}
/// Whether `abort_append_operation` would succeed for this state.
pub fn can_abort_append(state: &AppendState) -> bool {
    // Everything except the two terminal states is abortable.
    !matches!(state.state, AppendStateKind::Disabled | AppendStateKind::SegmentedSealed)
}
/// Verify the caller-supplied epoch against the stored one (optimistic
/// concurrency control).
///
/// # Errors
/// Returns an error naming both epochs when they differ.
pub fn verify_append_epoch(current_state: &AppendState, expected_epoch: u64) -> Result<()> {
    if current_state.epoch == expected_epoch {
        return Ok(());
    }
    Err(Error::other(format!(
        "Append operation conflict: expected epoch {}, found {}",
        expected_epoch, current_state.epoch
    )))
}
/// Advance the epoch ahead of the next append; saturates at `u64::MAX`
/// rather than wrapping.
pub fn prepare_next_append(state: &mut AppendState) {
    state.epoch = state.epoch.checked_add(1).unwrap_or(u64::MAX);
}
/// Validate that a new append segment doesn't conflict with existing segments
pub fn validate_new_segment(state: &AppendState, new_offset: i64, new_length: i64) -> Result<()> {
let new_end = new_offset + new_length;
// Check it doesn't overlap with committed data
if new_offset < state.committed_length {
return Err(Error::other(format!(
"New segment overlaps with committed data: offset {} < committed_length {}",
new_offset, state.committed_length
)));
}
// Check it doesn't overlap with existing pending segments
for existing in &state.pending_segments {
let existing_start = existing.offset;
let existing_end = existing.offset + existing.length;
// Check for any overlap
if new_offset < existing_end && new_end > existing_start {
return Err(Error::other(format!(
"New segment [{}, {}) overlaps with existing segment [{}, {})",
new_offset, new_end, existing_start, existing_end
)));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    //! Unit tests for append-state persistence, migration compatibility,
    //! complete/abort state transitions, epoch checks, and segment validation.
    use super::*;
    use crate::fileinfo::FileInfo;

    // Serialize into metadata, read back, and clear — full round-trip.
    #[test]
    fn append_state_roundtrip_in_metadata() {
        let mut metadata = HashMap::new();
        let state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 42,
            committed_length: 2048,
            pending_segments: vec![AppendSegment {
                offset: 2048,
                length: 512,
                data_dir: Some(Uuid::new_v4()),
                etag: Some("abc123".to_string()),
                epoch: 0,
            }],
        };
        set_append_state(&mut metadata, &state).expect("persist append state");
        assert!(metadata.contains_key(APPEND_STATE_META_KEY));
        let decoded = get_append_state(&metadata)
            .expect("decode append state")
            .expect("state present");
        assert_eq!(decoded, state);
        clear_append_state(&mut metadata);
        assert!(!metadata.contains_key(APPEND_STATE_META_KEY));
        assert!(get_append_state(&metadata).unwrap().is_none());
    }

    // Objects written before the append feature carry no explicit state;
    // FileInfo::get_append_state must infer one from the inline-data marker.
    #[test]
    fn fileinfo_append_state_migration_compatibility() {
        // Test old inline data object
        let mut inline_fi = FileInfo {
            size: 1024,
            ..Default::default()
        };
        inline_fi.set_inline_data();
        let state = inline_fi.get_append_state();
        assert_eq!(state.state, AppendStateKind::Inline);
        assert_eq!(state.committed_length, 1024);
        assert!(state.pending_segments.is_empty());
        assert!(inline_fi.is_appendable());
        assert!(!inline_fi.has_pending_appends());
        // Test old regular object
        let regular_fi = FileInfo {
            size: 2048,
            ..Default::default()
        };
        // No inline_data marker
        let state = regular_fi.get_append_state();
        assert_eq!(state.state, AppendStateKind::SegmentedSealed);
        assert_eq!(state.committed_length, 2048);
        assert!(state.pending_segments.is_empty());
        assert!(!regular_fi.is_appendable());
        assert!(!regular_fi.has_pending_appends());
        // Test explicit append state
        let mut append_fi = FileInfo::default();
        let explicit_state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 5,
            committed_length: 1500,
            pending_segments: vec![AppendSegment {
                offset: 1500,
                length: 300,
                data_dir: Some(Uuid::new_v4()),
                etag: Some("def456".to_string()),
                epoch: 0,
            }],
        };
        append_fi.set_append_state(&explicit_state).expect("set explicit state");
        let retrieved_state = append_fi.get_append_state();
        assert_eq!(retrieved_state, explicit_state);
        assert!(append_fi.is_appendable());
        assert!(append_fi.has_pending_appends());
    }

    // Every state kind must survive a serialize/deserialize round-trip.
    #[test]
    fn append_state_transitions() {
        // Test state transition validation
        assert_eq!(AppendStateKind::default(), AppendStateKind::Disabled);
        let inline_state = AppendState {
            state: AppendStateKind::Inline,
            ..Default::default()
        };
        let spill_state = AppendState {
            state: AppendStateKind::InlinePendingSpill,
            ..Default::default()
        };
        let active_state = AppendState {
            state: AppendStateKind::SegmentedActive,
            ..Default::default()
        };
        let sealed_state = AppendState {
            state: AppendStateKind::SegmentedSealed,
            ..Default::default()
        };
        // Verify serialization works for all states
        for state in [inline_state, spill_state, active_state, sealed_state] {
            let mut metadata = HashMap::new();
            set_append_state(&mut metadata, &state).expect("serialize state");
            let decoded = get_append_state(&metadata).unwrap().unwrap();
            assert_eq!(decoded, state);
        }
    }

    // Completing folds pending lengths into committed_length, seals, and
    // bumps the epoch; sealed/disabled objects refuse completion.
    #[test]
    fn complete_append_transitions() {
        // Test completing SegmentedActive with pending segments
        let mut active_state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 5,
            committed_length: 1000,
            pending_segments: vec![
                AppendSegment {
                    offset: 1000,
                    length: 200,
                    data_dir: Some(Uuid::new_v4()),
                    etag: Some("abc123".to_string()),
                    epoch: 0,
                },
                AppendSegment {
                    offset: 1200,
                    length: 300,
                    data_dir: Some(Uuid::new_v4()),
                    etag: Some("def456".to_string()),
                    epoch: 0,
                },
            ],
        };
        assert!(can_complete_append(&active_state));
        complete_append_operation(&mut active_state).expect("complete should succeed");
        assert_eq!(active_state.state, AppendStateKind::SegmentedSealed);
        assert_eq!(active_state.committed_length, 1500); // 1000 + 200 + 300
        assert!(active_state.pending_segments.is_empty());
        assert_eq!(active_state.epoch, 6);
        // Test completing Inline state
        let mut inline_state = AppendState {
            state: AppendStateKind::Inline,
            epoch: 2,
            committed_length: 500,
            ..Default::default()
        };
        assert!(can_complete_append(&inline_state));
        complete_append_operation(&mut inline_state).expect("complete should succeed");
        assert_eq!(inline_state.state, AppendStateKind::SegmentedSealed);
        assert_eq!(inline_state.committed_length, 500); // Unchanged
        assert_eq!(inline_state.epoch, 3);
        // Test completing already sealed state should fail
        let mut sealed_state = AppendState {
            state: AppendStateKind::SegmentedSealed,
            ..Default::default()
        };
        assert!(!can_complete_append(&sealed_state));
        assert!(complete_append_operation(&mut sealed_state).is_err());
    }

    // Aborting discards pending segments without touching committed_length.
    #[test]
    fn abort_append_transitions() {
        // Test aborting SegmentedActive with pending segments
        let mut active_state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 3,
            committed_length: 800,
            pending_segments: vec![AppendSegment {
                offset: 800,
                length: 400,
                data_dir: Some(Uuid::new_v4()),
                etag: Some("xyz789".to_string()),
                epoch: 0,
            }],
        };
        assert!(can_abort_append(&active_state));
        abort_append_operation(&mut active_state).expect("abort should succeed");
        assert_eq!(active_state.state, AppendStateKind::SegmentedSealed);
        assert_eq!(active_state.committed_length, 800); // Unchanged, pending discarded
        assert!(active_state.pending_segments.is_empty());
        assert_eq!(active_state.epoch, 4);
        // Test aborting InlinePendingSpill
        let mut spill_state = AppendState {
            state: AppendStateKind::InlinePendingSpill,
            epoch: 1,
            committed_length: 100,
            pending_segments: vec![],
        };
        assert!(can_abort_append(&spill_state));
        abort_append_operation(&mut spill_state).expect("abort should succeed");
        assert_eq!(spill_state.state, AppendStateKind::SegmentedSealed);
        assert_eq!(spill_state.committed_length, 100);
        assert_eq!(spill_state.epoch, 2);
        // Test aborting disabled state should fail
        let mut disabled_state = AppendState {
            state: AppendStateKind::Disabled,
            ..Default::default()
        };
        assert!(!can_abort_append(&disabled_state));
        assert!(abort_append_operation(&mut disabled_state).is_err());
    }

    // Only an exact epoch match passes; the error names both epochs.
    #[test]
    fn epoch_validation() {
        let state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 10,
            committed_length: 1000,
            pending_segments: vec![],
        };
        // Valid epoch should succeed
        assert!(verify_append_epoch(&state, 10).is_ok());
        // Invalid epoch should fail
        assert!(verify_append_epoch(&state, 9).is_err());
        assert!(verify_append_epoch(&state, 11).is_err());
        // Error message should contain epoch information
        let error = verify_append_epoch(&state, 5).unwrap_err();
        let error_msg = error.to_string();
        assert!(error_msg.contains("expected epoch 5"));
        assert!(error_msg.contains("found 10"));
    }

    // Epoch increments normally and saturates at u64::MAX.
    #[test]
    fn next_append_preparation() {
        let mut state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 5,
            committed_length: 1000,
            pending_segments: vec![],
        };
        prepare_next_append(&mut state);
        assert_eq!(state.epoch, 6);
        // Test saturation behavior
        let mut max_state = AppendState {
            epoch: u64::MAX,
            ..Default::default()
        };
        prepare_next_append(&mut max_state);
        assert_eq!(max_state.epoch, u64::MAX); // Should saturate, not overflow
    }

    // Overlap detection against committed data and pending segments,
    // including touching-boundary (zero-length) edge cases.
    #[test]
    fn segment_validation() {
        let state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 3,
            committed_length: 1000,
            pending_segments: vec![
                AppendSegment {
                    offset: 1000,
                    length: 200,
                    data_dir: Some(Uuid::new_v4()),
                    etag: Some("abc123".to_string()),
                    epoch: 0,
                },
                AppendSegment {
                    offset: 1300,
                    length: 300,
                    data_dir: Some(Uuid::new_v4()),
                    etag: Some("def456".to_string()),
                    epoch: 0,
                },
            ],
        };
        // Valid segment after existing segments
        assert!(validate_new_segment(&state, 1600, 100).is_ok());
        // Valid segment filling gap between committed and first pending
        assert!(validate_new_segment(&state, 1200, 100).is_ok());
        // Invalid segment overlapping with committed data
        assert!(validate_new_segment(&state, 900, 200).is_err());
        let error = validate_new_segment(&state, 900, 200).unwrap_err();
        assert!(error.to_string().contains("overlaps with committed data"));
        // Invalid segment overlapping with first pending segment
        assert!(validate_new_segment(&state, 1100, 100).is_err());
        let error = validate_new_segment(&state, 1100, 100).unwrap_err();
        assert!(error.to_string().contains("overlaps with existing segment"));
        // Invalid segment overlapping with second pending segment
        assert!(validate_new_segment(&state, 1400, 100).is_err());
        // Edge case: segment exactly touching committed data (should be valid)
        assert!(validate_new_segment(&state, 1000, 0).is_ok());
        // Edge case: segment exactly touching existing segment (should be valid)
        assert!(validate_new_segment(&state, 1200, 0).is_ok());
    }

    // Validation against a state with no pending segments.
    #[test]
    fn segment_validation_edge_cases() {
        let empty_state = AppendState {
            state: AppendStateKind::SegmentedActive,
            epoch: 1,
            committed_length: 500,
            pending_segments: vec![],
        };
        // First segment after committed data
        assert!(validate_new_segment(&empty_state, 500, 100).is_ok());
        assert!(validate_new_segment(&empty_state, 600, 200).is_ok());
        // Zero-length segments (edge case)
        assert!(validate_new_segment(&empty_state, 500, 0).is_ok());
        // Segment exactly at committed boundary
        assert!(validate_new_segment(&empty_state, 499, 1).is_err());
        assert!(validate_new_segment(&empty_state, 500, 1).is_ok());
    }
}

View File

@@ -494,96 +494,6 @@ impl FileInfo {
ReplicationStatusType::Empty
}
}
/// Get the append state for this FileInfo, with migration compatibility
pub fn get_append_state(&self) -> crate::append::AppendState {
use crate::append::{AppendState, AppendStateKind, get_append_state};
// Try to load from metadata first
if let Ok(Some(state)) = get_append_state(&self.metadata) {
return state;
}
// Migration compatibility: determine state based on existing data
if self.inline_data() {
// Has inline data, treat as Inline state
AppendState {
state: AppendStateKind::Inline,
epoch: 0,
committed_length: self.size,
pending_segments: Vec::new(),
}
} else {
// No inline data, treat as SegmentedSealed (traditional object)
AppendState {
state: AppendStateKind::SegmentedSealed,
epoch: 0,
committed_length: self.size,
pending_segments: Vec::new(),
}
}
}
/// Set the append state for this FileInfo
pub fn set_append_state(&mut self, state: &crate::append::AppendState) -> crate::error::Result<()> {
crate::append::set_append_state(&mut self.metadata, state)
}
/// Check if this object supports append operations
pub fn is_appendable(&self) -> bool {
use crate::append::AppendStateKind;
match self.get_append_state().state {
AppendStateKind::Disabled => false,
AppendStateKind::Inline | AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive => true,
AppendStateKind::SegmentedSealed => false,
}
}
/// Check if this object has pending append operations
pub fn has_pending_appends(&self) -> bool {
use crate::append::AppendStateKind;
matches!(
self.get_append_state().state,
AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive
)
}
/// Complete all pending append operations and seal the object
pub fn complete_append(&mut self) -> crate::error::Result<()> {
let mut append_state = self.get_append_state();
crate::append::complete_append_operation(&mut append_state)?;
self.set_append_state(&append_state)?;
// Update file size to reflect completed operation
if append_state.state == crate::append::AppendStateKind::SegmentedSealed {
self.size = append_state.committed_length;
}
Ok(())
}
/// Abort all pending append operations and seal the object
pub fn abort_append(&mut self) -> crate::error::Result<()> {
let mut append_state = self.get_append_state();
crate::append::abort_append_operation(&mut append_state)?;
self.set_append_state(&append_state)?;
// Update file size to only include committed data
if append_state.state == crate::append::AppendStateKind::SegmentedSealed {
self.size = append_state.committed_length;
}
Ok(())
}
/// Check if append operations can be completed for this object
pub fn can_complete_append(&self) -> bool {
crate::append::can_complete_append(&self.get_append_state())
}
/// Check if append operations can be aborted for this object
pub fn can_abort_append(&self) -> bool {
crate::append::can_abort_append(&self.get_append_state())
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod append;
mod error;
pub mod fileinfo;
mod filemeta;
@@ -23,7 +22,6 @@ mod replication;
pub mod test_data;
pub use append::*;
pub use error::*;
pub use fileinfo::*;
pub use filemeta::*;

View File

@@ -420,7 +420,6 @@ mod tests {
}
#[tokio::test]
#[ignore]
async fn test_invalid_domain_resolution() {
let resolver = LayeredDnsResolver::new().await.unwrap();

View File

@@ -1,147 +0,0 @@
# Append Write Design
This document captures the current design of the append-write feature in RustFS so that new contributors can quickly understand the moving parts, data flows, and testing expectations.
## Goals & Non-Goals
### Goals
- Allow clients to append payloads to existing objects without re-uploading the full body.
- Support inline objects and spill seamlessly into segmented layout once thresholds are exceeded.
- Preserve strong read-after-write semantics via optimistic concurrency controls (ETag / epoch).
- Expose minimal S3-compatible surface area (`x-amz-object-append`, `x-amz-append-position`, `x-amz-append-action`).
### Non-Goals
- Full multipart-upload parity; append is intentionally simpler and serialized per object.
- Cross-object transactions; each object is isolated.
- Rebalancing or background compaction (future work).
## State Machine
Append state is persisted inside `FileInfo.metadata` under `x-rustfs-internal-append-state` and serialized as `AppendState` (`crates/filemeta/src/append.rs`).
```
Disabled --(initial PUT w/o append)--> SegmentedSealed
Inline --(inline append)--> Inline / InlinePendingSpill
InlinePendingSpill --(spill success)--> SegmentedActive
SegmentedActive --(Complete)--> SegmentedSealed
SegmentedActive --(Abort)--> SegmentedSealed
SegmentedSealed --(new append)--> SegmentedActive
```
Definitions:
- **Inline**: Object data fully stored in metadata (`FileInfo.data`).
- **InlinePendingSpill**: Inline data after append exceeded inline threshold; awaiting spill to disk.
- **SegmentedActive**: Object data lives in erasure-coded part(s) plus one or more pending append segments on disk (`append/<epoch>/<uuid>`).
- **SegmentedSealed**: No pending segments; logical content equals committed parts.
`AppendState` fields:
- `state`: current state enum (see above).
- `epoch`: monotonically increasing counter for concurrency control.
- `committed_length`: logical size already durable in the base parts/inline region.
- `pending_segments`: ordered list of `AppendSegment { offset, length, data_dir, etag, epoch }`.
## Metadata & Storage Layout
### Inline Objects
- Inline payload stored in `FileInfo.data`.
- Hash metadata maintained through `append_inline_data` (re-encoding with bitrot writer when checksums exist).
- When spilling is required, inline data is decoded, appended, and re-encoded into erasure shards written to per-disk `append/<epoch>/<segment_id>/part.1` temporary path before rename to primary data directory.
### Segmented Objects
- Base object content is represented by standard erasure-coded parts (`FileInfo.parts`, `FileInfo.data_dir`).
- Pending append segments live under `<object>/append/<epoch>/<segment_uuid>/part.1` (per disk).
- Each append stores segment metadata (`etag`, `offset`, `length`) inside `AppendState.pending_segments` and updates `FileInfo.size` to include pending bytes.
- Aggregate ETag is recomputed using multipart MD5 helper (`get_complete_multipart_md5`).
### Metadata Writes
- `SetDisks::write_unique_file_info` persists `FileInfo` updates to the quorum of disks.
- During spill/append/complete/abort, all mirrored `FileInfo` copies within `parts_metadata` are updated to keep nodes consistent.
- Abort ensures inline markers are cleared (`x-rustfs-internal-inline-data`) and `FileInfo.data = None` to avoid stale inline reads.
## Request Flows
### Append (Inline Path)
1. Handler (`rustfs/src/storage/ecfs.rs`) validates headers and fills `ObjectOptions.append_*`.
2. `SetDisks::append_inline_object` verifies append position using `AppendState` snapshot.
3. Existing inline payload decoded (if checksums present) and appended in-memory (`append_inline_data`).
4. Storage class decision determines whether to remain inline or spill.
5. Inline success updates `FileInfo.data`, metadata, `AppendState` (state `Inline`, lengths updated).
6. Spill path delegates to `spill_inline_into_segmented` (see segmented path below).
### Append (Segmented Path)
1. `SetDisks::append_segmented_object` validates state (must be `SegmentedActive` or `SegmentedSealed`).
2. Snapshot expected offset = committed length + total length of all pending segments.
3. Payload encoded using erasure coding; shards written to temp volume; renamed into `append/<epoch>/<segment_uuid>` under object data directory.
4. New `AppendSegment` pushed, `AppendState.epoch` incremented, aggregated ETag recalculated.
5. `FileInfo.size` reflects committed + pending bytes; metadata persisted across quorum.
### GET / Range Reads
1. `SetDisks::get_object_with_fileinfo` inspects `AppendState`.
2. Reads committed data from inline or erasure parts (ignoring inline buffers once segmented).
3. If requested range includes pending segments, loader fetches each segment via `load_pending_segment`, decodes shards, and streams appended bytes.
### Complete Append (`x-amz-append-action: complete`)
1. `complete_append_object` fetches current `FileInfo`, ensures pending segments exist.
2. Entire logical object (committed + pending) streamed through `VecAsyncWriter` (TODO: potential optimization) to produce contiguous payload.
3. Inline spill routine (`spill_inline_into_segmented`) consolidates data into primary part, sets state `SegmentedSealed`, clears pending list, updates `committed_length`.
4. Pending segment directories removed and quorum metadata persisted.
### Abort Append (`x-amz-append-action: abort`)
1. `abort_append_object` removes pending segment directories.
2. Ensures `committed_length` matches actual durable data (inline length or sum of parts); logs and corrects if mismatch is found.
3. Clears pending list, sets state `SegmentedSealed`, bumps epoch, removes inline markers/data.
4. Persists metadata and returns base ETag (multipart MD5 of committed parts).
## Error Handling & Recovery
- All disk writes go through quorum helpers (`reduce_write_quorum_errs`, `reduce_read_quorum_errs`) and propagate `StorageError` variants for HTTP mapping.
- Append operations are single-threaded per object via locking in higher layers (`fast_lock_manager` in `SetDisks::put_object`).
- On spill/append rename failure, temp directories are cleaned up; operation aborts without mutating metadata.
- Abort path now realigns `committed_length` if metadata drifted (observed during development) and strips inline remnants to prevent stale reads.
- Pending segments are only removed once metadata update succeeds; no partial deletion is performed ahead of state persistence.
## Concurrency
- Append requests rely on exact `x-amz-append-position` to ensure the client has an up-to-date view.
- Optional header `If-Match` is honored in S3 handler before actual append (shared with regular PUT path).
- `AppendState.epoch` increments after each append/complete/abort; future work may expose it for stronger optimistic control.
- e2e test `append_segments_concurrency_then_complete` verifies that simultaneous appends result in exactly one success; the loser receives 400.
## Key Modules
- `crates/ecstore/src/set_disk.rs`: core implementation (inline append, spill, segmented append, complete, abort, GET integration).
- `crates/ecstore/src/erasure_coding/{encode,decode}.rs`: encode/decode helpers used by append pipeline.
- `crates/filemeta/src/append.rs`: metadata schema + helper functions.
- `rustfs/src/storage/ecfs.rs`: HTTP/S3 layer that parses headers and routes to append operations.
## Testing Strategy
### Unit Tests
- `crates/filemeta/src/append.rs` covers serialization and state transitions.
- `crates/ecstore/src/set_disk.rs` contains lower-level utilities and regression tests for metadata helpers.
- Additional unit coverage is recommended for spill/append failure paths (e.g., injected rename failures).
### End-to-End Tests (`cargo test --package e2e_test append`)
- Inline append success, wrong position, precondition failures.
- Segmented append success, wrong position, wrong ETag.
- Spill threshold transition (`append_threshold_crossing_inline_to_segmented`).
- Pending segment streaming (`append_range_requests_across_segments`).
- Complete append consolidates pending segments.
- Abort append discards pending data and allows new append.
- Concurrency: two clients racing to append, followed by additional append + complete.
### Tooling Considerations
- `make clippy` must pass; the append code relies on async operations and custom logging.
- `make test` / `cargo nextest run` recommended before submitting PRs.
- Use `RUST_LOG=rustfs_ecstore=debug` when debugging append flows; targeted `info!`/`warn!` logs are emitted during spill/abort.
## Future Work
- Streamed consolidation in `complete_append_object` to avoid buffering entire logical object.
- Throttling or automatic `Complete` when pending segments exceed size/quantity thresholds.
- Stronger epoch exposure to clients (header-based conflict detection).
- Automated cleanup or garbage collection for orphaned `append/*` directories.
---
For questions or design discussions, drop a note in the append-write channel or ping the storage team.

View File

@@ -2288,92 +2288,10 @@ impl S3 for FS {
let mt = metadata.clone();
let mt2 = metadata.clone();
let append_flag = req.headers.get("x-amz-object-append");
let append_action_header = req.headers.get("x-amz-append-action");
let mut append_requested = false;
let mut append_position: Option<i64> = None;
if let Some(flag_value) = append_flag {
let flag_str = flag_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-object-append header".to_string())
})?;
if flag_str.eq_ignore_ascii_case("true") {
append_requested = true;
let position_value = req.headers.get("x-amz-append-position").ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position header required when x-amz-object-append is true".to_string(),
)
})?;
let position_str = position_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-append-position header".to_string())
})?;
let position = position_str.parse::<i64>().map_err(|_| {
S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position must be a non-negative integer".to_string(),
)
})?;
if position < 0 {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position must be a non-negative integer".to_string(),
));
}
append_position = Some(position);
} else if !flag_str.eq_ignore_ascii_case("false") {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-object-append must be 'true' or 'false'".to_string(),
));
}
}
let mut append_action: Option<String> = None;
if let Some(action_value) = append_action_header {
let action_str = action_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-append-action header".to_string())
})?;
append_action = Some(action_str.to_ascii_lowercase());
}
let mut opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, mt)
.await
.map_err(ApiError::from)?;
if append_requested {
opts.append_object = true;
opts.append_position = append_position;
}
if let Some(action) = append_action {
if append_requested {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-object-append cannot be combined with x-amz-append-action".to_string(),
));
}
let obj_info = match action.as_str() {
"complete" => store.complete_append(&bucket, &key, &opts).await,
"abort" => store.abort_append(&bucket, &key, &opts).await,
_ => {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-action must be 'complete' or 'abort'".to_string(),
));
}
}
.map_err(ApiError::from)?;
let output = PutObjectOutput {
e_tag: obj_info.etag.clone(),
version_id: obj_info.version_id.map(|v| v.to_string()),
..Default::default()
};
return Ok(S3Response::new(output));
}
let repoptions =
get_must_replicate_options(&mt2, "".to_string(), ReplicationStatusType::Empty, ReplicationType::Object, opts.clone());