From ef0dbaaeb537646ee1f287522b5dd75ab1a08075 Mon Sep 17 00:00:00 2001 From: guojidan <63799833+guojidan@users.noreply.github.com> Date: Wed, 24 Sep 2025 02:09:04 -0700 Subject: [PATCH] feat(encryption): add managed encryption support for SSE-S3 and SSE-KMS (#583) Signed-off-by: junxiang Mu <1948535941@qq.com> --- .../src/kms/encryption_metadata_test.rs | 386 ++++++++++++++ crates/e2e_test/src/kms/mod.rs | 3 + crates/ecstore/src/set_disk.rs | 70 ++- crates/rio/src/encrypt_reader.rs | 110 +++- rustfs/src/storage/ecfs.rs | 493 ++++++++++++++++-- 5 files changed, 1007 insertions(+), 55 deletions(-) create mode 100644 crates/e2e_test/src/kms/encryption_metadata_test.rs diff --git a/crates/e2e_test/src/kms/encryption_metadata_test.rs b/crates/e2e_test/src/kms/encryption_metadata_test.rs new file mode 100644 index 00000000..cc458fc3 --- /dev/null +++ b/crates/e2e_test/src/kms/encryption_metadata_test.rs @@ -0,0 +1,386 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration tests that focus on surface headers/metadata emitted by the +//! managed encryption pipeline (SSE-S3/SSE-KMS). + +use super::common::LocalKMSTestEnvironment; +use crate::common::{TEST_BUCKET, init_logging}; +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::types::{ + CompletedMultipartUpload, CompletedPart, ServerSideEncryption, ServerSideEncryptionByDefault, + ServerSideEncryptionConfiguration, ServerSideEncryptionRule, +}; +use serial_test::serial; +use std::collections::{HashMap, VecDeque}; +use tracing::info; + +fn assert_encryption_metadata(metadata: &HashMap, expected_size: usize) { + for key in [ + "x-rustfs-encryption-key", + "x-rustfs-encryption-iv", + "x-rustfs-encryption-context", + "x-rustfs-encryption-original-size", + ] { + assert!(metadata.contains_key(key), "expected managed encryption metadata '{}' to be present", key); + assert!( + !metadata.get(key).unwrap().is_empty(), + "managed encryption metadata '{}' should not be empty", + key + ); + } + + let size_value = metadata + .get("x-rustfs-encryption-original-size") + .expect("managed encryption metadata should include original size"); + let parsed_size: usize = size_value + .parse() + .expect("x-rustfs-encryption-original-size should be numeric"); + assert_eq!(parsed_size, expected_size, "recorded original size should match uploaded payload length"); +} + +fn assert_storage_encrypted(storage_root: &std::path::Path, bucket: &str, key: &str, plaintext: &[u8]) { + let mut stack = VecDeque::from([storage_root.to_path_buf()]); + let mut scanned = 0; + let mut plaintext_path: Option = None; + + while let Some(current) = stack.pop_front() { + let Ok(metadata) = std::fs::metadata(¤t) else { continue }; + if metadata.is_dir() { + if let Ok(entries) = std::fs::read_dir(¤t) { + for entry in entries.flatten() { + stack.push_back(entry.path()); + } + } + continue; + } + + let path_str = current.to_string_lossy(); + if !(path_str.contains(bucket) || path_str.contains(key)) { + continue; + } + + scanned += 1; + let Ok(bytes) = std::fs::read(¤t) else { continue }; + if bytes.len() < plaintext.len() { + continue; + } + if bytes.windows(plaintext.len()).any(|window| window == plaintext) { + plaintext_path = Some(current); + break; + } + } + + assert!( + scanned > 0, + "Failed to locate stored data files for bucket '{}' and key '{}' under {:?}", + bucket, + key, + storage_root + ); + assert!(plaintext_path.is_none(), "Plaintext detected on disk at {:?}", plaintext_path.unwrap()); +} + +#[tokio::test] +#[serial] +async fn test_head_reports_managed_metadata_for_sse_s3() -> Result<(), Box> { + init_logging(); + info!("Validating SSE-S3 managed encryption metadata exposure"); + + let mut kms_env = LocalKMSTestEnvironment::new().await?; + let _default_key = kms_env.start_rustfs_for_local_kms().await?; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let s3_client = kms_env.base_env.create_s3_client(); + kms_env.base_env.create_test_bucket(TEST_BUCKET).await?; + + // Bucket level default SSE-S3 configuration. + let encryption_config = ServerSideEncryptionConfiguration::builder() + .rules( + ServerSideEncryptionRule::builder() + .apply_server_side_encryption_by_default( + ServerSideEncryptionByDefault::builder() + .sse_algorithm(ServerSideEncryption::Aes256) + .build() + .unwrap(), + ) + .build(), + ) + .build() + .unwrap(); + + s3_client + .put_bucket_encryption() + .bucket(TEST_BUCKET) + .server_side_encryption_configuration(encryption_config) + .send() + .await?; + + let payload = b"metadata-sse-s3-payload"; + let key = "metadata-sse-s3-object"; + + s3_client + .put_object() + .bucket(TEST_BUCKET) + .key(key) + .body(payload.to_vec().into()) + .send() + .await?; + + let head = s3_client.head_object().bucket(TEST_BUCKET).key(key).send().await?; + + assert_eq!( + head.server_side_encryption(), + Some(&ServerSideEncryption::Aes256), + "head_object should advertise SSE-S3" + ); + + let metadata = head + .metadata() + .expect("head_object should return managed encryption metadata"); + assert_encryption_metadata(metadata, payload.len()); + + assert_storage_encrypted(std::path::Path::new(&kms_env.base_env.temp_dir), TEST_BUCKET, key, payload); + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn test_head_reports_managed_metadata_for_sse_kms_and_copy() -> Result<(), Box> { + init_logging(); + info!("Validating SSE-KMS managed encryption metadata (including copy)"); + + let mut kms_env = LocalKMSTestEnvironment::new().await?; + let default_key_id = kms_env.start_rustfs_for_local_kms().await?; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let s3_client = kms_env.base_env.create_s3_client(); + kms_env.base_env.create_test_bucket(TEST_BUCKET).await?; + + let encryption_config = ServerSideEncryptionConfiguration::builder() + .rules( + ServerSideEncryptionRule::builder() + .apply_server_side_encryption_by_default( + ServerSideEncryptionByDefault::builder() + .sse_algorithm(ServerSideEncryption::AwsKms) + .kms_master_key_id(&default_key_id) + .build() + .unwrap(), + ) + .build(), + ) + .build() + .unwrap(); + + s3_client + .put_bucket_encryption() + .bucket(TEST_BUCKET) + .server_side_encryption_configuration(encryption_config) + .send() + .await?; + + let payload = b"metadata-sse-kms-payload"; + let source_key = "metadata-sse-kms-object"; + + s3_client + .put_object() + .bucket(TEST_BUCKET) + .key(source_key) + .body(payload.to_vec().into()) + .send() + .await?; + + let head_source = s3_client.head_object().bucket(TEST_BUCKET).key(source_key).send().await?; + + assert_eq!( + head_source.server_side_encryption(), + Some(&ServerSideEncryption::AwsKms), + "source object should report SSE-KMS" + ); + assert_eq!( + head_source.ssekms_key_id().unwrap(), + &default_key_id, + "source object should maintain the configured KMS key id" + ); + let source_metadata = head_source + .metadata() + .expect("source object should include managed encryption metadata"); + assert_encryption_metadata(source_metadata, payload.len()); + + let dest_key = "metadata-sse-kms-object-copy"; + let copy_source = format!("{}/{}", TEST_BUCKET, source_key); + + s3_client + .copy_object() + .bucket(TEST_BUCKET) + .key(dest_key) + .copy_source(copy_source) + .send() + .await?; + + let head_dest = s3_client.head_object().bucket(TEST_BUCKET).key(dest_key).send().await?; + + assert_eq!( + head_dest.server_side_encryption(), + Some(&ServerSideEncryption::AwsKms), + "copied object should remain encrypted with SSE-KMS" + ); + assert_eq!( + head_dest.ssekms_key_id().unwrap(), + &default_key_id, + "copied object should keep the default KMS key id" + ); + let dest_metadata = head_dest + .metadata() + .expect("copied object should include managed encryption metadata"); + assert_encryption_metadata(dest_metadata, payload.len()); + + let copied_body = s3_client + .get_object() + .bucket(TEST_BUCKET) + .key(dest_key) + .send() + .await? + .body + .collect() + .await? + .into_bytes(); + assert_eq!(&copied_body[..], payload, "copied object payload should match source"); + + let storage_root = std::path::Path::new(&kms_env.base_env.temp_dir); + assert_storage_encrypted(storage_root, TEST_BUCKET, source_key, payload); + assert_storage_encrypted(storage_root, TEST_BUCKET, dest_key, payload); + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn test_multipart_upload_writes_encrypted_data() -> Result<(), Box> { + init_logging(); + info!("Validating ciphertext persistence for multipart SSE-KMS uploads"); + + let mut kms_env = LocalKMSTestEnvironment::new().await?; + let default_key_id = kms_env.start_rustfs_for_local_kms().await?; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let s3_client = kms_env.base_env.create_s3_client(); + kms_env.base_env.create_test_bucket(TEST_BUCKET).await?; + + let encryption_config = ServerSideEncryptionConfiguration::builder() + .rules( + ServerSideEncryptionRule::builder() + .apply_server_side_encryption_by_default( + ServerSideEncryptionByDefault::builder() + .sse_algorithm(ServerSideEncryption::AwsKms) + .kms_master_key_id(&default_key_id) + .build() + .unwrap(), + ) + .build(), + ) + .build() + .unwrap(); + + s3_client + .put_bucket_encryption() + .bucket(TEST_BUCKET) + .server_side_encryption_configuration(encryption_config) + .send() + .await?; + + let key = "multipart-encryption-object"; + let part_size = 5 * 1024 * 1024; // minimum part size required by S3 semantics + let part_one = vec![0xA5; part_size]; + let part_two = vec![0x5A; part_size]; + let combined: Vec = part_one.iter().chain(part_two.iter()).copied().collect(); + + let create_output = s3_client + .create_multipart_upload() + .bucket(TEST_BUCKET) + .key(key) + .send() + .await?; + + let upload_id = create_output.upload_id().unwrap(); + + let part1 = s3_client + .upload_part() + .bucket(TEST_BUCKET) + .key(key) + .upload_id(upload_id) + .part_number(1) + .body(ByteStream::from(part_one.clone())) + .send() + .await?; + + let part2 = s3_client + .upload_part() + .bucket(TEST_BUCKET) + .key(key) + .upload_id(upload_id) + .part_number(2) + .body(ByteStream::from(part_two.clone())) + .send() + .await?; + + let completed = CompletedMultipartUpload::builder() + .parts(CompletedPart::builder().part_number(1).e_tag(part1.e_tag().unwrap()).build()) + .parts(CompletedPart::builder().part_number(2).e_tag(part2.e_tag().unwrap()).build()) + .build(); + + s3_client + .complete_multipart_upload() + .bucket(TEST_BUCKET) + .key(key) + .upload_id(upload_id) + .multipart_upload(completed) + .send() + .await?; + + let head = s3_client.head_object().bucket(TEST_BUCKET).key(key).send().await?; + assert_eq!( + head.server_side_encryption(), + Some(&ServerSideEncryption::AwsKms), + "multipart head_object should expose SSE-KMS" + ); + assert_eq!( + head.ssekms_key_id().unwrap(), + &default_key_id, + "multipart object should retain bucket default KMS key" + ); + + assert_encryption_metadata( + head.metadata().expect("multipart head_object should expose managed metadata"), + combined.len(), + ); + + // Data returned to clients should decrypt back to original payload + let fetched = s3_client + .get_object() + .bucket(TEST_BUCKET) + .key(key) + .send() + .await? + .body + .collect() + .await? + .into_bytes(); + assert_eq!(&fetched[..], &combined[..]); + + assert_storage_encrypted(std::path::Path::new(&kms_env.base_env.temp_dir), TEST_BUCKET, key, &combined); + + Ok(()) +} diff --git a/crates/e2e_test/src/kms/mod.rs b/crates/e2e_test/src/kms/mod.rs index 2a0d8a7a..d824f77d 100644 --- a/crates/e2e_test/src/kms/mod.rs +++ b/crates/e2e_test/src/kms/mod.rs @@ -44,3 +44,6 @@ mod test_runner; #[cfg(test)] mod bucket_default_encryption_test; + +#[cfg(test)] +mod encryption_metadata_test; diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 96e717f2..d963a208 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -2143,6 +2143,7 @@ impl SetDisks { where W: AsyncWrite + Send + Sync + Unpin + 'static, { + tracing::debug!(bucket, object, requested_length = length, offset, "get_object_with_fileinfo start"); let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi); let total_size = fi.size as usize; @@ -2160,28 +2161,46 @@ impl SetDisks { let (part_index, mut part_offset) = fi.to_part_offset(offset)?; - // debug!( - // "get_object_with_fileinfo start offset:{}, part_index:{},part_offset:{}", - // offset, part_index, part_offset - // ); - let mut end_offset = offset; if length > 0 { end_offset += length - 1 } - let (last_part_index, _) = fi.to_part_offset(end_offset)?; + let (last_part_index, last_part_relative_offset) = fi.to_part_offset(end_offset)?; + + tracing::debug!( + bucket, + object, + offset, + length, + end_offset, + part_index, + last_part_index, + last_part_relative_offset, + "Multipart read bounds" + ); let erasure = erasure_coding::Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); + let part_indices: Vec = (part_index..=last_part_index).collect(); + tracing::debug!(bucket, object, ?part_indices, "Multipart part indices to stream"); + let mut total_read = 0; - for i in part_index..=last_part_index { + for current_part in part_indices { if total_read == length { + tracing::debug!( + bucket, + object, + total_read, + requested_length = length, + part_index = current_part, + "Stopping multipart stream early because accumulated bytes match request" + ); break; } - let part_number = fi.parts[i].number; - let part_size = fi.parts[i].size; + let part_number = fi.parts[current_part].number; + let part_size = fi.parts[current_part].size; let mut part_length = part_size - part_offset; if part_length > (length - total_read) { part_length = length - total_read @@ -2191,6 +2210,21 @@ impl SetDisks { let read_offset = (part_offset / erasure.block_size) * erasure.shard_size(); + tracing::debug!( + bucket, + object, + part_index = current_part, + part_number, + part_offset, + part_size, + part_length, + read_offset, + till_offset, + total_read_before = total_read, + requested_length = length, + "Streaming multipart part" + ); + let mut readers = Vec::with_capacity(disks.len()); let mut errors = Vec::with_capacity(disks.len()); for (idx, disk_op) in disks.iter().enumerate() { @@ -2236,6 +2270,15 @@ impl SetDisks { // part_number, part_offset, part_length, part_size // ); let (written, err) = erasure.decode(writer, readers, part_offset, part_length, part_size).await; + tracing::debug!( + bucket, + object, + part_index = current_part, + part_number, + part_length, + bytes_written = written, + "Finished decoding multipart part" + ); if let Some(e) = err { let de_err: DiskError = e.into(); let mut has_err = true; @@ -2274,6 +2317,8 @@ impl SetDisks { // debug!("read end"); + tracing::debug!(bucket, object, total_read, expected_length = length, "Multipart read finished"); + Ok(()) } @@ -3462,12 +3507,13 @@ impl ObjectIO for SetDisks { // let _guard_to_hold = _read_lock_guard; // moved into closure below tokio::spawn(async move { // let _guard = _guard_to_hold; // keep guard alive until task ends + let mut writer = wd; if let Err(e) = Self::get_object_with_fileinfo( &bucket, &object, offset, length, - &mut Box::new(wd), + &mut writer, fi, files, &disks, @@ -5377,6 +5423,7 @@ impl StorageAPI for SetDisks { } let ext_part = &curr_fi.parts[i]; + tracing::info!(target:"rustfs_ecstore::set_disk", part_number = p.part_num, part_size = ext_part.size, part_actual_size = ext_part.actual_size, "Completing multipart part"); if p.etag != Some(ext_part.etag.clone()) { error!( @@ -5436,6 +5483,9 @@ impl StorageAPI for SetDisks { fi.metadata .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}actual-size"), object_actual_size.to_string()); + fi.metadata + .insert("x-rustfs-encryption-original-size".to_string(), object_actual_size.to_string()); + if fi.is_compressed() { fi.metadata .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression-size"), object_size.to_string()); diff --git a/crates/rio/src/encrypt_reader.rs b/crates/rio/src/encrypt_reader.rs index 5c60855a..d6dd24c8 100644 --- a/crates/rio/src/encrypt_reader.rs +++ b/crates/rio/src/encrypt_reader.rs @@ -23,6 +23,7 @@ use rustfs_utils::{put_uvarint, put_uvarint_len}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; +use tracing::debug; pin_project! { /// A reader wrapper that encrypts data on the fly using AES-256-GCM. @@ -119,7 +120,7 @@ where header[5] = ((crc >> 8) & 0xFF) as u8; header[6] = ((crc >> 16) & 0xFF) as u8; header[7] = ((crc >> 24) & 0xFF) as u8; - println!( + debug!( "encrypt block header typ=0 len={} header={:?} plaintext_len={} ciphertext_len={}", clen, header, @@ -184,7 +185,10 @@ pin_project! { #[pin] pub inner: R, key: [u8; 32], // AES-256-GCM key - nonce: [u8; 12], // 96-bit nonce for GCM + base_nonce: [u8; 12], // Base nonce recorded in object metadata + current_nonce: [u8; 12], // Active nonce for the current encrypted segment + multipart_mode: bool, + current_part: usize, buffer: Vec, buffer_pos: usize, finished: bool, @@ -206,7 +210,35 @@ where Self { inner, key, - nonce, + base_nonce: nonce, + current_nonce: nonce, + multipart_mode: false, + current_part: 0, + buffer: Vec::new(), + buffer_pos: 0, + finished: false, + header_buf: [0u8; 8], + header_read: 0, + header_done: false, + ciphertext_buf: None, + ciphertext_read: 0, + ciphertext_len: 0, + } + } + + pub fn new_multipart(inner: R, key: [u8; 32], base_nonce: [u8; 12]) -> Self { + let first_part = 1; + let initial_nonce = derive_part_nonce(&base_nonce, first_part); + + debug!("decrypt_reader: initialized multipart mode"); + + Self { + inner, + key, + base_nonce, + current_nonce: initial_nonce, + multipart_mode: true, + current_part: first_part, buffer: Vec::new(), buffer_pos: 0, finished: false, @@ -287,7 +319,23 @@ where *this.header_done = false; if typ == 0xFF { + if *this.multipart_mode { + debug!( + next_part = *this.current_part + 1, + "decrypt_reader: reached segment terminator, advancing to next part" + ); + *this.current_part += 1; + *this.current_nonce = derive_part_nonce(this.base_nonce, *this.current_part); + this.ciphertext_buf.take(); + *this.ciphertext_read = 0; + *this.ciphertext_len = 0; + continue; + } + *this.finished = true; + this.ciphertext_buf.take(); + *this.ciphertext_read = 0; + *this.ciphertext_len = 0; continue; } @@ -342,11 +390,17 @@ where let ciphertext = &ciphertext_buf[uvarint_len as usize..]; let cipher = Aes256Gcm::new_from_slice(this.key).expect("key"); - let nonce = Nonce::from_slice(this.nonce); + let nonce = Nonce::from_slice(this.current_nonce); let plaintext = cipher .decrypt(nonce, ciphertext) .map_err(|e| std::io::Error::other(format!("decrypt error: {e}")))?; + debug!( + part = *this.current_part, + plaintext_len = plaintext.len(), + "decrypt_reader: decrypted chunk" + ); + if plaintext.len() != plaintext_len as usize { this.ciphertext_buf.take(); *this.ciphertext_read = 0; @@ -407,6 +461,16 @@ where } } +fn derive_part_nonce(base: &[u8; 12], part_number: usize) -> [u8; 12] { + let mut nonce = *base; + let mut suffix = [0u8; 4]; + suffix.copy_from_slice(&nonce[8..12]); + let current = u32::from_be_bytes(suffix); + let next = current.wrapping_add(part_number as u32); + nonce[8..12].copy_from_slice(&next.to_be_bytes()); + nonce +} + #[cfg(test)] mod tests { use std::io::Cursor; @@ -495,4 +559,42 @@ mod tests { assert_eq!(&decrypted, &data); } + + #[tokio::test] + async fn test_decrypt_reader_multipart_segments() { + let mut key = [0u8; 32]; + let mut base_nonce = [0u8; 12]; + rand::rng().fill_bytes(&mut key); + rand::rng().fill_bytes(&mut base_nonce); + + let part_one = vec![0xA5; 512 * 1024]; + let part_two = vec![0x5A; 256 * 1024]; + + async fn encrypt_part(data: &[u8], key: [u8; 32], base_nonce: [u8; 12], part_number: usize) -> Vec { + let nonce = derive_part_nonce(&base_nonce, part_number); + let reader = BufReader::new(Cursor::new(data.to_vec())); + let mut encrypt_reader = EncryptReader::new(WarpReader::new(reader), key, nonce); + let mut encrypted = Vec::new(); + encrypt_reader.read_to_end(&mut encrypted).await.unwrap(); + encrypted + } + + let encrypted_one = encrypt_part(&part_one, key, base_nonce, 1).await; + let encrypted_two = encrypt_part(&part_two, key, base_nonce, 2).await; + + let mut combined = Vec::with_capacity(encrypted_one.len() + encrypted_two.len()); + combined.extend_from_slice(&encrypted_one); + combined.extend_from_slice(&encrypted_two); + + let reader = BufReader::new(Cursor::new(combined)); + let mut decrypt_reader = DecryptReader::new_multipart(WarpReader::new(reader), key, base_nonce); + let mut decrypted = Vec::new(); + decrypt_reader.read_to_end(&mut decrypted).await.unwrap(); + + let mut expected = Vec::with_capacity(part_one.len() + part_two.len()); + expected.extend_from_slice(&part_one); + expected.extend_from_slice(&part_two); + + assert_eq!(decrypted, expected); + } } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index fad6da0e..c21d8435 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -71,8 +71,12 @@ use rustfs_ecstore::store_api::ObjectOptions; use rustfs_ecstore::store_api::ObjectToDelete; use rustfs_ecstore::store_api::PutObjReader; use rustfs_ecstore::store_api::StorageAPI; +use rustfs_filemeta::fileinfo::ObjectPartInfo; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING}; +use rustfs_kms::DataKey; +use rustfs_kms::service_manager::get_global_encryption_service; +use rustfs_kms::types::{EncryptionMetadata, ObjectEncryptionContext}; use rustfs_notify::global::notifier_instance; use rustfs_policy::auth; use rustfs_policy::policy::action::Action; @@ -108,6 +112,7 @@ use std::sync::Arc; use std::sync::LazyLock; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; +use tokio::io::AsyncRead; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_tar::Archive; @@ -140,6 +145,199 @@ pub struct FS { // pub store: ECStore, } +struct ManagedEncryptionMaterial { + data_key: DataKey, + headers: HashMap, + kms_key_id: String, +} + +async fn create_managed_encryption_material( + bucket: &str, + key: &str, + algorithm: &ServerSideEncryption, + kms_key_id: Option, + original_size: i64, +) -> Result { + let Some(service) = get_global_encryption_service().await else { + return Err(ApiError::from(StorageError::other("KMS encryption service is not initialized"))); + }; + + if !is_managed_sse(algorithm) { + return Err(ApiError::from(StorageError::other(format!( + "Unsupported server-side encryption algorithm: {}", + algorithm.as_str() + )))); + } + + let algorithm_str = algorithm.as_str(); + + let mut context = ObjectEncryptionContext::new(bucket.to_string(), key.to_string()); + if original_size >= 0 { + context = context.with_size(original_size as u64); + } + + let mut kms_key_candidate = kms_key_id; + if kms_key_candidate.is_none() { + kms_key_candidate = service.get_default_key_id().cloned(); + } + + let kms_key_to_use = kms_key_candidate + .clone() + .ok_or_else(|| ApiError::from(StorageError::other("No KMS key available for managed server-side encryption")))?; + + let (data_key, encrypted_data_key) = service + .create_data_key(&kms_key_candidate, &context) + .await + .map_err(|e| ApiError::from(StorageError::other(format!("Failed to create data key: {}", e))))?; + + let metadata = EncryptionMetadata { + algorithm: algorithm_str.to_string(), + key_id: kms_key_to_use.clone(), + key_version: 1, + iv: data_key.nonce.to_vec(), + tag: None, + encryption_context: context.encryption_context.clone(), + encrypted_at: Utc::now(), + original_size: if original_size >= 0 { original_size as u64 } else { 0 }, + encrypted_data_key, + }; + + let mut headers = service.metadata_to_headers(&metadata); + headers.insert("x-rustfs-encryption-original-size".to_string(), metadata.original_size.to_string()); + + Ok(ManagedEncryptionMaterial { + data_key, + headers, + kms_key_id: kms_key_to_use, + }) +} + +async fn decrypt_managed_encryption_key( + bucket: &str, + key: &str, + metadata: &HashMap, +) -> Result)>, ApiError> { + if !metadata.contains_key("x-rustfs-encryption-key") { + return Ok(None); + } + + let Some(service) = get_global_encryption_service().await else { + return Err(ApiError::from(StorageError::other("KMS encryption service is not initialized"))); + }; + + let parsed = service + .headers_to_metadata(metadata) + .map_err(|e| ApiError::from(StorageError::other(format!("Failed to parse encryption metadata: {}", e))))?; + + if parsed.iv.len() != 12 { + return Err(ApiError::from(StorageError::other("Invalid encryption nonce length; expected 12 bytes"))); + } + + let context = ObjectEncryptionContext::new(bucket.to_string(), key.to_string()); + let data_key = service + .decrypt_data_key(&parsed.encrypted_data_key, &context) + .await + .map_err(|e| ApiError::from(StorageError::other(format!("Failed to decrypt data key: {}", e))))?; + + let key_bytes = data_key.plaintext_key; + let mut nonce = [0u8; 12]; + nonce.copy_from_slice(&parsed.iv[..12]); + + let original_size = metadata + .get("x-rustfs-encryption-original-size") + .and_then(|s| s.parse::().ok()); + + Ok(Some((key_bytes, nonce, original_size))) +} + +fn derive_part_nonce(base: [u8; 12], part_number: usize) -> [u8; 12] { + let mut nonce = base; + let current = u32::from_be_bytes([nonce[8], nonce[9], nonce[10], nonce[11]]); + let incremented = current.wrapping_add(part_number as u32); + nonce[8..12].copy_from_slice(&incremented.to_be_bytes()); + nonce +} + +struct InMemoryAsyncReader { + cursor: std::io::Cursor>, +} + +impl InMemoryAsyncReader { + fn new(data: Vec) -> Self { + Self { + cursor: std::io::Cursor::new(data), + } + } +} + +impl AsyncRead for InMemoryAsyncReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let unfilled = buf.initialize_unfilled(); + let bytes_read = std::io::Read::read(&mut self.cursor, unfilled)?; + buf.advance(bytes_read); + std::task::Poll::Ready(Ok(())) + } +} + +async fn decrypt_multipart_managed_stream( + mut encrypted_stream: Box, + parts: &[ObjectPartInfo], + key_bytes: [u8; 32], + base_nonce: [u8; 12], +) -> Result<(Box, i64), StorageError> { + let total_plain_capacity: usize = parts.iter().map(|part| part.actual_size.max(0) as usize).sum(); + + let mut plaintext = Vec::with_capacity(total_plain_capacity); + + for part in parts { + if part.size == 0 { + continue; + } + + let mut encrypted_part = vec![0u8; part.size]; + tokio::io::AsyncReadExt::read_exact(&mut encrypted_stream, &mut encrypted_part) + .await + .map_err(|e| StorageError::other(format!("failed to read encrypted multipart segment {}: {}", part.number, e)))?; + + let part_nonce = derive_part_nonce(base_nonce, part.number); + let cursor = std::io::Cursor::new(encrypted_part); + let mut decrypt_reader = DecryptReader::new(WarpReader::new(cursor), key_bytes, part_nonce); + + tokio::io::AsyncReadExt::read_to_end(&mut decrypt_reader, &mut plaintext) + .await + .map_err(|e| StorageError::other(format!("failed to decrypt multipart segment {}: {}", part.number, e)))?; + } + + let total_plain_size = plaintext.len() as i64; + let reader = Box::new(WarpReader::new(InMemoryAsyncReader::new(plaintext))) as Box; + + Ok((reader, total_plain_size)) +} + +fn strip_managed_encryption_metadata(metadata: &mut HashMap) { + const KEYS: [&str; 7] = [ + "x-amz-server-side-encryption", + "x-amz-server-side-encryption-aws-kms-key-id", + "x-rustfs-encryption-iv", + "x-rustfs-encryption-tag", + "x-rustfs-encryption-key", + "x-rustfs-encryption-context", + "x-rustfs-encryption-original-size", + ]; + + for key in KEYS.iter() { + metadata.remove(*key); + } +} + +fn is_managed_sse(algorithm: &ServerSideEncryption) -> bool { + matches!(algorithm.as_str(), "AES256" | "aws:kms") +} + impl FS { pub fn new() -> Self { // let store: ECStore = ECStore::new(address, endpoint_pools).await?; @@ -367,6 +565,8 @@ impl S3 for FS { copy_source, bucket, key, + server_side_encryption: requested_sse, + ssekms_key_id: requested_kms_key_id, .. } = req.input.clone(); let (src_bucket, src_key, version_id) = match copy_source { @@ -405,6 +605,30 @@ impl S3 for FS { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; + let bucket_sse_config = metadata_sys::get_sse_config(&bucket).await.ok(); + let effective_sse = requested_sse.or_else(|| { + bucket_sse_config.as_ref().and_then(|(config, _)| { + config.rules.first().and_then(|rule| { + rule.apply_server_side_encryption_by_default + .as_ref() + .and_then(|sse| match sse.sse_algorithm.as_str() { + "AES256" => Some(ServerSideEncryption::from_static(ServerSideEncryption::AES256)), + "aws:kms" => Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS)), + _ => None, + }) + }) + }) + }); + let mut effective_kms_key_id = requested_kms_key_id.or_else(|| { + bucket_sse_config.as_ref().and_then(|(config, _)| { + config.rules.first().and_then(|rule| { + rule.apply_server_side_encryption_by_default + .as_ref() + .and_then(|sse| sse.kms_master_key_id.clone()) + }) + }) + }); + let h = HeaderMap::new(); let gr = store @@ -420,6 +644,17 @@ impl S3 for FS { let mut reader: Box = Box::new(WarpReader::new(gr.stream)); + if let Some((key_bytes, nonce, original_size_opt)) = + decrypt_managed_encryption_key(&src_bucket, &src_key, &src_info.user_defined).await? + { + reader = Box::new(DecryptReader::new(reader, key_bytes, nonce)); + if let Some(original) = original_size_opt { + src_info.actual_size = original; + } + } + + strip_managed_encryption_metadata(&mut src_info.user_defined); + let actual_size = src_info.get_actual_size().map_err(ApiError::from)?; let mut length = actual_size; @@ -451,9 +686,31 @@ impl S3 for FS { .remove(&format!("{RESERVED_METADATA_PREFIX_LOWER}compression-size")); } - let hrd = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?; + let mut reader = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?; - src_info.put_object_reader = Some(PutObjReader::new(hrd)); + if let Some(ref sse_alg) = effective_sse { + if is_managed_sse(sse_alg) { + let material = + create_managed_encryption_material(&bucket, &key, sse_alg, effective_kms_key_id.clone(), actual_size).await?; + + let ManagedEncryptionMaterial { + data_key, + headers, + kms_key_id: kms_key_used, + } = material; + + let key_bytes = data_key.plaintext_key; + let nonce = data_key.nonce; + + src_info.user_defined.extend(headers.into_iter()); + effective_kms_key_id = Some(kms_key_used.clone()); + + let encrypt_reader = EncryptReader::new(reader, key_bytes, nonce); + reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?; + } + } + + src_info.put_object_reader = Some(PutObjReader::new(reader)); // check quota // TODO: src metadada @@ -479,6 +736,8 @@ impl S3 for FS { let output = CopyObjectOutput { copy_object_result: Some(copy_object_result), + server_side_encryption: effective_sse, + ssekms_key_id: effective_kms_key_id, ..Default::default() }; @@ -953,6 +1212,15 @@ impl S3 for FS { .map_err(ApiError::from)?; let info = reader.object_info; + tracing::debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot"); + for part in &info.parts { + tracing::debug!( + part_number = part.number, + part_size = part.size, + part_actual_size = part.actual_size, + "GET object part details" + ); + } let event_info = info.clone(); let content_type = { if let Some(content_type) = &info.content_type { @@ -993,6 +1261,8 @@ impl S3 for FS { let mut final_stream = reader.stream; let stored_sse_algorithm = info.user_defined.get("x-amz-server-side-encryption-customer-algorithm"); let stored_sse_key_md5 = info.user_defined.get("x-amz-server-side-encryption-customer-key-md5"); + let mut managed_encryption_applied = false; + let mut managed_original_size: Option = None; tracing::debug!( "GET object metadata check: stored_sse_algorithm={:?}, stored_sse_key_md5={:?}, provided_sse_key={:?}", @@ -1096,6 +1366,26 @@ impl S3 for FS { } } + if stored_sse_algorithm.is_none() { + if let Some((key_bytes, nonce, original_size)) = + decrypt_managed_encryption_key(&bucket, &key, &info.user_defined).await? + { + if info.parts.len() > 1 { + let (reader, plain_size) = decrypt_multipart_managed_stream(final_stream, &info.parts, key_bytes, nonce) + .await + .map_err(ApiError::from)?; + final_stream = reader; + managed_original_size = Some(plain_size); + } else { + let warp_reader = WarpReader::new(final_stream); + let decrypt_reader = DecryptReader::new(warp_reader, key_bytes, nonce); + final_stream = Box::new(decrypt_reader); + managed_original_size = original_size; + } + managed_encryption_applied = true; + } + } + // For SSE-C encrypted objects, use the original size instead of encrypted size let response_content_length = if stored_sse_algorithm.is_some() { if let Some(original_size_str) = info.user_defined.get("x-amz-server-side-encryption-customer-original-size") { @@ -1110,21 +1400,23 @@ impl S3 for FS { tracing::debug!("SSE-C decryption: no original size found, using content_length {}", content_length); content_length } + } else if managed_encryption_applied { + managed_original_size.unwrap_or(content_length) } else { content_length }; tracing::info!("Final response_content_length: {}", response_content_length); - if stored_sse_algorithm.is_some() { + if stored_sse_algorithm.is_some() || managed_encryption_applied { let limit_reader = HardLimitReader::new(Box::new(WarpReader::new(final_stream)), response_content_length); final_stream = Box::new(limit_reader); } // For SSE-C encrypted objects, don't use bytes_stream to limit the stream // because DecryptReader needs to read all encrypted data to produce decrypted output - let body = if stored_sse_algorithm.is_some() { - tracing::info!("SSE-C: Using unlimited stream for decryption"); + let body = if stored_sse_algorithm.is_some() || managed_encryption_applied { + tracing::info!("Managed SSE: Using unlimited stream for decryption"); Some(StreamingBlob::wrap(ReaderStream::with_capacity(final_stream, DEFAULT_READ_BUFFER_SIZE))) } else { Some(StreamingBlob::wrap(bytes_stream( @@ -1273,7 +1565,17 @@ impl S3 for FS { let content_length = info.get_actual_size().map_err(ApiError::from)?; - let metadata = info.user_defined; + let metadata_map = info.user_defined.clone(); + let server_side_encryption = metadata_map + .get("x-amz-server-side-encryption") + .map(|v| ServerSideEncryption::from(v.clone())); + let sse_customer_algorithm = metadata_map + .get("x-amz-server-side-encryption-customer-algorithm") + .map(|v| SSECustomerAlgorithm::from(v.clone())); + let sse_customer_key_md5 = metadata_map.get("x-amz-server-side-encryption-customer-key-md5").cloned(); + let ssekms_key_id = metadata_map.get("x-amz-server-side-encryption-aws-kms-key-id").cloned(); + + let metadata = metadata_map; let output = HeadObjectOutput { content_length: Some(content_length), @@ -1282,6 +1584,10 @@ impl S3 for FS { e_tag: info.etag, metadata: Some(metadata), version_id: info.version_id.map(|v| v.to_string()), + server_side_encryption, + sse_customer_algorithm, + sse_customer_key_md5, + ssekms_key_id, // metadata: object_metadata, ..Default::default() }; @@ -1630,8 +1936,7 @@ impl S3 for FS { }); tracing::debug!("TDD: effective_sse={:?} (original={:?})", effective_sse, original_sse); - let _original_kms_key_id = ssekms_key_id.clone(); - let effective_kms_key_id = ssekms_key_id.or_else(|| { + let mut effective_kms_key_id = ssekms_key_id.or_else(|| { bucket_sse_config.as_ref().and_then(|(config, _timestamp)| { config.rules.first().and_then(|rule| { rule.apply_server_side_encryption_by_default @@ -1650,9 +1955,6 @@ impl S3 for FS { } // TDD: Store effective SSE information in metadata for GET responses - if let Some(sse) = &effective_sse { - metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string()); - } if let Some(sse_alg) = &sse_customer_algorithm { metadata.insert( "x-amz-server-side-encryption-customer-algorithm".to_string(), @@ -1662,6 +1964,10 @@ impl S3 for FS { if let Some(sse_md5) = &sse_customer_key_md5 { metadata.insert("x-amz-server-side-encryption-customer-key-md5".to_string(), sse_md5.clone()); } + if let Some(sse) = &effective_sse { + metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string()); + } + if let Some(kms_key_id) = &effective_kms_key_id { metadata.insert("x-amz-server-side-encryption-aws-kms-key-id".to_string(), kms_key_id.clone()); } @@ -1728,6 +2034,32 @@ impl S3 for FS { reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?; } + // Apply managed SSE (SSE-S3 or SSE-KMS) when requested + if sse_customer_algorithm.is_none() { + if let Some(sse_alg) = &effective_sse { + if is_managed_sse(sse_alg) { + let material = + create_managed_encryption_material(&bucket, &key, sse_alg, effective_kms_key_id.clone(), actual_size) + .await?; + + let ManagedEncryptionMaterial { + data_key, + headers, + kms_key_id: kms_key_used, + } = material; + + let key_bytes = data_key.plaintext_key; + let nonce = data_key.nonce; + + metadata.extend(headers); + effective_kms_key_id = Some(kms_key_used.clone()); + + let encrypt_reader = EncryptReader::new(reader, key_bytes, nonce); + reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?; + } + } + } + let mut reader = PutObjReader::new(reader); let mt = metadata.clone(); @@ -1860,7 +2192,7 @@ impl S3 for FS { tracing::debug!("TDD: effective_sse for multipart={:?} (original={:?})", effective_sse, original_sse); let _original_kms_key_id = ssekms_key_id.clone(); - let effective_kms_key_id = ssekms_key_id.or_else(|| { + let mut effective_kms_key_id = ssekms_key_id.or_else(|| { bucket_sse_config.as_ref().and_then(|(config, _timestamp)| { config.rules.first().and_then(|rule| { rule.apply_server_side_encryption_by_default @@ -1871,9 +2203,6 @@ impl S3 for FS { }); // Store effective SSE information in metadata for multipart upload - if let Some(sse) = &effective_sse { - metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string()); - } if let Some(sse_alg) = &sse_customer_algorithm { metadata.insert( "x-amz-server-side-encryption-customer-algorithm".to_string(), @@ -1883,6 +2212,24 @@ impl S3 for FS { if let Some(sse_md5) = &sse_customer_key_md5 { metadata.insert("x-amz-server-side-encryption-customer-key-md5".to_string(), sse_md5.clone()); } + + if let Some(sse) = &effective_sse { + if is_managed_sse(sse) { + let material = create_managed_encryption_material(&bucket, &key, sse, effective_kms_key_id.clone(), 0).await?; + + let ManagedEncryptionMaterial { + data_key: _, + headers, + kms_key_id: kms_key_used, + } = material; + + metadata.extend(headers.into_iter()); + effective_kms_key_id = Some(kms_key_used.clone()); + } else { + metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string()); + } + } + if let Some(kms_key_id) = &effective_kms_key_id { metadata.insert("x-amz-server-side-encryption-aws-kms-key-id".to_string(), kms_key_id.clone()); } @@ -1961,41 +2308,84 @@ impl S3 for FS { // let upload_id = - let body = body.ok_or_else(|| s3_error!(IncompleteBody))?; - let size = match content_length { - Some(c) => c, - None => { - if let Some(val) = req.headers.get(AMZ_DECODED_CONTENT_LENGTH) { - match atoi::atoi::(val.as_bytes()) { - Some(x) => x, - None => return Err(s3_error!(UnexpectedContent)), - } - } else { - return Err(s3_error!(UnexpectedContent)); + let mut size = content_length; + let mut body_stream = body.ok_or_else(|| s3_error!(IncompleteBody))?; + + if size.is_none() { + if let Some(val) = req.headers.get(AMZ_DECODED_CONTENT_LENGTH) { + if let Some(x) = atoi::atoi::(val.as_bytes()) { + size = Some(x); } } - }; - let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string())))); + if size.is_none() { + let mut total = 0i64; + let mut buffer = bytes::BytesMut::new(); + while let Some(chunk) = body_stream.next().await { + let chunk = chunk.map_err(|e| ApiError::from(StorageError::other(e.to_string())))?; + total += chunk.len() as i64; + buffer.extend_from_slice(&chunk); + } - // mc cp step 4 + if total <= 0 { + return Err(s3_error!(UnexpectedContent)); + } - let opts = ObjectOptions::default(); + size = Some(total); + let combined = buffer.freeze(); + let stream = futures::stream::once(async move { Ok::(combined) }); + body_stream = StreamingBlob::wrap(stream); + } + } + // Get multipart info early to check if managed encryption will be applied let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; + let opts = ObjectOptions::default(); let fi = store .get_multipart_info(&bucket, &key, &upload_id, &opts) .await .map_err(ApiError::from)?; + // Check if managed encryption will be applied + let will_apply_managed_encryption = decrypt_managed_encryption_key(&bucket, &key, &fi.user_defined) + .await? + .is_some(); + + // If managed encryption will be applied and we have Content-Length, buffer the entire body + // This is necessary because encryption changes the data size, which causes Content-Length mismatches + if will_apply_managed_encryption && size.is_some() { + let mut total = 0i64; + let mut buffer = bytes::BytesMut::new(); + while let Some(chunk) = body_stream.next().await { + let chunk = chunk.map_err(|e| ApiError::from(StorageError::other(e.to_string())))?; + total += chunk.len() as i64; + buffer.extend_from_slice(&chunk); + } + + if total <= 0 { + return Err(s3_error!(UnexpectedContent)); + } + + size = Some(total); + let combined = buffer.freeze(); + let stream = futures::stream::once(async move { Ok::(combined) }); + body_stream = StreamingBlob::wrap(stream); + } + + let mut size = size.ok_or_else(|| s3_error!(UnexpectedContent))?; + + let body = StreamReader::new(body_stream.map(|f| f.map_err(|e| std::io::Error::other(e.to_string())))); + + // mc cp step 4 + let is_compressible = fi .user_defined .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}compression").as_str()); - let reader: Box = Box::new(WarpReader::new(body)); + let mut reader: Box = Box::new(WarpReader::new(body)); let actual_size = size; @@ -2039,16 +2429,22 @@ impl S3 for FS { } */ - // TODO: md5 check - let reader = if is_compressible { + if is_compressible { let hrd = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?; let compress_reader = CompressReader::new(hrd, CompressionAlgorithm::default()); - Box::new(HashReader::new(Box::new(compress_reader), -1, actual_size, None, false).map_err(ApiError::from)?) - } else { - Box::new(HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?) - }; + reader = Box::new(compress_reader); + size = -1; + } - let mut reader = PutObjReader::new(*reader); + let mut reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?; + + if let Some((key_bytes, base_nonce, _)) = decrypt_managed_encryption_key(&bucket, &key, &fi.user_defined).await? { + let part_nonce = derive_part_nonce(base_nonce, part_id); + let encrypt_reader = EncryptReader::new(reader, key_bytes, part_nonce); + reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?; + } + + let mut reader = PutObjReader::new(reader); let info = store .put_object_part(&bucket, &key, &upload_id, part_id, &mut reader, &opts) @@ -2126,7 +2522,7 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let src_info = src_reader.object_info; + let mut src_info = src_reader.object_info; // Validate copy conditions (simplified for now) if let Some(if_match) = copy_source_if_match { @@ -2200,6 +2596,15 @@ impl S3 for FS { let mut reader: Box = Box::new(WarpReader::new(src_stream)); + if let Some((key_bytes, nonce, original_size_opt)) = + decrypt_managed_encryption_key(&src_bucket, &src_key, &src_info.user_defined).await? + { + reader = Box::new(DecryptReader::new(reader, key_bytes, nonce)); + if let Some(original) = original_size_opt { + src_info.actual_size = original; + } + } + let actual_size = length; let mut size = length; @@ -2209,8 +2614,14 @@ impl S3 for FS { size = -1; } - // TODO: md5 check - let reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?; + let mut reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?; + + if let Some((key_bytes, base_nonce, _)) = decrypt_managed_encryption_key(&bucket, &key, &mp_info.user_defined).await? { + let part_nonce = derive_part_nonce(base_nonce, part_id); + let encrypt_reader = EncryptReader::new(reader, key_bytes, part_nonce); + reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?; + } + let mut reader = PutObjReader::new(reader); // Set up destination options (inherit from multipart upload)