Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 09:40:32 +00:00)

Compare commits: 1 commit (chore/appe… → 1.0.0-alph…)

| Author | SHA1 | Date |
|---|---|---|
| | 639bf0c233 | |

Cargo.lock (generated)
@@ -2923,7 +2923,6 @@ dependencies = [
"chrono",
"flatbuffers",
"futures",
"http 1.3.1",
"md5",
"rand 0.9.2",
"reqwest",

@@ -6645,7 +6644,6 @@ dependencies = [
"rustfs-utils",
"s3s",
"serde",
"serde_json",
"thiserror 2.0.17",
"time",
"tokio",

@@ -49,5 +49,4 @@ uuid = { workspace = true }
base64 = { workspace = true }
rand = { workspace = true }
chrono = { workspace = true }
http.workspace = true
md5 = { workspace = true }

@@ -13,16 +13,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Step-by-step test cases for multipart-upload encryption.
//!
//! This test suite verifies every step of the multipart-upload encryption feature:
//! 1. Basic single-part encryption (verifies the core encryption logic)
//! 2. Multipart upload (verifies the part-assembly logic)
//! 3. Saving and reading encryption metadata
//! 4. The complete encrypted multipart-upload flow

use super::common::LocalKMSTestEnvironment;
use crate::common::{TEST_BUCKET, init_logging};
use serial_test::serial;
use tracing::{debug, info};

/// Step 1: test basic single-file encryption (ensures SSE-S3 works in the non-multipart case).
#[tokio::test]
#[serial]
async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    init_logging();
    info!("🧪 step1: test basic single file encryption");

    let mut kms_env = LocalKMSTestEnvironment::new().await?;
    let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -31,11 +40,11 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
    let s3_client = kms_env.base_env.create_s3_client();
    kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;

    // test small-file encryption (should be stored inline)
    let test_data = b"Hello, this is a small test file for SSE-S3!";
    let object_key = "test-single-file-encrypted";

    info!("📤 step1: upload small file ({}) with SSE-S3 encryption", test_data.len());
    let put_response = s3_client
        .put_object()
        .bucket(TEST_BUCKET)
@@ -45,41 +54,41 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
        .send()
        .await?;

    debug!("PUT response ETag: {:?}", put_response.e_tag());
    debug!("PUT response SSE: {:?}", put_response.server_side_encryption());

    // verify the PUT response contains the correct encryption header
    assert_eq!(
        put_response.server_side_encryption(),
        Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
    );

    info!("📥 step1: download file and verify encryption status");
    let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;

    debug!("GET response SSE: {:?}", get_response.server_side_encryption());

    // verify the GET response contains the correct encryption header
    assert_eq!(
        get_response.server_side_encryption(),
        Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
    );

    // verify data integrity
    let downloaded_data = get_response.body.collect().await?.into_bytes();
    assert_eq!(&downloaded_data[..], test_data);

    kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
    info!("✅ step1: basic single file encryption works as expected");
    Ok(())
}

/// Step 2: test a basic multipart upload without encryption (ensures the core multipart-upload path works).
#[tokio::test]
#[serial]
async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    init_logging();
    info!("🧪 step2: test basic multipart upload without encryption");

    let mut kms_env = LocalKMSTestEnvironment::new().await?;
    let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -93,16 +102,12 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
|
||||
let total_parts = 2;
|
||||
let total_size = part_size * total_parts;
|
||||
|
||||
// generate test data (with clear pattern for easy verification)
|
||||
// 生成测试数据(有明显的模式便于验证)
|
||||
let test_data: Vec<u8> = (0..total_size).map(|i| (i % 256) as u8).collect();
|
||||
|
||||
info!(
|
||||
"🚀 step2: start multipart upload (no encryption) with {} parts, each {}MB",
|
||||
total_parts,
|
||||
part_size / (1024 * 1024)
|
||||
);
|
||||
info!("🚀 开始分片上传(无加密):{} parts,每个 {}MB", total_parts, part_size / (1024 * 1024));
|
||||
|
||||
// step1: create multipart upload
|
||||
// 步骤1:创建分片上传
|
||||
let create_multipart_output = s3_client
|
||||
.create_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -111,16 +116,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
|
||||
.await?;
|
||||
|
||||
let upload_id = create_multipart_output.upload_id().unwrap();
|
||||
info!("📋 step2: create multipart upload, ID: {}", upload_id);
|
||||
info!("📋 创建分片上传,ID: {}", upload_id);
|
||||
|
||||
// step2: upload each part
|
||||
// 步骤2:上传各个分片
|
||||
let mut completed_parts = Vec::new();
|
||||
for part_number in 1..=total_parts {
|
||||
let start = (part_number - 1) * part_size;
|
||||
let end = std::cmp::min(start + part_size, total_size);
|
||||
let part_data = &test_data[start..end];
|
||||
|
||||
info!("📤 step2: upload part {} ({} bytes)", part_number, part_data.len());
|
||||
info!("📤 上传分片 {} ({} bytes)", part_number, part_data.len());
|
||||
|
||||
let upload_part_output = s3_client
|
||||
.upload_part()
|
||||
@@ -140,15 +145,15 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
|
||||
.build(),
|
||||
);
|
||||
|
||||
debug!("step2: part {} uploaded, ETag: {}", part_number, etag);
|
||||
debug!("分片 {} 上传完成,ETag: {}", part_number, etag);
|
||||
}
|
||||
|
||||
// step3: complete multipart upload
|
||||
// 步骤3:完成分片上传
|
||||
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
|
||||
.set_parts(Some(completed_parts))
|
||||
.build();
|
||||
|
||||
info!("🔗 step2: complete multipart upload");
|
||||
info!("🔗 完成分片上传");
|
||||
let complete_output = s3_client
|
||||
.complete_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -158,16 +163,10 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
debug!("step2: multipart upload completed, ETag: {:?}", complete_output.e_tag());
|
||||
debug!("完成分片上传,ETag: {:?}", complete_output.e_tag());
|
||||
|
||||
// step4: verify multipart upload completed successfully
|
||||
assert_eq!(
|
||||
complete_output.e_tag().unwrap().to_string(),
|
||||
format!("\"{}-{}-{}\"", object_key, upload_id, total_parts)
|
||||
);
|
||||
|
||||
// verify data integrity
|
||||
info!("📥 step2: download file and verify data integrity");
|
||||
// 步骤4:下载并验证
|
||||
info!("📥 下载文件并验证数据完整性");
|
||||
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
|
||||
|
||||
let downloaded_data = get_response.body.collect().await?.into_bytes();
|
||||
@@ -175,16 +174,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
|
||||
assert_eq!(&downloaded_data[..], &test_data[..]);
|
||||
|
||||
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
|
||||
info!("✅ step2: basic multipart upload without encryption works as expected");
|
||||
info!("✅ 步骤2通过:不加密的分片上传功能正常");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Step 3: test multipart upload with SSE-S3 encryption (the key scenario).
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
init_logging();
|
||||
info!("🧪 step3: test multipart upload with SSE-S3 encryption");
|
||||
info!("🧪 步骤3:测试分片上传 + SSE-S3加密");
|
||||
|
||||
let mut kms_env = LocalKMSTestEnvironment::new().await?;
|
||||
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
|
||||
@@ -198,16 +197,16 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
|
||||
let total_parts = 2;
|
||||
let total_size = part_size * total_parts;
|
||||
|
||||
// generate test data (with clear pattern for easy verification)
|
||||
// 生成测试数据
|
||||
let test_data: Vec<u8> = (0..total_size).map(|i| ((i / 1000) % 256) as u8).collect();
|
||||
|
||||
info!(
|
||||
"🔐 step3: start multipart upload with SSE-S3 encryption: {} parts, each {}MB",
|
||||
"🔐 开始分片上传(SSE-S3加密):{} parts,每个 {}MB",
|
||||
total_parts,
|
||||
part_size / (1024 * 1024)
|
||||
);
|
||||
|
||||
// step1: create multipart upload and enable SSE-S3
|
||||
// 步骤1:创建分片上传并启用SSE-S3
|
||||
let create_multipart_output = s3_client
|
||||
.create_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -217,24 +216,24 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
|
||||
.await?;
|
||||
|
||||
let upload_id = create_multipart_output.upload_id().unwrap();
|
||||
info!("📋 step3: create multipart upload with SSE-S3 encryption, ID: {}", upload_id);
|
||||
info!("📋 创建加密分片上传,ID: {}", upload_id);
|
||||
|
||||
// step2: verify CreateMultipartUpload response (SSE-S3 header should be included)
|
||||
// 验证CreateMultipartUpload响应(如果有SSE头的话)
|
||||
if let Some(sse) = create_multipart_output.server_side_encryption() {
|
||||
debug!("CreateMultipartUpload response contains SSE header: {:?}", sse);
|
||||
debug!("CreateMultipartUpload包含SSE响应: {:?}", sse);
|
||||
assert_eq!(sse, &aws_sdk_s3::types::ServerSideEncryption::Aes256);
|
||||
} else {
|
||||
debug!("CreateMultipartUpload response does not contain SSE header (some implementations may return empty string)");
|
||||
debug!("CreateMultipartUpload不包含SSE响应头(某些实现中正常)");
|
||||
}
|
||||
|
||||
// step2: upload each part
|
||||
// 步骤2:上传各个分片
|
||||
let mut completed_parts = Vec::new();
|
||||
for part_number in 1..=total_parts {
|
||||
let start = (part_number - 1) * part_size;
|
||||
let end = std::cmp::min(start + part_size, total_size);
|
||||
let part_data = &test_data[start..end];
|
||||
|
||||
info!("🔐 step3: upload encrypted part {} ({} bytes)", part_number, part_data.len());
|
||||
info!("🔐 上传加密分片 {} ({} bytes)", part_number, part_data.len());
|
||||
|
||||
let upload_part_output = s3_client
|
||||
.upload_part()
|
||||
@@ -254,15 +253,15 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
|
||||
.build(),
|
||||
);
|
||||
|
||||
debug!("step3: part {} uploaded, ETag: {}", part_number, etag);
|
||||
debug!("加密分片 {} 上传完成,ETag: {}", part_number, etag);
|
||||
}
|
||||
|
||||
// step3: complete multipart upload
|
||||
// 步骤3:完成分片上传
|
||||
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
|
||||
.set_parts(Some(completed_parts))
|
||||
.build();
|
||||
|
||||
info!("🔗 step3: complete multipart upload with SSE-S3 encryption");
|
||||
info!("🔗 完成加密分片上传");
|
||||
let complete_output = s3_client
|
||||
.complete_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -272,46 +271,43 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
"step3: complete multipart upload with SSE-S3 encryption, ETag: {:?}",
|
||||
complete_output.e_tag()
|
||||
);
|
||||
debug!("完成加密分片上传,ETag: {:?}", complete_output.e_tag());
|
||||
|
||||
// step4: HEAD request to check metadata
|
||||
info!("📋 step4: check object metadata");
|
||||
// 步骤4:HEAD请求检查元数据
|
||||
info!("📋 检查对象元数据");
|
||||
let head_response = s3_client.head_object().bucket(TEST_BUCKET).key(object_key).send().await?;
|
||||
|
||||
debug!("HEAD response SSE: {:?}", head_response.server_side_encryption());
|
||||
debug!("HEAD response metadata: {:?}", head_response.metadata());
|
||||
debug!("HEAD响应 SSE: {:?}", head_response.server_side_encryption());
|
||||
debug!("HEAD响应 元数据: {:?}", head_response.metadata());
|
||||
|
||||
// step5: GET request to download and verify
|
||||
info!("📥 step5: download encrypted file and verify");
|
||||
// 步骤5:GET请求下载并验证
|
||||
info!("📥 下载加密文件并验证");
|
||||
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
|
||||
|
||||
debug!("GET response SSE: {:?}", get_response.server_side_encryption());
|
||||
debug!("GET响应 SSE: {:?}", get_response.server_side_encryption());
|
||||
|
||||
// step5: verify GET response contains SSE-S3 encryption header
|
||||
// 🎯 关键验证:GET响应必须包含SSE-S3加密头
|
||||
assert_eq!(
|
||||
get_response.server_side_encryption(),
|
||||
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
|
||||
);
|
||||
|
||||
// step5: verify downloaded data matches original test data
|
||||
// 验证数据完整性
|
||||
let downloaded_data = get_response.body.collect().await?.into_bytes();
|
||||
assert_eq!(downloaded_data.len(), total_size);
|
||||
assert_eq!(&downloaded_data[..], &test_data[..]);
|
||||
|
||||
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
|
||||
info!("✅ step3: multipart upload with SSE-S3 encryption function is normal");
|
||||
info!("✅ 步骤3通过:分片上传 + SSE-S3加密功能正常");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Step 4: test a larger multipart upload with encryption (exercises streaming encryption).
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
init_logging();
|
||||
info!("🧪 step4: test larger multipart upload with encryption (streaming encryption)");
|
||||
info!("🧪 步骤4:测试大文件分片上传加密");
|
||||
|
||||
let mut kms_env = LocalKMSTestEnvironment::new().await?;
|
||||
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
|
||||
@@ -326,13 +322,13 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
let total_size = part_size * total_parts;
|
||||
|
||||
info!(
|
||||
"🗂️ step4: generate large test data: {} parts, each {}MB, total {}MB",
|
||||
"🗂️ 生成大文件测试数据:{} parts,每个 {}MB,总计 {}MB",
|
||||
total_parts,
|
||||
part_size / (1024 * 1024),
|
||||
total_size / (1024 * 1024)
|
||||
);
|
||||
|
||||
// step4: generate large test data (using complex pattern for verification)
|
||||
// 生成大文件测试数据(使用复杂模式便于验证)
|
||||
let test_data: Vec<u8> = (0..total_size)
|
||||
.map(|i| {
|
||||
let part_num = i / part_size;
|
||||
@@ -341,9 +337,9 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
})
|
||||
.collect();
|
||||
|
||||
info!("🔐 step4: start large multipart upload with encryption (SSE-S3)");
|
||||
info!("🔐 开始大文件分片上传(SSE-S3加密)");
|
||||
|
||||
// step4: create multipart upload
|
||||
// 创建分片上传
|
||||
let create_multipart_output = s3_client
|
||||
.create_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -353,9 +349,9 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
.await?;
|
||||
|
||||
let upload_id = create_multipart_output.upload_id().unwrap();
|
||||
info!("📋 step4: create multipart upload with encryption (SSE-S3), ID: {}", upload_id);
|
||||
info!("📋 创建大文件加密分片上传,ID: {}", upload_id);
|
||||
|
||||
// step4: upload parts
|
||||
// 上传各个分片
|
||||
let mut completed_parts = Vec::new();
|
||||
for part_number in 1..=total_parts {
|
||||
let start = (part_number - 1) * part_size;
|
||||
@@ -363,7 +359,7 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
let part_data = &test_data[start..end];
|
||||
|
||||
info!(
|
||||
"🔐 step4: upload part {} ({:.2}MB)",
|
||||
"🔐 上传大文件加密分片 {} ({:.2}MB)",
|
||||
part_number,
|
||||
part_data.len() as f64 / (1024.0 * 1024.0)
|
||||
);
|
||||
@@ -386,15 +382,15 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
.build(),
|
||||
);
|
||||
|
||||
debug!("step4: upload part {} completed, ETag: {}", part_number, etag);
|
||||
debug!("大文件加密分片 {} 上传完成,ETag: {}", part_number, etag);
|
||||
}
|
||||
|
||||
// step4: complete multipart upload
|
||||
// 完成分片上传
|
||||
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
|
||||
.set_parts(Some(completed_parts))
|
||||
.build();
|
||||
|
||||
info!("🔗 step4: complete multipart upload with encryption (SSE-S3)");
|
||||
info!("🔗 完成大文件加密分片上传");
|
||||
let complete_output = s3_client
|
||||
.complete_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
@@ -404,46 +400,40 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
"step4: complete multipart upload with encryption (SSE-S3), ETag: {:?}",
|
||||
complete_output.e_tag()
|
||||
);
|
||||
debug!("完成大文件加密分片上传,ETag: {:?}", complete_output.e_tag());
|
||||
|
||||
// step4: download and verify
|
||||
info!("📥 step4: download and verify large multipart upload with encryption (SSE-S3)");
|
||||
// 下载并验证
|
||||
info!("📥 下载大文件并验证");
|
||||
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
|
||||
|
||||
// step4: verify encryption header
|
||||
// 验证加密头
|
||||
assert_eq!(
|
||||
get_response.server_side_encryption(),
|
||||
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
|
||||
);
|
||||
|
||||
// step4: verify data integrity
|
||||
// 验证数据完整性
|
||||
let downloaded_data = get_response.body.collect().await?.into_bytes();
|
||||
assert_eq!(downloaded_data.len(), total_size);
|
||||
|
||||
// step4: verify data matches original test data
|
||||
// 逐字节验证数据(对于大文件更严格)
|
||||
for (i, (&actual, &expected)) in downloaded_data.iter().zip(test_data.iter()).enumerate() {
|
||||
if actual != expected {
|
||||
panic!(
|
||||
"step4: large multipart upload with encryption (SSE-S3) data mismatch at byte {}: actual={}, expected={}",
|
||||
i, actual, expected
|
||||
);
|
||||
panic!("大文件数据在第{i}字节不匹配: 实际={actual}, 期待={expected}");
|
||||
}
|
||||
}
|
||||
|
||||
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
|
||||
info!("✅ step4: large multipart upload with encryption (SSE-S3) functionality normal");
|
||||
info!("✅ 步骤4通过:大文件分片上传加密功能正常");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Step 5: test multipart uploads with all encryption types.
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
init_logging();
|
||||
info!("🧪 step5: test all encryption types multipart upload");
|
||||
info!("🧪 步骤5:测试所有加密类型的分片上传");
|
||||
|
||||
let mut kms_env = LocalKMSTestEnvironment::new().await?;
|
||||
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
|
||||
@@ -456,8 +446,8 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
|
||||
let total_parts = 2;
|
||||
let total_size = part_size * total_parts;
|
||||
|
||||
// step5: test SSE-KMS multipart upload
|
||||
info!("🔐 step5: test SSE-KMS multipart upload");
|
||||
// 测试SSE-KMS
|
||||
info!("🔐 测试 SSE-KMS 分片上传");
|
||||
test_multipart_encryption_type(
|
||||
&s3_client,
|
||||
TEST_BUCKET,
|
||||
@@ -469,8 +459,8 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
|
||||
)
|
||||
.await?;
|
||||
|
||||
// step5: test SSE-C multipart upload
|
||||
info!("🔐 step5: test SSE-C multipart upload");
|
||||
// 测试SSE-C
|
||||
info!("🔐 测试 SSE-C 分片上传");
|
||||
test_multipart_encryption_type(
|
||||
&s3_client,
|
||||
TEST_BUCKET,
|
||||
@@ -483,7 +473,7 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
|
||||
.await?;
|
||||
|
||||
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
|
||||
info!("✅ step5: all encryption types multipart upload functionality normal");
|
||||
info!("✅ 步骤5通过:所有加密类型的分片上传功能正常");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -493,7 +483,7 @@ enum EncryptionType {
|
||||
SSEC,
|
||||
}
|
||||
|
||||
/// Helper: test a multipart upload with a specific encryption type.
async fn test_multipart_encryption_type(
|
||||
s3_client: &aws_sdk_s3::Client,
|
||||
bucket: &str,
|
||||
@@ -503,10 +493,10 @@ async fn test_multipart_encryption_type(
|
||||
total_parts: usize,
|
||||
encryption_type: EncryptionType,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// step5: generate test data
|
||||
// 生成测试数据
|
||||
let test_data: Vec<u8> = (0..total_size).map(|i| ((i * 7) % 256) as u8).collect();
|
||||
|
||||
// step5: prepare SSE-C key and MD5 (if needed)
|
||||
// 准备SSE-C所需的密钥(如果需要)
|
||||
let (sse_c_key, sse_c_md5) = if matches!(encryption_type, EncryptionType::SSEC) {
|
||||
let key = "01234567890123456789012345678901";
|
||||
let key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, key);
|
||||
@@ -516,10 +506,9 @@ async fn test_multipart_encryption_type(
|
||||
(None, None)
|
||||
};
|
||||
|
||||
// step5: create multipart upload
|
||||
info!("🔗 step5: create multipart upload with encryption {:?}", encryption_type);
|
||||
info!("📋 创建分片上传 - {:?}", encryption_type);
|
||||
|
||||
// step5: create multipart upload request
|
||||
// 创建分片上传
|
||||
let mut create_request = s3_client.create_multipart_upload().bucket(bucket).key(object_key);
|
||||
|
||||
create_request = match encryption_type {
|
||||
@@ -533,6 +522,7 @@ async fn test_multipart_encryption_type(
|
||||
let create_multipart_output = create_request.send().await?;
|
||||
let upload_id = create_multipart_output.upload_id().unwrap();
|
||||
|
||||
// 上传分片
|
||||
let mut completed_parts = Vec::new();
|
||||
for part_number in 1..=total_parts {
|
||||
let start = (part_number - 1) * part_size;
|
||||
@@ -547,7 +537,7 @@ async fn test_multipart_encryption_type(
|
||||
.part_number(part_number as i32)
|
||||
.body(aws_sdk_s3::primitives::ByteStream::from(part_data.to_vec()));
|
||||
|
||||
// step5: include SSE-C key and MD5 in each UploadPart request (if needed)
|
||||
// SSE-C需要在每个UploadPart请求中包含密钥
|
||||
if matches!(encryption_type, EncryptionType::SSEC) {
|
||||
upload_request = upload_request
|
||||
.sse_customer_algorithm("AES256")
|
||||
@@ -564,11 +554,10 @@ async fn test_multipart_encryption_type(
|
||||
.build(),
|
||||
);
|
||||
|
||||
// step5: complete multipart upload request
|
||||
debug!("🔗 step5: complete multipart upload part {} with etag {}", part_number, etag);
|
||||
debug!("{:?} 分片 {} 上传完成", encryption_type, part_number);
|
||||
}
|
||||
|
||||
// step5: complete multipart upload
|
||||
// 完成分片上传
|
||||
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
|
||||
.set_parts(Some(completed_parts))
|
||||
.build();
|
||||
@@ -582,12 +571,10 @@ async fn test_multipart_encryption_type(
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// step5: download and verify multipart upload
|
||||
info!("🔗 step5: download and verify multipart upload with encryption {:?}", encryption_type);
|
||||
|
||||
// 下载并验证
|
||||
let mut get_request = s3_client.get_object().bucket(bucket).key(object_key);
|
||||
|
||||
// step5: include SSE-C key and MD5 in each GET request (if needed)
|
||||
// SSE-C需要在GET请求中包含密钥
|
||||
if matches!(encryption_type, EncryptionType::SSEC) {
|
||||
get_request = get_request
|
||||
.sse_customer_algorithm("AES256")
|
||||
@@ -597,7 +584,7 @@ async fn test_multipart_encryption_type(
|
||||
|
||||
let get_response = get_request.send().await?;
|
||||
|
||||
// step5: verify encryption headers
|
||||
// 验证加密头
|
||||
match encryption_type {
|
||||
EncryptionType::SSEKMS => {
|
||||
assert_eq!(
|
||||
@@ -610,15 +597,11 @@ async fn test_multipart_encryption_type(
|
||||
}
|
||||
}
|
||||
|
||||
// step5: verify data integrity
|
||||
// 验证数据完整性
|
||||
let downloaded_data = get_response.body.collect().await?.into_bytes();
|
||||
assert_eq!(downloaded_data.len(), total_size);
|
||||
assert_eq!(&downloaded_data[..], &test_data[..]);
|
||||
|
||||
// step5: verify data integrity
|
||||
info!(
|
||||
"✅ step5: verify data integrity for multipart upload with encryption {:?}",
|
||||
encryption_type
|
||||
);
|
||||
info!("✅ {:?} 分片上传测试通过", encryption_type);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large.
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod append;
mod conditional_writes;
mod lifecycle;
mod lock;

@@ -167,19 +167,8 @@ async fn write_data_blocks<W>(
where
    W: tokio::io::AsyncWrite + Send + Sync + Unpin,
{
    let available = get_data_block_len(en_blocks, data_blocks);
    if available < length {
        let block_sizes: Vec<usize> = en_blocks
            .iter()
            .take(data_blocks)
            .map(|block| block.as_ref().map(|buf| buf.len()).unwrap_or(0))
            .collect();
        error!(
            expected = length,
            available,
            ?block_sizes,
            "write_data_blocks get_data_block_len < length"
        );
    if get_data_block_len(en_blocks, data_blocks) < length {
        error!("write_data_blocks get_data_block_len < length");
        return Err(io::Error::new(ErrorKind::UnexpectedEof, "Not enough data blocks to write"));
    }

@@ -33,7 +33,6 @@ pub mod file_cache;
pub mod global;
pub mod metrics_realtime;
pub mod notification_sys;
pub mod object_append;
pub mod pools;
pub mod rebalance;
pub mod rpc;

@@ -1,725 +0,0 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
|
||||
use crate::erasure_coding::{Erasure, calc_shard_size};
|
||||
use crate::error::{Error, StorageError};
|
||||
use crate::store_api::ObjectInfo;
|
||||
use rustfs_filemeta::TRANSITION_COMPLETE;
|
||||
use rustfs_utils::HashAlgorithm;
|
||||
use rustfs_utils::http::headers::{
|
||||
AMZ_SERVER_SIDE_ENCRYPTION, AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, AMZ_SERVER_SIDE_ENCRYPTION_KMS_CONTEXT, AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID,
|
||||
RESERVED_METADATA_PREFIX_LOWER,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
|
||||
/// Ensure the target object can accept append writes under current state.
|
||||
pub fn validate_append_preconditions(bucket: &str, object: &str, info: &ObjectInfo) -> Result<(), Error> {
|
||||
if info.is_compressed() {
|
||||
return Err(StorageError::InvalidArgument(
|
||||
bucket.to_string(),
|
||||
object.to_string(),
|
||||
"append is not supported for compressed objects".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let encryption_headers = [
|
||||
AMZ_SERVER_SIDE_ENCRYPTION,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_KMS_ID,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_KMS_CONTEXT,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
|
||||
AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
|
||||
];
|
||||
|
||||
if encryption_headers
|
||||
.iter()
|
||||
.any(|header| info.user_defined.contains_key(*header) || info.user_defined.contains_key(&header.to_ascii_lowercase()))
|
||||
{
|
||||
return Err(StorageError::InvalidArgument(
|
||||
bucket.to_string(),
|
||||
object.to_string(),
|
||||
"append is not supported for encrypted objects".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if info.transitioned_object.status == TRANSITION_COMPLETE || !info.transitioned_object.tier.is_empty() {
|
||||
return Err(StorageError::InvalidArgument(
|
||||
bucket.to_string(),
|
||||
object.to_string(),
|
||||
"append is not supported for transitioned objects".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate that the requested append position matches the current object length.
|
||||
pub fn validate_append_position(bucket: &str, object: &str, info: &ObjectInfo, expected_position: i64) -> Result<(), Error> {
|
||||
if expected_position != info.size {
|
||||
return Err(StorageError::InvalidArgument(
|
||||
bucket.to_string(),
|
||||
object.to_string(),
|
||||
format!("append position mismatch: provided {}, expected {}", expected_position, info.size),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct InlineAppendContext<'a> {
    pub existing_inline: Option<&'a [u8]>,
    pub existing_plain: Option<&'a [u8]>,
    pub existing_size: i64,
    pub append_payload: &'a [u8],
    pub erasure: &'a Erasure,
    pub hash_algorithm: HashAlgorithm,
    pub has_checksums: bool,
}

pub struct InlineAppendResult {
    pub inline_data: Vec<u8>,
    pub total_size: i64,
    pub etag: String,
}

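// Illustrative sketch (not part of the original file): one way a caller might
// populate `InlineAppendContext` when it already holds the decoded payload, so
// no inline decode is required. The function name is invented for this example.
#[allow(dead_code)]
async fn append_with_plain_payload_example() -> Result<InlineAppendResult, Error> {
    // Single data shard, no parity, 1 KiB blocks: the smallest inline layout,
    // matching what the unit tests at the bottom of this file use.
    let erasure = Erasure::new(1, 0, 1024);
    let existing: &[u8] = b"hello";

    let ctx = InlineAppendContext {
        existing_inline: None,
        existing_plain: Some(existing),
        existing_size: existing.len() as i64,
        append_payload: b" world",
        erasure: &erasure,
        hash_algorithm: HashAlgorithm::HighwayHash256,
        has_checksums: true,
    };

    // Re-encodes "hello world" through the bitrot writer and returns the new
    // inline buffer together with the total size and ETag.
    append_inline_data(ctx).await
}
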
/// Decode inline payload using available checksum algorithms. Returns raw bytes when decoding fails but
|
||||
/// the inline buffer already contains the plain payload.
|
||||
pub async fn decode_inline_payload(
|
||||
inline: &[u8],
|
||||
size: usize,
|
||||
erasure: &Erasure,
|
||||
preferred: HashAlgorithm,
|
||||
) -> Result<(Vec<u8>, HashAlgorithm), Error> {
|
||||
match decode_inline_variants(inline, size, erasure, preferred).await {
|
||||
Ok((data, algo)) => Ok((data, algo)),
|
||||
Err(err) => {
|
||||
if inline.len() >= size {
|
||||
Ok((inline[..size].to_vec(), HashAlgorithm::None))
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Append data to an inline object and return the re-encoded inline buffer.
|
||||
pub async fn append_inline_data(ctx: InlineAppendContext<'_>) -> Result<InlineAppendResult, Error> {
|
||||
let mut plain = Vec::with_capacity(ctx.existing_inline.map(|data| data.len()).unwrap_or(0) + ctx.append_payload.len());
|
||||
let mut encode_algorithm = ctx.hash_algorithm.clone();
|
||||
|
||||
if let Some(existing_plain) = ctx.existing_plain {
|
||||
if existing_plain.len() != ctx.existing_size as usize {
|
||||
return Err(StorageError::other("existing plain payload length mismatch"));
|
||||
}
|
||||
plain.extend_from_slice(existing_plain);
|
||||
} else if ctx.existing_size > 0 {
|
||||
let inline = ctx
|
||||
.existing_inline
|
||||
.ok_or_else(|| StorageError::other("inline payload missing"))?;
|
||||
|
||||
let (decoded, detected_algo) =
|
||||
decode_inline_payload(inline, ctx.existing_size as usize, ctx.erasure, ctx.hash_algorithm.clone()).await?;
|
||||
encode_algorithm = detected_algo;
|
||||
plain.extend_from_slice(&decoded);
|
||||
} else if let Some(inline) = ctx.existing_inline {
|
||||
plain.extend_from_slice(inline);
|
||||
}
|
||||
|
||||
plain.extend_from_slice(ctx.append_payload);
|
||||
let total_size = plain.len() as i64;
|
||||
let etag = md5_hex(&plain);
|
||||
|
||||
if encode_algorithm == HashAlgorithm::None {
|
||||
if ctx.has_checksums {
|
||||
encode_algorithm = ctx.hash_algorithm.clone();
|
||||
} else {
|
||||
return Ok(InlineAppendResult {
|
||||
inline_data: plain,
|
||||
total_size,
|
||||
etag,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut writer = create_bitrot_writer(
|
||||
true,
|
||||
None,
|
||||
"",
|
||||
"",
|
||||
ctx.erasure.shard_file_size(total_size),
|
||||
ctx.erasure.shard_size(),
|
||||
encode_algorithm,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| StorageError::other(format!("failed to create inline writer: {e}")))?;
|
||||
|
||||
let mut remaining = plain.as_slice();
|
||||
while !remaining.is_empty() {
|
||||
let chunk_len = remaining.len().min(ctx.erasure.block_size);
|
||||
writer
|
||||
.write(&remaining[..chunk_len])
|
||||
.await
|
||||
.map_err(|e| StorageError::other(format!("failed to write inline data: {e}")))?;
|
||||
remaining = &remaining[chunk_len..];
|
||||
}
|
||||
|
||||
writer
|
||||
.shutdown()
|
||||
.await
|
||||
.map_err(|e| StorageError::other(format!("failed to finalize inline writer: {e}")))?;
|
||||
|
||||
let inline_data = writer
|
||||
.into_inline_data()
|
||||
.ok_or_else(|| StorageError::other("inline writer did not return data"))?;
|
||||
|
||||
Ok(InlineAppendResult {
|
||||
inline_data,
|
||||
total_size,
|
||||
etag,
|
||||
})
|
||||
}
|
||||
|
||||
fn md5_hex(data: &[u8]) -> String {
|
||||
let digest = HashAlgorithm::Md5.hash_encode(data);
|
||||
hex_from_bytes(digest.as_ref())
|
||||
}
|
||||
|
||||
fn hex_from_bytes(bytes: &[u8]) -> String {
|
||||
let mut out = String::with_capacity(bytes.len() * 2);
|
||||
for byte in bytes {
|
||||
use std::fmt::Write;
|
||||
write!(&mut out, "{:02x}", byte).expect("write hex");
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
async fn decode_inline_variants(
|
||||
inline: &[u8],
|
||||
size: usize,
|
||||
erasure: &Erasure,
|
||||
preferred: HashAlgorithm,
|
||||
) -> Result<(Vec<u8>, HashAlgorithm), Error> {
|
||||
let mut tried = HashSet::new();
|
||||
let candidates = [preferred, HashAlgorithm::HighwayHash256, HashAlgorithm::HighwayHash256S];
|
||||
|
||||
let mut last_err: Option<Error> = None;
|
||||
|
||||
for algo in candidates {
|
||||
if !tried.insert(algo.clone()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
match decode_inline_with_algo(inline, size, erasure, algo.clone()).await {
|
||||
Ok(data) => return Ok((data, algo)),
|
||||
Err(err) => last_err = Some(err),
|
||||
}
|
||||
}
|
||||
|
||||
Err(last_err.unwrap_or_else(|| StorageError::other("failed to decode inline data")))
|
||||
}
|
||||
|
||||
async fn decode_inline_with_algo(inline: &[u8], size: usize, erasure: &Erasure, algo: HashAlgorithm) -> Result<Vec<u8>, Error> {
|
||||
let total_len = inline
|
||||
.len()
|
||||
.max(erasure.shard_file_size(size as i64).max(size as i64) as usize);
|
||||
let mut reader = create_bitrot_reader(Some(inline), None, "", "", 0, total_len, erasure.shard_size(), algo)
|
||||
.await
|
||||
.map_err(|e| StorageError::other(format!("failed to create inline reader: {e}")))?
|
||||
.ok_or_else(|| StorageError::other("inline reader unavailable"))?;
|
||||
|
||||
let mut out = Vec::with_capacity(size);
|
||||
while out.len() < size {
|
||||
let remaining = size - out.len();
|
||||
let plain_chunk = remaining.min(erasure.block_size);
|
||||
let shard_payload = calc_shard_size(plain_chunk, erasure.data_shards).max(1);
|
||||
let mut buf = vec![0u8; shard_payload];
|
||||
let read = reader
|
||||
.read(&mut buf)
|
||||
.await
|
||||
.map_err(|e| StorageError::other(format!("failed to read inline data: {e}")))?;
|
||||
if read == 0 {
|
||||
return Err(StorageError::other("incomplete inline data read"));
|
||||
}
|
||||
|
||||
let copy_len = remaining.min(read);
|
||||
out.extend_from_slice(&buf[..copy_len]);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Background task to spill inline data to segmented format
|
||||
pub struct InlineSpillProcessor {
|
||||
pub disks: Vec<Option<crate::disk::DiskStore>>,
|
||||
pub write_quorum: usize,
|
||||
}
|
||||
|
||||
impl InlineSpillProcessor {
|
||||
pub fn new(disks: Vec<Option<crate::disk::DiskStore>>, write_quorum: usize) -> Self {
|
||||
Self { disks, write_quorum }
|
||||
}
|
||||
|
||||
/// Process a single spill operation from InlinePendingSpill to SegmentedActive
|
||||
pub async fn process_spill(
|
||||
&self,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
mut fi: rustfs_filemeta::FileInfo,
|
||||
mut parts_metadata: Vec<rustfs_filemeta::FileInfo>,
|
||||
epoch: u64,
|
||||
) -> Result<(), Error> {
|
||||
use rustfs_filemeta::AppendStateKind;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
// Verify we're in the correct state
|
||||
let current_state = fi.get_append_state();
|
||||
if current_state.state != AppendStateKind::InlinePendingSpill {
|
||||
warn!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
current_state = ?current_state.state,
|
||||
"Spill processor called on object not in InlinePendingSpill state"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Check epoch to ensure we're processing the correct version
|
||||
if current_state.epoch != epoch {
|
||||
debug!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
current_epoch = current_state.epoch,
|
||||
expected_epoch = epoch,
|
||||
"Spill operation skipped due to epoch mismatch"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
size = fi.size,
|
||||
epoch = epoch,
|
||||
"Starting inline data spill to segmented format"
|
||||
);
|
||||
|
||||
// Extract inline data
|
||||
let inline_data = fi
|
||||
.data
|
||||
.clone()
|
||||
.ok_or_else(|| StorageError::other("Cannot spill object without inline data"))?;
|
||||
|
||||
// Create erasure encoder
|
||||
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
|
||||
|
||||
// Decode inline data to plain data
|
||||
let hash_algorithm = fi
|
||||
.parts
|
||||
.first()
|
||||
.map(|part| fi.erasure.get_checksum_info(part.number).algorithm)
|
||||
.unwrap_or(HashAlgorithm::HighwayHash256);
|
||||
|
||||
let plain_data = match decode_inline_payload(&inline_data, fi.size as usize, &erasure, hash_algorithm.clone()).await {
|
||||
Ok((plain, _detected_algo)) => plain,
|
||||
Err(err) => {
|
||||
error!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
error = ?err,
|
||||
"Failed to decode inline data during spill"
|
||||
);
|
||||
return Err(StorageError::other(format!("Failed to decode inline data for spill: {err}")));
|
||||
}
|
||||
};
|
||||
|
||||
// Generate data directory for the object
|
||||
let data_dir = uuid::Uuid::new_v4();
|
||||
|
||||
// Create temporary directory for the spill operation
|
||||
let tmp_root = format!("{}x{}", uuid::Uuid::new_v4(), time::OffsetDateTime::now_utc().unix_timestamp());
|
||||
let tmp_path = format!("{tmp_root}/{}/part.1", data_dir);
|
||||
|
||||
// Encode and write the data to all disks
|
||||
match self.write_segmented_data(&plain_data, &tmp_path, &erasure).await {
|
||||
Ok(_) => {
|
||||
// Move from temp to permanent location
|
||||
let final_path = format!("{}/part.1", data_dir);
|
||||
if let Err(err) = self.move_temp_to_final(&tmp_path, &final_path).await {
|
||||
error!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
error = ?err,
|
||||
"Failed to move spilled data to final location"
|
||||
);
|
||||
// Clean up temp files
|
||||
let _ = self.cleanup_temp_files(&tmp_path).await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Update file metadata
|
||||
fi.data_dir = Some(data_dir);
|
||||
fi.data = None; // Remove inline data
|
||||
fi.metadata.remove(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER));
|
||||
|
||||
// Update append state to SegmentedActive
|
||||
let mut new_state = current_state;
|
||||
new_state.state = AppendStateKind::SegmentedActive;
|
||||
new_state.epoch = new_state.epoch.saturating_add(1);
|
||||
new_state.pending_segments.clear();
|
||||
|
||||
fi.set_append_state(&new_state)
|
||||
.map_err(|err| StorageError::other(format!("Failed to update append state after spill: {err}")))?;
|
||||
|
||||
// Update all parts metadata
|
||||
for meta in parts_metadata.iter_mut() {
|
||||
if !meta.is_valid() {
|
||||
continue;
|
||||
}
|
||||
meta.data_dir = Some(data_dir);
|
||||
meta.data = None;
|
||||
meta.metadata = fi.metadata.clone();
|
||||
meta.metadata
|
||||
.remove(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER));
|
||||
}
|
||||
|
||||
// Write updated metadata back to disks
|
||||
// TODO: Implement metadata write-back logic
|
||||
// This would typically involve writing the updated FileInfo to all disks
|
||||
|
||||
info!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
data_dir = ?data_dir,
|
||||
new_epoch = new_state.epoch,
|
||||
"Successfully spilled inline data to segmented format"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
error = ?err,
|
||||
"Failed to write segmented data during spill"
|
||||
);
|
||||
// Clean up temp files
|
||||
let _ = self.cleanup_temp_files(&tmp_path).await;
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_segmented_data(&self, data: &[u8], tmp_path: &str, _erasure: &Erasure) -> Result<(), Error> {
|
||||
use tracing::debug;
|
||||
|
||||
// TODO: Implement proper erasure encoding and writing to disks
|
||||
// This is a placeholder implementation
|
||||
debug!(
|
||||
data_len = data.len(),
|
||||
path = tmp_path,
|
||||
"Writing segmented data (placeholder implementation)"
|
||||
);
|
||||
|
||||
// For now, just return success - full implementation would:
|
||||
// 1. Create bitrot writers for each disk
|
||||
// 2. Erasure encode the data
|
||||
// 3. Write each shard to its corresponding disk
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn move_temp_to_final(&self, tmp_path: &str, final_path: &str) -> Result<(), Error> {
|
||||
use tracing::debug;
|
||||
|
||||
// TODO: Implement moving temp files to final location
|
||||
debug!(
|
||||
tmp_path = tmp_path,
|
||||
final_path = final_path,
|
||||
"Moving temp files to final location (placeholder)"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup_temp_files(&self, tmp_path: &str) -> Result<(), Error> {
|
||||
use tracing::debug;
|
||||
|
||||
// TODO: Implement temp file cleanup
|
||||
debug!(tmp_path = tmp_path, "Cleaning up temp files (placeholder)");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Trigger background spill processing for an object
|
||||
pub fn trigger_spill_process(
|
||||
bucket: String,
|
||||
object: String,
|
||||
fi: rustfs_filemeta::FileInfo,
|
||||
parts_metadata: Vec<rustfs_filemeta::FileInfo>,
|
||||
epoch: u64,
|
||||
disks: Vec<Option<crate::disk::DiskStore>>,
|
||||
write_quorum: usize,
|
||||
) {
|
||||
use tracing::error;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let processor = InlineSpillProcessor::new(disks, write_quorum);
|
||||
if let Err(err) = processor.process_spill(&bucket, &object, fi, parts_metadata, epoch).await {
|
||||
error!(
|
||||
bucket = bucket,
|
||||
object = object,
|
||||
epoch = epoch,
|
||||
error = ?err,
|
||||
"Background spill process failed"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rustfs_utils::HashAlgorithm;
|
||||
|
||||
fn make_object_info() -> ObjectInfo {
|
||||
ObjectInfo {
|
||||
bucket: "test-bucket".to_string(),
|
||||
name: "obj".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_compressed_objects() {
|
||||
let mut info = make_object_info();
|
||||
info.user_defined
|
||||
.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression"), "zstd".to_string());
|
||||
|
||||
let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
|
||||
matches!(err, StorageError::InvalidArgument(..))
|
||||
.then_some(())
|
||||
.expect("expected invalid argument");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_encrypted_objects() {
|
||||
let mut info = make_object_info();
|
||||
info.user_defined
|
||||
.insert("x-amz-server-side-encryption".to_string(), "AES256".to_string());
|
||||
|
||||
let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
|
||||
matches!(err, StorageError::InvalidArgument(..))
|
||||
.then_some(())
|
||||
.expect("expected invalid argument");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_transitioned_objects() {
|
||||
let mut info = make_object_info();
|
||||
info.transitioned_object.tier = "GLACIER".to_string();
|
||||
info.transitioned_object.status = TRANSITION_COMPLETE.to_string();
|
||||
|
||||
let err = validate_append_preconditions("test-bucket", "obj", &info).unwrap_err();
|
||||
matches!(err, StorageError::InvalidArgument(..))
|
||||
.then_some(())
|
||||
.expect("expected invalid argument");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accepts_plain_objects() {
|
||||
let info = make_object_info();
|
||||
validate_append_preconditions("test-bucket", "obj", &info).expect("append should be allowed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_position_mismatch() {
|
||||
let mut info = make_object_info();
|
||||
info.size = 10;
|
||||
let err = validate_append_position("test-bucket", "obj", &info, 5).unwrap_err();
|
||||
matches!(err, StorageError::InvalidArgument(..))
|
||||
.then_some(())
|
||||
.expect("expected invalid argument");
|
||||
}
|
||||
|
||||
fn make_inline_erasure() -> Erasure {
|
||||
Erasure::new(1, 0, 1024)
|
||||
}
|
||||
|
||||
async fn encode_inline(data: &[u8], erasure: &Erasure) -> Vec<u8> {
|
||||
let mut writer = create_bitrot_writer(
|
||||
true,
|
||||
None,
|
||||
"",
|
||||
"",
|
||||
erasure.shard_file_size(data.len() as i64),
|
||||
erasure.shard_size(),
|
||||
HashAlgorithm::HighwayHash256,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut remaining = data;
|
||||
while !remaining.is_empty() {
|
||||
let chunk_len = remaining.len().min(erasure.block_size);
|
||||
writer.write(&remaining[..chunk_len]).await.unwrap();
|
||||
remaining = &remaining[chunk_len..];
|
||||
}
|
||||
|
||||
writer.shutdown().await.unwrap();
|
||||
writer.into_inline_data().unwrap()
|
||||
}
|
||||
|
||||
async fn decode_inline(encoded: &[u8], size: usize, erasure: &Erasure) -> Vec<u8> {
|
||||
let mut reader =
|
||||
create_bitrot_reader(Some(encoded), None, "", "", 0, size, erasure.shard_size(), HashAlgorithm::HighwayHash256)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let mut out = Vec::with_capacity(size);
|
||||
while out.len() < size {
|
||||
let remaining = size - out.len();
|
||||
let mut buf = vec![0u8; erasure.block_size.min(remaining.max(1))];
|
||||
let read = reader.read(&mut buf).await.unwrap();
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
out.extend_from_slice(&buf[..read.min(remaining)]);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn append_inline_combines_payloads() {
|
||||
let erasure = make_inline_erasure();
|
||||
let existing_plain = b"hello";
|
||||
let encoded = encode_inline(existing_plain, &erasure).await;
|
||||
|
||||
let ctx = InlineAppendContext {
|
||||
existing_inline: Some(&encoded),
|
||||
existing_plain: None,
|
||||
existing_size: existing_plain.len() as i64,
|
||||
append_payload: b" world",
|
||||
erasure: &erasure,
|
||||
hash_algorithm: HashAlgorithm::HighwayHash256,
|
||||
has_checksums: true,
|
||||
};
|
||||
|
||||
let result = append_inline_data(ctx).await.expect("inline append to succeed");
|
||||
assert_eq!(result.total_size, 11);
|
||||
assert_eq!(result.etag, md5_hex(b"hello world"));
|
||||
|
||||
let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
|
||||
assert_eq!(decoded, b"hello world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn decode_inline_handles_padded_shards() {
|
||||
let erasure = Erasure::new(1, 0, 1024);
|
||||
let plain = b"hello";
|
||||
|
||||
let mut padded = vec![0u8; calc_shard_size(plain.len(), erasure.data_shards)];
|
||||
padded[..plain.len()].copy_from_slice(plain);
|
||||
|
||||
let mut writer = create_bitrot_writer(
|
||||
true,
|
||||
None,
|
||||
"",
|
||||
"",
|
||||
erasure.shard_file_size(plain.len() as i64),
|
||||
erasure.shard_size(),
|
||||
HashAlgorithm::HighwayHash256,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
writer.write(&padded).await.unwrap();
|
||||
writer.shutdown().await.unwrap();
|
||||
let inline = writer.into_inline_data().unwrap();
|
||||
|
||||
let (decoded, algo) = decode_inline_payload(&inline, plain.len(), &erasure, HashAlgorithm::HighwayHash256)
|
||||
.await
|
||||
.expect("inline decode should succeed");
|
||||
|
||||
assert_eq!(decoded, plain);
|
||||
assert_eq!(algo, HashAlgorithm::HighwayHash256);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn append_inline_handles_empty_original() {
|
||||
let erasure = make_inline_erasure();
|
||||
let ctx = InlineAppendContext {
|
||||
existing_inline: None,
|
||||
existing_plain: None,
|
||||
existing_size: 0,
|
||||
append_payload: b"data",
|
||||
erasure: &erasure,
|
||||
hash_algorithm: HashAlgorithm::HighwayHash256,
|
||||
has_checksums: true,
|
||||
};
|
||||
|
||||
let result = append_inline_data(ctx).await.expect("inline append to succeed");
|
||||
assert_eq!(result.total_size, 4);
|
||||
assert_eq!(result.etag, md5_hex(b"data"));
|
||||
|
||||
let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
|
||||
assert_eq!(decoded, b"data");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn append_inline_without_checksums_uses_raw_bytes() {
|
||||
let erasure = Erasure::new(1, 0, 1024);
|
||||
let existing = b"hello";
|
||||
|
||||
let ctx = InlineAppendContext {
|
||||
existing_inline: Some(existing),
|
||||
existing_plain: None,
|
||||
existing_size: existing.len() as i64,
|
||||
append_payload: b" world",
|
||||
erasure: &erasure,
|
||||
hash_algorithm: HashAlgorithm::HighwayHash256,
|
||||
has_checksums: false,
|
||||
};
|
||||
|
||||
let result = append_inline_data(ctx).await.expect("inline append to succeed");
|
||||
assert_eq!(result.total_size, 11);
|
||||
assert_eq!(result.etag, md5_hex(b"hello world"));
|
||||
|
||||
assert_eq!(result.inline_data, b"hello world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn append_inline_decodes_bitrot_without_checksums() {
|
||||
let erasure = Erasure::new(1, 0, 1024);
|
||||
let existing_plain = b"hello";
|
||||
let encoded = encode_inline(existing_plain, &erasure).await;
|
||||
|
||||
let ctx = InlineAppendContext {
|
||||
existing_inline: Some(&encoded),
|
||||
existing_plain: None,
|
||||
existing_size: existing_plain.len() as i64,
|
||||
append_payload: b" world",
|
||||
erasure: &erasure,
|
||||
hash_algorithm: HashAlgorithm::HighwayHash256,
|
||||
has_checksums: false,
|
||||
};
|
||||
|
||||
let result = append_inline_data(ctx).await.expect("inline append to succeed");
|
||||
assert_eq!(result.total_size, 11);
|
||||
assert_eq!(result.etag, md5_hex(b"hello world"));
|
||||
|
||||
let decoded = decode_inline(&result.inline_data, result.total_size as usize, &erasure).await;
|
||||
assert_eq!(decoded, b"hello world");
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large.
@@ -602,14 +602,6 @@ impl StorageAPI for Sets {
|
||||
(del_objects, del_errs)
|
||||
}
|
||||
|
||||
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
self.get_disks_by_key(object).complete_append(bucket, object, opts).await
|
||||
}
|
||||
|
||||
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
self.get_disks_by_key(object).abort_append(bucket, object, opts).await
|
||||
}
|
||||
|
||||
async fn list_object_parts(
|
||||
&self,
|
||||
bucket: &str,
|
||||
|
||||
@@ -1709,17 +1709,6 @@ impl StorageAPI for ECStore {
|
||||
// Ok((del_objects, del_errs))
|
||||
}
|
||||
|
||||
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
let object = encode_dir_object(object);
|
||||
let (pinfo, _) = self.internal_get_pool_info_existing_with_opts(bucket, &object, opts).await?;
|
||||
self.pools[pinfo.index].complete_append(bucket, &object, opts).await
|
||||
}
|
||||
|
||||
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
let object = encode_dir_object(object);
|
||||
let (pinfo, _) = self.internal_get_pool_info_existing_with_opts(bucket, &object, opts).await?;
|
||||
self.pools[pinfo.index].abort_append(bucket, &object, opts).await
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn list_object_parts(
|
||||
&self,
|
||||
|
||||
@@ -328,8 +328,6 @@ pub struct ObjectOptions {
|
||||
pub max_parity: bool,
|
||||
pub mod_time: Option<OffsetDateTime>,
|
||||
pub part_number: Option<usize>,
|
||||
pub append_object: bool,
|
||||
pub append_position: Option<i64>,
|
||||
|
||||
pub delete_prefix: bool,
|
||||
pub delete_prefix_object: bool,
|
||||
@@ -658,15 +656,6 @@ impl ObjectInfo {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let append_state = fi.get_append_state();
|
||||
let pending_length: i64 = append_state.pending_segments.iter().map(|seg| seg.length).sum();
|
||||
let logical_size = append_state.committed_length.saturating_add(pending_length);
|
||||
let actual_size_meta = fi
|
||||
.metadata
|
||||
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}actual-size"))
|
||||
.and_then(|o| o.parse::<i64>().ok())
|
||||
.unwrap_or(logical_size);
|
||||
|
||||
ObjectInfo {
|
||||
bucket: bucket.to_string(),
|
||||
name,
|
||||
@@ -676,7 +665,7 @@ impl ObjectInfo {
|
||||
version_id,
|
||||
delete_marker: fi.deleted,
|
||||
mod_time: fi.mod_time,
|
||||
size: logical_size,
|
||||
size: fi.size,
|
||||
parts,
|
||||
is_latest: fi.is_latest,
|
||||
user_tags,
|
||||
@@ -688,7 +677,6 @@ impl ObjectInfo {
|
||||
inlined,
|
||||
user_defined: metadata,
|
||||
transitioned_object,
|
||||
actual_size: actual_size_meta,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -1200,10 +1188,6 @@ pub trait StorageAPI: ObjectIO + Debug {
|
||||
opts: ObjectOptions,
|
||||
) -> (Vec<DeletedObject>, Vec<Option<Error>>);
|
||||
|
||||
async fn complete_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
|
||||
async fn abort_append(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
|
||||
// TransitionObject TODO:
|
||||
// RestoreTransitionedObject TODO:
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ crc32fast = { workspace = true }
|
||||
rmp.workspace = true
|
||||
rmp-serde.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
time.workspace = true
|
||||
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "sync"] }
|
||||
|
||||
@@ -1,541 +0,0 @@
|
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;

const APPEND_STATE_META_KEY: &str = "x-rustfs-internal-append-state";

/// Tracks the state of append-enabled objects.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AppendState {
pub state: AppendStateKind,
pub epoch: u64,
pub committed_length: i64,
pub pending_segments: Vec<AppendSegment>,
}

/// Represents individual append segments that still need consolidation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AppendSegment {
pub offset: i64,
pub length: i64,
pub data_dir: Option<Uuid>,
pub etag: Option<String>,
pub epoch: u64,
}

/// Possible append lifecycle states for an object version.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum AppendStateKind {
#[default]
Disabled,
Inline,
InlinePendingSpill,
SegmentedActive,
SegmentedSealed,
}

/// Persist the provided append state into object metadata.
pub fn set_append_state(metadata: &mut HashMap<String, String>, state: &AppendState) -> Result<()> {
let encoded = serde_json::to_string(state).map_err(Error::other)?;
metadata.insert(APPEND_STATE_META_KEY.to_string(), encoded);
Ok(())
}

/// Remove the append state marker from metadata.
pub fn clear_append_state(metadata: &mut HashMap<String, String>) {
metadata.remove(APPEND_STATE_META_KEY);
}

/// Load append state stored in metadata, if any.
pub fn get_append_state(metadata: &HashMap<String, String>) -> Result<Option<AppendState>> {
let raw = match metadata.get(APPEND_STATE_META_KEY) {
Some(val) if !val.is_empty() => val,
_ => return Ok(None),
};

let decoded = serde_json::from_str(raw).map_err(Error::other)?;
Ok(Some(decoded))
}

/// Complete append operations by consolidating pending segments and sealing the object
pub fn complete_append_operation(state: &mut AppendState) -> Result<()> {
match state.state {
AppendStateKind::SegmentedActive => {
// Move all pending segments data to main parts and seal
state.committed_length += state.pending_segments.iter().map(|s| s.length).sum::<i64>();
state.pending_segments.clear();
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::Inline => {
// Inline objects are always immediately committed, just seal them
state.state = AppendStateKind::SegmentedSealed; // Transition to sealed
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::InlinePendingSpill => {
// Wait for spill to complete, then seal
// In practice, this might need to trigger the spill completion first
state.state = AppendStateKind::SegmentedSealed;
state.pending_segments.clear();
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::SegmentedSealed | AppendStateKind::Disabled => {
// Already sealed or disabled
Err(Error::other("Cannot complete append on sealed or disabled object"))
}
}
}

/// Abort append operations by discarding pending segments and returning to sealed state
pub fn abort_append_operation(state: &mut AppendState) -> Result<()> {
match state.state {
AppendStateKind::SegmentedActive => {
// Discard all pending segments and seal
state.pending_segments.clear();
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::Inline => {
// Inline data is already committed, just seal
state.state = AppendStateKind::SegmentedSealed;
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::InlinePendingSpill => {
// Cancel spill and keep inline data, then seal
state.state = AppendStateKind::SegmentedSealed;
state.pending_segments.clear();
state.epoch = state.epoch.saturating_add(1);
Ok(())
}
AppendStateKind::SegmentedSealed | AppendStateKind::Disabled => {
// Already sealed or disabled
Err(Error::other("Cannot abort append on sealed or disabled object"))
}
}
}

/// Check if an append operation can be completed
pub fn can_complete_append(state: &AppendState) -> bool {
matches!(
state.state,
AppendStateKind::Inline | AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive
)
}

/// Check if an append operation can be aborted
pub fn can_abort_append(state: &AppendState) -> bool {
matches!(
state.state,
AppendStateKind::Inline | AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive
)
}

/// Verify epoch for optimistic concurrency control
pub fn verify_append_epoch(current_state: &AppendState, expected_epoch: u64) -> Result<()> {
if current_state.epoch != expected_epoch {
Err(Error::other(format!(
"Append operation conflict: expected epoch {}, found {}",
expected_epoch, current_state.epoch
)))
} else {
Ok(())
}
}

/// Prepare next append operation by incrementing epoch
pub fn prepare_next_append(state: &mut AppendState) {
state.epoch = state.epoch.saturating_add(1);
}

/// Validate that a new append segment doesn't conflict with existing segments
pub fn validate_new_segment(state: &AppendState, new_offset: i64, new_length: i64) -> Result<()> {
let new_end = new_offset + new_length;

// Check it doesn't overlap with committed data
if new_offset < state.committed_length {
return Err(Error::other(format!(
"New segment overlaps with committed data: offset {} < committed_length {}",
new_offset, state.committed_length
)));
}

// Check it doesn't overlap with existing pending segments
for existing in &state.pending_segments {
let existing_start = existing.offset;
let existing_end = existing.offset + existing.length;

// Check for any overlap
if new_offset < existing_end && new_end > existing_start {
return Err(Error::other(format!(
"New segment [{}, {}) overlaps with existing segment [{}, {})",
new_offset, new_end, existing_start, existing_end
)));
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::fileinfo::FileInfo;

#[test]
fn append_state_roundtrip_in_metadata() {
let mut metadata = HashMap::new();
let state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 42,
committed_length: 2048,
pending_segments: vec![AppendSegment {
offset: 2048,
length: 512,
data_dir: Some(Uuid::new_v4()),
etag: Some("abc123".to_string()),
epoch: 0,
}],
};

set_append_state(&mut metadata, &state).expect("persist append state");
assert!(metadata.contains_key(APPEND_STATE_META_KEY));

let decoded = get_append_state(&metadata)
.expect("decode append state")
.expect("state present");
assert_eq!(decoded, state);

clear_append_state(&mut metadata);
assert!(!metadata.contains_key(APPEND_STATE_META_KEY));
assert!(get_append_state(&metadata).unwrap().is_none());
}

#[test]
fn fileinfo_append_state_migration_compatibility() {
// Test old inline data object
let mut inline_fi = FileInfo {
size: 1024,
..Default::default()
};
inline_fi.set_inline_data();

let state = inline_fi.get_append_state();
assert_eq!(state.state, AppendStateKind::Inline);
assert_eq!(state.committed_length, 1024);
assert!(state.pending_segments.is_empty());
assert!(inline_fi.is_appendable());
assert!(!inline_fi.has_pending_appends());

// Test old regular object
let regular_fi = FileInfo {
size: 2048,
..Default::default()
};
// No inline_data marker

let state = regular_fi.get_append_state();
assert_eq!(state.state, AppendStateKind::SegmentedSealed);
assert_eq!(state.committed_length, 2048);
assert!(state.pending_segments.is_empty());
assert!(!regular_fi.is_appendable());
assert!(!regular_fi.has_pending_appends());

// Test explicit append state
let mut append_fi = FileInfo::default();
let explicit_state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 5,
committed_length: 1500,
pending_segments: vec![AppendSegment {
offset: 1500,
length: 300,
data_dir: Some(Uuid::new_v4()),
etag: Some("def456".to_string()),
epoch: 0,
}],
};

append_fi.set_append_state(&explicit_state).expect("set explicit state");
let retrieved_state = append_fi.get_append_state();
assert_eq!(retrieved_state, explicit_state);
assert!(append_fi.is_appendable());
assert!(append_fi.has_pending_appends());
}

#[test]
fn append_state_transitions() {
// Test state transition validation
assert_eq!(AppendStateKind::default(), AppendStateKind::Disabled);

let inline_state = AppendState {
state: AppendStateKind::Inline,
..Default::default()
};

let spill_state = AppendState {
state: AppendStateKind::InlinePendingSpill,
..Default::default()
};

let active_state = AppendState {
state: AppendStateKind::SegmentedActive,
..Default::default()
};

let sealed_state = AppendState {
state: AppendStateKind::SegmentedSealed,
..Default::default()
};

// Verify serialization works for all states
for state in [inline_state, spill_state, active_state, sealed_state] {
let mut metadata = HashMap::new();
set_append_state(&mut metadata, &state).expect("serialize state");
let decoded = get_append_state(&metadata).unwrap().unwrap();
assert_eq!(decoded, state);
}
}

#[test]
fn complete_append_transitions() {
// Test completing SegmentedActive with pending segments
let mut active_state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 5,
committed_length: 1000,
pending_segments: vec![
AppendSegment {
offset: 1000,
length: 200,
data_dir: Some(Uuid::new_v4()),
etag: Some("abc123".to_string()),
epoch: 0,
},
AppendSegment {
offset: 1200,
length: 300,
data_dir: Some(Uuid::new_v4()),
etag: Some("def456".to_string()),
epoch: 0,
},
],
};

assert!(can_complete_append(&active_state));
complete_append_operation(&mut active_state).expect("complete should succeed");

assert_eq!(active_state.state, AppendStateKind::SegmentedSealed);
assert_eq!(active_state.committed_length, 1500); // 1000 + 200 + 300
assert!(active_state.pending_segments.is_empty());
assert_eq!(active_state.epoch, 6);

// Test completing Inline state
let mut inline_state = AppendState {
state: AppendStateKind::Inline,
epoch: 2,
committed_length: 500,
..Default::default()
};

assert!(can_complete_append(&inline_state));
complete_append_operation(&mut inline_state).expect("complete should succeed");

assert_eq!(inline_state.state, AppendStateKind::SegmentedSealed);
assert_eq!(inline_state.committed_length, 500); // Unchanged
assert_eq!(inline_state.epoch, 3);

// Test completing already sealed state should fail
let mut sealed_state = AppendState {
state: AppendStateKind::SegmentedSealed,
..Default::default()
};

assert!(!can_complete_append(&sealed_state));
assert!(complete_append_operation(&mut sealed_state).is_err());
}

#[test]
fn abort_append_transitions() {
// Test aborting SegmentedActive with pending segments
let mut active_state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 3,
committed_length: 800,
pending_segments: vec![AppendSegment {
offset: 800,
length: 400,
data_dir: Some(Uuid::new_v4()),
etag: Some("xyz789".to_string()),
epoch: 0,
}],
};

assert!(can_abort_append(&active_state));
abort_append_operation(&mut active_state).expect("abort should succeed");

assert_eq!(active_state.state, AppendStateKind::SegmentedSealed);
assert_eq!(active_state.committed_length, 800); // Unchanged, pending discarded
assert!(active_state.pending_segments.is_empty());
assert_eq!(active_state.epoch, 4);

// Test aborting InlinePendingSpill
let mut spill_state = AppendState {
state: AppendStateKind::InlinePendingSpill,
epoch: 1,
committed_length: 100,
pending_segments: vec![],
};

assert!(can_abort_append(&spill_state));
abort_append_operation(&mut spill_state).expect("abort should succeed");

assert_eq!(spill_state.state, AppendStateKind::SegmentedSealed);
assert_eq!(spill_state.committed_length, 100);
assert_eq!(spill_state.epoch, 2);

// Test aborting disabled state should fail
let mut disabled_state = AppendState {
state: AppendStateKind::Disabled,
..Default::default()
};

assert!(!can_abort_append(&disabled_state));
assert!(abort_append_operation(&mut disabled_state).is_err());
}

#[test]
fn epoch_validation() {
let state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 10,
committed_length: 1000,
pending_segments: vec![],
};

// Valid epoch should succeed
assert!(verify_append_epoch(&state, 10).is_ok());

// Invalid epoch should fail
assert!(verify_append_epoch(&state, 9).is_err());
assert!(verify_append_epoch(&state, 11).is_err());

// Error message should contain epoch information
let error = verify_append_epoch(&state, 5).unwrap_err();
let error_msg = error.to_string();
assert!(error_msg.contains("expected epoch 5"));
assert!(error_msg.contains("found 10"));
}

#[test]
fn next_append_preparation() {
let mut state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 5,
committed_length: 1000,
pending_segments: vec![],
};

prepare_next_append(&mut state);
assert_eq!(state.epoch, 6);

// Test saturation behavior
let mut max_state = AppendState {
epoch: u64::MAX,
..Default::default()
};

prepare_next_append(&mut max_state);
assert_eq!(max_state.epoch, u64::MAX); // Should saturate, not overflow
}

#[test]
fn segment_validation() {
let state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 3,
committed_length: 1000,
pending_segments: vec![
AppendSegment {
offset: 1000,
length: 200,
data_dir: Some(Uuid::new_v4()),
etag: Some("abc123".to_string()),
epoch: 0,
},
AppendSegment {
offset: 1300,
length: 300,
data_dir: Some(Uuid::new_v4()),
etag: Some("def456".to_string()),
epoch: 0,
},
],
};

// Valid segment after existing segments
assert!(validate_new_segment(&state, 1600, 100).is_ok());

// Valid segment filling gap between committed and first pending
assert!(validate_new_segment(&state, 1200, 100).is_ok());

// Invalid segment overlapping with committed data
assert!(validate_new_segment(&state, 900, 200).is_err());
let error = validate_new_segment(&state, 900, 200).unwrap_err();
assert!(error.to_string().contains("overlaps with committed data"));

// Invalid segment overlapping with first pending segment
assert!(validate_new_segment(&state, 1100, 100).is_err());
let error = validate_new_segment(&state, 1100, 100).unwrap_err();
assert!(error.to_string().contains("overlaps with existing segment"));

// Invalid segment overlapping with second pending segment
assert!(validate_new_segment(&state, 1400, 100).is_err());

// Edge case: segment exactly touching committed data (should be valid)
assert!(validate_new_segment(&state, 1000, 0).is_ok());

// Edge case: segment exactly touching existing segment (should be valid)
assert!(validate_new_segment(&state, 1200, 0).is_ok());
}

#[test]
fn segment_validation_edge_cases() {
let empty_state = AppendState {
state: AppendStateKind::SegmentedActive,
epoch: 1,
committed_length: 500,
pending_segments: vec![],
};

// First segment after committed data
assert!(validate_new_segment(&empty_state, 500, 100).is_ok());
assert!(validate_new_segment(&empty_state, 600, 200).is_ok());

// Zero-length segments (edge case)
assert!(validate_new_segment(&empty_state, 500, 0).is_ok());

// Segment exactly at committed boundary
assert!(validate_new_segment(&empty_state, 499, 1).is_err());
assert!(validate_new_segment(&empty_state, 500, 1).is_ok());
}
}
@@ -494,96 +494,6 @@ impl FileInfo {
ReplicationStatusType::Empty
}
}
/// Get the append state for this FileInfo, with migration compatibility
pub fn get_append_state(&self) -> crate::append::AppendState {
use crate::append::{AppendState, AppendStateKind, get_append_state};

// Try to load from metadata first
if let Ok(Some(state)) = get_append_state(&self.metadata) {
return state;
}

// Migration compatibility: determine state based on existing data
if self.inline_data() {
// Has inline data, treat as Inline state
AppendState {
state: AppendStateKind::Inline,
epoch: 0,
committed_length: self.size,
pending_segments: Vec::new(),
}
} else {
// No inline data, treat as SegmentedSealed (traditional object)
AppendState {
state: AppendStateKind::SegmentedSealed,
epoch: 0,
committed_length: self.size,
pending_segments: Vec::new(),
}
}
}

/// Set the append state for this FileInfo
pub fn set_append_state(&mut self, state: &crate::append::AppendState) -> crate::error::Result<()> {
crate::append::set_append_state(&mut self.metadata, state)
}

/// Check if this object supports append operations
pub fn is_appendable(&self) -> bool {
use crate::append::AppendStateKind;
match self.get_append_state().state {
AppendStateKind::Disabled => false,
AppendStateKind::Inline | AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive => true,
AppendStateKind::SegmentedSealed => false,
}
}

/// Check if this object has pending append operations
pub fn has_pending_appends(&self) -> bool {
use crate::append::AppendStateKind;
matches!(
self.get_append_state().state,
AppendStateKind::InlinePendingSpill | AppendStateKind::SegmentedActive
)
}

/// Complete all pending append operations and seal the object
pub fn complete_append(&mut self) -> crate::error::Result<()> {
let mut append_state = self.get_append_state();
crate::append::complete_append_operation(&mut append_state)?;
self.set_append_state(&append_state)?;

// Update file size to reflect completed operation
if append_state.state == crate::append::AppendStateKind::SegmentedSealed {
self.size = append_state.committed_length;
}

Ok(())
}

/// Abort all pending append operations and seal the object
pub fn abort_append(&mut self) -> crate::error::Result<()> {
let mut append_state = self.get_append_state();
crate::append::abort_append_operation(&mut append_state)?;
self.set_append_state(&append_state)?;

// Update file size to only include committed data
if append_state.state == crate::append::AppendStateKind::SegmentedSealed {
self.size = append_state.committed_length;
}

Ok(())
}

/// Check if append operations can be completed for this object
pub fn can_complete_append(&self) -> bool {
crate::append::can_complete_append(&self.get_append_state())
}

/// Check if append operations can be aborted for this object
pub fn can_abort_append(&self) -> bool {
crate::append::can_abort_append(&self.get_append_state())
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod append;
mod error;
pub mod fileinfo;
mod filemeta;
@@ -23,7 +22,6 @@ mod replication;

pub mod test_data;

pub use append::*;
pub use error::*;
pub use fileinfo::*;
pub use filemeta::*;

@@ -420,7 +420,6 @@ mod tests {
}

#[tokio::test]
#[ignore]
async fn test_invalid_domain_resolution() {
let resolver = LayeredDnsResolver::new().await.unwrap();


@@ -1,147 +0,0 @@
# Append Write Design

This document captures the current design of the append-write feature in RustFS so that new contributors can quickly understand the moving parts, data flows, and testing expectations.

## Goals & Non-Goals

### Goals
- Allow clients to append payloads to existing objects without re-uploading the full body.
- Support inline objects and spill seamlessly into segmented layout once thresholds are exceeded.
- Preserve strong read-after-write semantics via optimistic concurrency controls (ETag / epoch).
- Expose minimal S3-compatible surface area (`x-amz-object-append`, `x-amz-append-position`, `x-amz-append-action`).

### Non-Goals
- Full multipart-upload parity; append is intentionally simpler and serialized per object.
- Cross-object transactions; each object is isolated.
- Rebalancing or background compaction (future work).

## State Machine

Append state is persisted inside `FileInfo.metadata` under `x-rustfs-internal-append-state` and serialized as `AppendState` (`crates/filemeta/src/append.rs`).

```
Disabled --(initial PUT w/o append)--> SegmentedSealed
Inline --(inline append)--> Inline / InlinePendingSpill
InlinePendingSpill --(spill success)--> SegmentedActive
SegmentedActive --(Complete)--> SegmentedSealed
SegmentedActive --(Abort)--> SegmentedSealed
SegmentedSealed --(new append)--> SegmentedActive
```

Definitions:
- **Inline**: Object data fully stored in metadata (`FileInfo.data`).
- **InlinePendingSpill**: Inline data after append exceeded inline threshold; awaiting spill to disk.
- **SegmentedActive**: Object data lives in erasure-coded part(s) plus one or more pending append segments on disk (`append/<epoch>/<uuid>`).
- **SegmentedSealed**: No pending segments; logical content equals committed parts.

`AppendState` fields:
- `state`: current state enum (see above).
- `epoch`: monotonically increasing counter for concurrency control.
- `committed_length`: logical size already durable in the base parts/inline region.
- `pending_segments`: ordered list of `AppendSegment { offset, length, data_dir, etag, epoch }`.
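
These fields map directly onto the helpers defined in `crates/filemeta/src/append.rs` (shown earlier in this diff). A minimal round-trip sketch, assuming the crate is importable as `rustfs_filemeta` (the exact crate path is an assumption here):

```rust
use std::collections::HashMap;
// Assumed import path; the types and helpers live in crates/filemeta/src/append.rs.
use rustfs_filemeta::{AppendState, AppendStateKind, get_append_state, set_append_state};

fn roundtrip_example() {
    let mut metadata: HashMap<String, String> = HashMap::new();
    let state = AppendState {
        state: AppendStateKind::SegmentedActive,
        epoch: 1,
        committed_length: 1024,
        pending_segments: Vec::new(),
    };

    // Serializes the state as JSON under x-rustfs-internal-append-state.
    set_append_state(&mut metadata, &state).expect("persist append state");

    // Ok(None) when the key is absent, Ok(Some(state)) otherwise.
    let loaded = get_append_state(&metadata).expect("decode append state");
    assert_eq!(loaded, Some(state));
}
```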

## Metadata & Storage Layout

### Inline Objects
- Inline payload stored in `FileInfo.data`.
- Hash metadata maintained through `append_inline_data` (re-encoding with bitrot writer when checksums exist).
- When spilling is required, inline data is decoded, appended, and re-encoded into erasure shards written to per-disk `append/<epoch>/<segment_id>/part.1` temporary path before rename to primary data directory.

### Segmented Objects
- Base object content is represented by standard erasure-coded parts (`FileInfo.parts`, `FileInfo.data_dir`).
- Pending append segments live under `<object>/append/<epoch>/<segment_uuid>/part.1` (per disk).
- Each append stores segment metadata (`etag`, `offset`, `length`) inside `AppendState.pending_segments` and updates `FileInfo.size` to include pending bytes.
- Aggregate ETag is recomputed using multipart MD5 helper (`get_complete_multipart_md5`).

### Metadata Writes
- `SetDisks::write_unique_file_info` persists `FileInfo` updates to the quorum of disks.
- During spill/append/complete/abort, all mirrored `FileInfo` copies within `parts_metadata` are updated to keep nodes consistent.
- Abort ensures inline markers are cleared (`x-rustfs-internal-inline-data`) and `FileInfo.data = None` to avoid stale inline reads.

## Request Flows

### Append (Inline Path)
1. Handler (`rustfs/src/storage/ecfs.rs`) validates headers and fills `ObjectOptions.append_*`.
2. `SetDisks::append_inline_object` verifies append position using `AppendState` snapshot.
3. Existing inline payload decoded (if checksums present) and appended in-memory (`append_inline_data`).
4. Storage class decision determines whether to remain inline or spill.
5. Inline success updates `FileInfo.data`, metadata, `AppendState` (state `Inline`, lengths updated).
6. Spill path delegates to `spill_inline_into_segmented` (see segmented path below).
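
For orientation, a hypothetical client request carrying the append headers could look like the sketch below. This is illustrative only: a real request against RustFS must be SigV4-signed (for example via an S3 SDK), and the endpoint, bucket, and object names are made up; the header names follow the handler in `rustfs/src/storage/ecfs.rs`.

```rust
// Illustrative sketch, not the project's client code; signing is omitted.
async fn append_chunk(data: Vec<u8>, position: i64) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let resp = client
        .put("http://127.0.0.1:9000/my-bucket/my-object") // hypothetical endpoint
        .header("x-amz-object-append", "true")
        .header("x-amz-append-position", position.to_string()) // must equal the current logical size
        .body(data)
        .send()
        .await?;
    // A stale or malformed position is rejected by the handler with 400 InvalidArgument.
    assert!(resp.status().is_success());
    Ok(())
}
```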

### Append (Segmented Path)
1. `SetDisks::append_segmented_object` validates state (must be `SegmentedActive` or `SegmentedSealed`).
2. Snapshot expected offset = committed length + sum of pending segment lengths.
3. Payload encoded using erasure coding; shards written to temp volume; renamed into `append/<epoch>/<segment_uuid>` under object data directory.
4. New `AppendSegment` pushed, `AppendState.epoch` incremented, aggregated ETag recalculated.
5. `FileInfo.size` reflects committed + pending bytes; metadata persisted across quorum.
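
The position check in step 2 can be expressed with the helpers from `append.rs`; a rough sketch (crate path assumed as above, storage-layer error handling elided):

```rust
use rustfs_filemeta::{validate_new_segment, AppendState};

// Expected offset for the next append = committed bytes + all pending segment bytes.
fn expected_append_offset(state: &AppendState) -> i64 {
    let pending: i64 = state.pending_segments.iter().map(|seg| seg.length).sum();
    state.committed_length + pending
}

fn check_incoming_append(state: &AppendState, client_position: i64, payload_len: i64) -> bool {
    // Reject stale positions up front, then make sure the new segment does not
    // overlap committed data or an already-pending segment.
    client_position == expected_append_offset(state)
        && validate_new_segment(state, client_position, payload_len).is_ok()
}
```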

### GET / Range Reads
1. `SetDisks::get_object_with_fileinfo` inspects `AppendState`.
2. Reads committed data from inline or erasure parts (ignoring inline buffers once segmented).
3. If requested range includes pending segments, loader fetches each segment via `load_pending_segment`, decodes shards, and streams appended bytes.
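
A simplified view of step 3, splitting a requested byte range into the portion served from committed parts and the portions served from pending segments. This is only an illustrative sketch of the bookkeeping; the real reader streams decoded shards rather than planning offsets up front.

```rust
use rustfs_filemeta::AppendState;

// Returns (bytes to read from committed data, list of (segment index, offset within segment, length)).
fn plan_range_read(state: &AppendState, start: i64, len: i64) -> (i64, Vec<(usize, i64, i64)>) {
    let end = start + len;
    // Portion of the range that falls below committed_length.
    let committed_bytes = (end.min(state.committed_length) - start.min(state.committed_length)).max(0);

    let mut from_segments = Vec::new();
    for (idx, seg) in state.pending_segments.iter().enumerate() {
        let seg_end = seg.offset + seg.length;
        let overlap_start = start.max(seg.offset);
        let overlap_end = end.min(seg_end);
        if overlap_end > overlap_start {
            from_segments.push((idx, overlap_start - seg.offset, overlap_end - overlap_start));
        }
    }
    (committed_bytes, from_segments)
}
```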

### Complete Append (`x-amz-append-action: complete`)
1. `complete_append_object` fetches current `FileInfo`, ensures pending segments exist.
2. Entire logical object (committed + pending) streamed through `VecAsyncWriter` (TODO: potential optimization) to produce contiguous payload.
3. Inline spill routine (`spill_inline_into_segmented`) consolidates data into primary part, sets state `SegmentedSealed`, clears pending list, updates `committed_length`.
4. Pending segment directories removed and quorum metadata persisted.

### Abort Append (`x-amz-append-action: abort`)
1. `abort_append_object` removes pending segment directories.
2. Ensures `committed_length` matches actual durable data (inline length or sum of parts); logs and corrects if mismatch is found.
3. Clears pending list, sets state `SegmentedSealed`, bumps epoch, removes inline markers/data.
4. Persists metadata and returns base ETag (multipart MD5 of committed parts).
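
At the metadata level, both actions reduce to the pure state-transition helpers shown earlier in this diff; a sketch of that effect (crate path and error type assumed, all storage-layer work elided):

```rust
use rustfs_filemeta::{abort_append_operation, complete_append_operation, AppendState, AppendStateKind};

fn seal(state: &mut AppendState, keep_pending_bytes: bool) -> Result<(), rustfs_filemeta::Error> {
    if keep_pending_bytes {
        // complete: pending segment lengths are folded into committed_length, then sealed.
        complete_append_operation(state)?;
    } else {
        // abort: pending segments are discarded, committed_length is left untouched.
        abort_append_operation(state)?;
    }
    debug_assert_eq!(state.state, AppendStateKind::SegmentedSealed);
    debug_assert!(state.pending_segments.is_empty());
    Ok(())
}
```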

## Error Handling & Recovery

- All disk writes go through quorum helpers (`reduce_write_quorum_errs`, `reduce_read_quorum_errs`) and propagate `StorageError` variants for HTTP mapping.
- Append operations are single-threaded per object via locking in higher layers (`fast_lock_manager` in `SetDisks::put_object`).
- On spill/append rename failure, temp directories are cleaned up; operation aborts without mutating metadata.
- Abort path now realigns `committed_length` if metadata drifted (observed during development) and strips inline remnants to prevent stale reads.
- Pending segments are only removed once metadata update succeeds; no partial deletion is performed ahead of state persistence.

## Concurrency

- Append requests rely on exact `x-amz-append-position` to ensure the client has an up-to-date view.
- Optional header `If-Match` is honored in S3 handler before actual append (shared with regular PUT path).
- `AppendState.epoch` increments after each append/complete/abort; future work may expose it for stronger optimistic control.
- e2e test `append_segments_concurrency_then_complete` verifies that simultaneous appends result in exactly one success; the loser receives 400.
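
If the epoch were surfaced to callers, the optimistic check would build on `verify_append_epoch` and `prepare_next_append` from `append.rs`; a hypothetical caller-side sketch (not current handler behaviour, crate path assumed):

```rust
use rustfs_filemeta::{prepare_next_append, verify_append_epoch, AppendState};

// Hypothetical guard: reject the write if another appender got in first,
// otherwise bump the epoch before persisting the new segment.
fn begin_guarded_append(state: &mut AppendState, epoch_seen_by_client: u64) -> Result<(), rustfs_filemeta::Error> {
    verify_append_epoch(state, epoch_seen_by_client)?; // errors with "Append operation conflict: ..."
    prepare_next_append(state); // saturating epoch increment
    Ok(())
}
```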

## Key Modules

- `crates/ecstore/src/set_disk.rs`: core implementation (inline append, spill, segmented append, complete, abort, GET integration).
- `crates/ecstore/src/erasure_coding/{encode,decode}.rs`: encode/decode helpers used by append pipeline.
- `crates/filemeta/src/append.rs`: metadata schema + helper functions.
- `rustfs/src/storage/ecfs.rs`: HTTP/S3 layer that parses headers and routes to append operations.

## Testing Strategy

### Unit Tests
- `crates/filemeta/src/append.rs` covers serialization and state transitions.
- `crates/ecstore/src/set_disk.rs` contains lower-level utilities and regression tests for metadata helpers.
- Additional unit coverage is recommended for spill/append failure paths (e.g., injected rename failures).

### End-to-End Tests (`cargo test --package e2e_test append`)
- Inline append success, wrong position, precondition failures.
- Segmented append success, wrong position, wrong ETag.
- Spill threshold transition (`append_threshold_crossing_inline_to_segmented`).
- Pending segment streaming (`append_range_requests_across_segments`).
- Complete append consolidates pending segments.
- Abort append discards pending data and allows new append.
- Concurrency: two clients racing to append, followed by additional append + complete.

### Tooling Considerations
- `make clippy` must pass; the append code relies on async operations and custom logging.
- `make test` / `cargo nextest run` recommended before submitting PRs.
- Use `RUST_LOG=rustfs_ecstore=debug` when debugging append flows; targeted `info!`/`warn!` logs are emitted during spill/abort.

## Future Work

- Streamed consolidation in `complete_append_object` to avoid buffering entire logical object.
- Throttling or automatic `Complete` when pending segments exceed size/quantity thresholds.
- Stronger epoch exposure to clients (header-based conflict detection).
- Automated cleanup or garbage collection for orphaned `append/*` directories.

---

For questions or design discussions, drop a note in the append-write channel or ping the storage team.
@@ -2288,92 +2288,10 @@ impl S3 for FS {
let mt = metadata.clone();
let mt2 = metadata.clone();

let append_flag = req.headers.get("x-amz-object-append");
let append_action_header = req.headers.get("x-amz-append-action");
let mut append_requested = false;
let mut append_position: Option<i64> = None;
if let Some(flag_value) = append_flag {
let flag_str = flag_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-object-append header".to_string())
})?;
if flag_str.eq_ignore_ascii_case("true") {
append_requested = true;
let position_value = req.headers.get("x-amz-append-position").ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position header required when x-amz-object-append is true".to_string(),
)
})?;
let position_str = position_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-append-position header".to_string())
})?;
let position = position_str.parse::<i64>().map_err(|_| {
S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position must be a non-negative integer".to_string(),
)
})?;
if position < 0 {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-position must be a non-negative integer".to_string(),
));
}
append_position = Some(position);
} else if !flag_str.eq_ignore_ascii_case("false") {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-object-append must be 'true' or 'false'".to_string(),
));
}
}

let mut append_action: Option<String> = None;
if let Some(action_value) = append_action_header {
let action_str = action_value.to_str().map_err(|_| {
S3Error::with_message(S3ErrorCode::InvalidArgument, "invalid x-amz-append-action header".to_string())
})?;
append_action = Some(action_str.to_ascii_lowercase());
}

let mut opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, mt)
.await
.map_err(ApiError::from)?;

if append_requested {
opts.append_object = true;
opts.append_position = append_position;
}

if let Some(action) = append_action {
if append_requested {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-object-append cannot be combined with x-amz-append-action".to_string(),
));
}

let obj_info = match action.as_str() {
"complete" => store.complete_append(&bucket, &key, &opts).await,
"abort" => store.abort_append(&bucket, &key, &opts).await,
_ => {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"x-amz-append-action must be 'complete' or 'abort'".to_string(),
));
}
}
.map_err(ApiError::from)?;

let output = PutObjectOutput {
e_tag: obj_info.etag.clone(),
version_id: obj_info.version_id.map(|v| v.to_string()),
..Default::default()
};

return Ok(S3Response::new(output));
}

let repoptions =
get_must_replicate_options(&mt2, "".to_string(), ReplicationStatusType::Empty, ReplicationType::Object, opts.clone());