feat(encryption): add managed encryption support for SSE-S3 and SSE-KMS (#583)

Signed-off-by: junxiang Mu <1948535941@qq.com>
guojidan
2025-09-24 02:09:04 -07:00
committed by GitHub
parent 29b0935be7
commit ef0dbaaeb5
5 changed files with 1007 additions and 55 deletions

View File

@@ -0,0 +1,386 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Integration tests covering the headers and metadata surfaced by the
//! managed encryption pipeline (SSE-S3/SSE-KMS).
use super::common::LocalKMSTestEnvironment;
use crate::common::{TEST_BUCKET, init_logging};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
CompletedMultipartUpload, CompletedPart, ServerSideEncryption, ServerSideEncryptionByDefault,
ServerSideEncryptionConfiguration, ServerSideEncryptionRule,
};
use serial_test::serial;
use std::collections::{HashMap, VecDeque};
use tracing::info;
fn assert_encryption_metadata(metadata: &HashMap<String, String>, expected_size: usize) {
for key in [
"x-rustfs-encryption-key",
"x-rustfs-encryption-iv",
"x-rustfs-encryption-context",
"x-rustfs-encryption-original-size",
] {
assert!(metadata.contains_key(key), "expected managed encryption metadata '{}' to be present", key);
assert!(
!metadata.get(key).unwrap().is_empty(),
"managed encryption metadata '{}' should not be empty",
key
);
}
let size_value = metadata
.get("x-rustfs-encryption-original-size")
.expect("managed encryption metadata should include original size");
let parsed_size: usize = size_value
.parse()
.expect("x-rustfs-encryption-original-size should be numeric");
assert_eq!(parsed_size, expected_size, "recorded original size should match uploaded payload length");
}
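/// Walk the storage root and require that (a) at least one candidate data file
/// for the bucket/key was scanned and (b) no scanned file contains the uploaded
/// plaintext as a contiguous byte window, proving the payload is encrypted at rest.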
fn assert_storage_encrypted(storage_root: &std::path::Path, bucket: &str, key: &str, plaintext: &[u8]) {
let mut stack = VecDeque::from([storage_root.to_path_buf()]);
let mut scanned = 0;
let mut plaintext_path: Option<std::path::PathBuf> = None;
while let Some(current) = stack.pop_front() {
let Ok(metadata) = std::fs::metadata(&current) else { continue };
if metadata.is_dir() {
if let Ok(entries) = std::fs::read_dir(&current) {
for entry in entries.flatten() {
stack.push_back(entry.path());
}
}
continue;
}
let path_str = current.to_string_lossy();
if !(path_str.contains(bucket) || path_str.contains(key)) {
continue;
}
scanned += 1;
let Ok(bytes) = std::fs::read(&current) else { continue };
if bytes.len() < plaintext.len() {
continue;
}
if bytes.windows(plaintext.len()).any(|window| window == plaintext) {
plaintext_path = Some(current);
break;
}
}
assert!(
scanned > 0,
"Failed to locate stored data files for bucket '{}' and key '{}' under {:?}",
bucket,
key,
storage_root
);
assert!(plaintext_path.is_none(), "Plaintext detected on disk at {:?}", plaintext_path.unwrap());
}
#[tokio::test]
#[serial]
async fn test_head_reports_managed_metadata_for_sse_s3() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("Validating SSE-S3 managed encryption metadata exposure");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key = kms_env.start_rustfs_for_local_kms().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let s3_client = kms_env.base_env.create_s3_client();
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
// Bucket-level default SSE-S3 configuration.
let encryption_config = ServerSideEncryptionConfiguration::builder()
.rules(
ServerSideEncryptionRule::builder()
.apply_server_side_encryption_by_default(
ServerSideEncryptionByDefault::builder()
.sse_algorithm(ServerSideEncryption::Aes256)
.build()
.unwrap(),
)
.build(),
)
.build()
.unwrap();
s3_client
.put_bucket_encryption()
.bucket(TEST_BUCKET)
.server_side_encryption_configuration(encryption_config)
.send()
.await?;
let payload = b"metadata-sse-s3-payload";
let key = "metadata-sse-s3-object";
s3_client
.put_object()
.bucket(TEST_BUCKET)
.key(key)
.body(payload.to_vec().into())
.send()
.await?;
let head = s3_client.head_object().bucket(TEST_BUCKET).key(key).send().await?;
assert_eq!(
head.server_side_encryption(),
Some(&ServerSideEncryption::Aes256),
"head_object should advertise SSE-S3"
);
let metadata = head
.metadata()
.expect("head_object should return managed encryption metadata");
assert_encryption_metadata(metadata, payload.len());
assert_storage_encrypted(std::path::Path::new(&kms_env.base_env.temp_dir), TEST_BUCKET, key, payload);
Ok(())
}
#[tokio::test]
#[serial]
async fn test_head_reports_managed_metadata_for_sse_kms_and_copy() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("Validating SSE-KMS managed encryption metadata (including copy)");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let default_key_id = kms_env.start_rustfs_for_local_kms().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let s3_client = kms_env.base_env.create_s3_client();
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
let encryption_config = ServerSideEncryptionConfiguration::builder()
.rules(
ServerSideEncryptionRule::builder()
.apply_server_side_encryption_by_default(
ServerSideEncryptionByDefault::builder()
.sse_algorithm(ServerSideEncryption::AwsKms)
.kms_master_key_id(&default_key_id)
.build()
.unwrap(),
)
.build(),
)
.build()
.unwrap();
s3_client
.put_bucket_encryption()
.bucket(TEST_BUCKET)
.server_side_encryption_configuration(encryption_config)
.send()
.await?;
let payload = b"metadata-sse-kms-payload";
let source_key = "metadata-sse-kms-object";
s3_client
.put_object()
.bucket(TEST_BUCKET)
.key(source_key)
.body(payload.to_vec().into())
.send()
.await?;
let head_source = s3_client.head_object().bucket(TEST_BUCKET).key(source_key).send().await?;
assert_eq!(
head_source.server_side_encryption(),
Some(&ServerSideEncryption::AwsKms),
"source object should report SSE-KMS"
);
assert_eq!(
head_source.ssekms_key_id().unwrap(),
&default_key_id,
"source object should maintain the configured KMS key id"
);
let source_metadata = head_source
.metadata()
.expect("source object should include managed encryption metadata");
assert_encryption_metadata(source_metadata, payload.len());
let dest_key = "metadata-sse-kms-object-copy";
let copy_source = format!("{}/{}", TEST_BUCKET, source_key);
s3_client
.copy_object()
.bucket(TEST_BUCKET)
.key(dest_key)
.copy_source(copy_source)
.send()
.await?;
let head_dest = s3_client.head_object().bucket(TEST_BUCKET).key(dest_key).send().await?;
assert_eq!(
head_dest.server_side_encryption(),
Some(&ServerSideEncryption::AwsKms),
"copied object should remain encrypted with SSE-KMS"
);
assert_eq!(
head_dest.ssekms_key_id().unwrap(),
&default_key_id,
"copied object should keep the default KMS key id"
);
let dest_metadata = head_dest
.metadata()
.expect("copied object should include managed encryption metadata");
assert_encryption_metadata(dest_metadata, payload.len());
let copied_body = s3_client
.get_object()
.bucket(TEST_BUCKET)
.key(dest_key)
.send()
.await?
.body
.collect()
.await?
.into_bytes();
assert_eq!(&copied_body[..], payload, "copied object payload should match source");
let storage_root = std::path::Path::new(&kms_env.base_env.temp_dir);
assert_storage_encrypted(storage_root, TEST_BUCKET, source_key, payload);
assert_storage_encrypted(storage_root, TEST_BUCKET, dest_key, payload);
Ok(())
}
#[tokio::test]
#[serial]
async fn test_multipart_upload_writes_encrypted_data() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("Validating ciphertext persistence for multipart SSE-KMS uploads");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let default_key_id = kms_env.start_rustfs_for_local_kms().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let s3_client = kms_env.base_env.create_s3_client();
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
let encryption_config = ServerSideEncryptionConfiguration::builder()
.rules(
ServerSideEncryptionRule::builder()
.apply_server_side_encryption_by_default(
ServerSideEncryptionByDefault::builder()
.sse_algorithm(ServerSideEncryption::AwsKms)
.kms_master_key_id(&default_key_id)
.build()
.unwrap(),
)
.build(),
)
.build()
.unwrap();
s3_client
.put_bucket_encryption()
.bucket(TEST_BUCKET)
.server_side_encryption_configuration(encryption_config)
.send()
.await?;
let key = "multipart-encryption-object";
let part_size = 5 * 1024 * 1024; // S3 requires at least 5 MiB for every part except the last
let part_one = vec![0xA5; part_size];
let part_two = vec![0x5A; part_size];
let combined: Vec<u8> = part_one.iter().chain(part_two.iter()).copied().collect();
let create_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
.key(key)
.send()
.await?;
let upload_id = create_output.upload_id().unwrap();
let part1 = s3_client
.upload_part()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.part_number(1)
.body(ByteStream::from(part_one.clone()))
.send()
.await?;
let part2 = s3_client
.upload_part()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.part_number(2)
.body(ByteStream::from(part_two.clone()))
.send()
.await?;
let completed = CompletedMultipartUpload::builder()
.parts(CompletedPart::builder().part_number(1).e_tag(part1.e_tag().unwrap()).build())
.parts(CompletedPart::builder().part_number(2).e_tag(part2.e_tag().unwrap()).build())
.build();
s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.multipart_upload(completed)
.send()
.await?;
let head = s3_client.head_object().bucket(TEST_BUCKET).key(key).send().await?;
assert_eq!(
head.server_side_encryption(),
Some(&ServerSideEncryption::AwsKms),
"multipart head_object should expose SSE-KMS"
);
assert_eq!(
head.ssekms_key_id().unwrap(),
&default_key_id,
"multipart object should retain bucket default KMS key"
);
assert_encryption_metadata(
head.metadata().expect("multipart head_object should expose managed metadata"),
combined.len(),
);
// Data returned to clients should decrypt back to the original payload
let fetched = s3_client
.get_object()
.bucket(TEST_BUCKET)
.key(key)
.send()
.await?
.body
.collect()
.await?
.into_bytes();
assert_eq!(&fetched[..], &combined[..]);
assert_storage_encrypted(std::path::Path::new(&kms_env.base_env.temp_dir), TEST_BUCKET, key, &combined);
Ok(())
}

View File

@@ -44,3 +44,6 @@ mod test_runner;
#[cfg(test)]
mod bucket_default_encryption_test;
#[cfg(test)]
mod encryption_metadata_test;

View File

@@ -2143,6 +2143,7 @@ impl SetDisks {
where
W: AsyncWrite + Send + Sync + Unpin + 'static,
{
tracing::debug!(bucket, object, requested_length = length, offset, "get_object_with_fileinfo start");
let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi);
let total_size = fi.size as usize;
@@ -2160,28 +2161,46 @@ impl SetDisks {
let (part_index, mut part_offset) = fi.to_part_offset(offset)?;
// debug!(
// "get_object_with_fileinfo start offset:{}, part_index:{},part_offset:{}",
// offset, part_index, part_offset
// );
let mut end_offset = offset;
if length > 0 {
end_offset += length - 1
}
let (last_part_index, _) = fi.to_part_offset(end_offset)?;
let (last_part_index, last_part_relative_offset) = fi.to_part_offset(end_offset)?;
tracing::debug!(
bucket,
object,
offset,
length,
end_offset,
part_index,
last_part_index,
last_part_relative_offset,
"Multipart read bounds"
);
let erasure = erasure_coding::Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
let part_indices: Vec<usize> = (part_index..=last_part_index).collect();
tracing::debug!(bucket, object, ?part_indices, "Multipart part indices to stream");
let mut total_read = 0;
for i in part_index..=last_part_index {
for current_part in part_indices {
if total_read == length {
tracing::debug!(
bucket,
object,
total_read,
requested_length = length,
part_index = current_part,
"Stopping multipart stream early because accumulated bytes match request"
);
break;
}
let part_number = fi.parts[i].number;
let part_size = fi.parts[i].size;
let part_number = fi.parts[current_part].number;
let part_size = fi.parts[current_part].size;
let mut part_length = part_size - part_offset;
if part_length > (length - total_read) {
part_length = length - total_read
@@ -2191,6 +2210,21 @@ impl SetDisks {
let read_offset = (part_offset / erasure.block_size) * erasure.shard_size();
tracing::debug!(
bucket,
object,
part_index = current_part,
part_number,
part_offset,
part_size,
part_length,
read_offset,
till_offset,
total_read_before = total_read,
requested_length = length,
"Streaming multipart part"
);
let mut readers = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
for (idx, disk_op) in disks.iter().enumerate() {
@@ -2236,6 +2270,15 @@ impl SetDisks {
// part_number, part_offset, part_length, part_size
// );
let (written, err) = erasure.decode(writer, readers, part_offset, part_length, part_size).await;
tracing::debug!(
bucket,
object,
part_index = current_part,
part_number,
part_length,
bytes_written = written,
"Finished decoding multipart part"
);
if let Some(e) = err {
let de_err: DiskError = e.into();
let mut has_err = true;
@@ -2274,6 +2317,8 @@ impl SetDisks {
// debug!("read end");
tracing::debug!(bucket, object, total_read, expected_length = length, "Multipart read finished");
Ok(())
}
@@ -3462,12 +3507,13 @@ impl ObjectIO for SetDisks {
// let _guard_to_hold = _read_lock_guard; // moved into closure below
tokio::spawn(async move {
// let _guard = _guard_to_hold; // keep guard alive until task ends
let mut writer = wd;
if let Err(e) = Self::get_object_with_fileinfo(
&bucket,
&object,
offset,
length,
&mut Box::new(wd),
&mut writer,
fi,
files,
&disks,
@@ -5377,6 +5423,7 @@ impl StorageAPI for SetDisks {
}
let ext_part = &curr_fi.parts[i];
tracing::info!(target:"rustfs_ecstore::set_disk", part_number = p.part_num, part_size = ext_part.size, part_actual_size = ext_part.actual_size, "Completing multipart part");
if p.etag != Some(ext_part.etag.clone()) {
error!(
@@ -5436,6 +5483,9 @@ impl StorageAPI for SetDisks {
fi.metadata
.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}actual-size"), object_actual_size.to_string());
fi.metadata
.insert("x-rustfs-encryption-original-size".to_string(), object_actual_size.to_string());
if fi.is_compressed() {
fi.metadata
.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression-size"), object_size.to_string());

View File

@@ -23,6 +23,7 @@ use rustfs_utils::{put_uvarint, put_uvarint_len};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tracing::debug;
pin_project! {
/// A reader wrapper that encrypts data on the fly using AES-256-GCM.
@@ -119,7 +120,7 @@ where
header[5] = ((crc >> 8) & 0xFF) as u8;
header[6] = ((crc >> 16) & 0xFF) as u8;
header[7] = ((crc >> 24) & 0xFF) as u8;
println!(
debug!(
"encrypt block header typ=0 len={} header={:?} plaintext_len={} ciphertext_len={}",
clen,
header,
@@ -184,7 +185,10 @@ pin_project! {
#[pin]
pub inner: R,
key: [u8; 32], // AES-256-GCM key
nonce: [u8; 12], // 96-bit nonce for GCM
base_nonce: [u8; 12], // Base nonce recorded in object metadata
current_nonce: [u8; 12], // Active nonce for the current encrypted segment
multipart_mode: bool,
current_part: usize,
buffer: Vec<u8>,
buffer_pos: usize,
finished: bool,
@@ -206,7 +210,35 @@ where
Self {
inner,
key,
nonce,
base_nonce: nonce,
current_nonce: nonce,
multipart_mode: false,
current_part: 0,
buffer: Vec::new(),
buffer_pos: 0,
finished: false,
header_buf: [0u8; 8],
header_read: 0,
header_done: false,
ciphertext_buf: None,
ciphertext_read: 0,
ciphertext_len: 0,
}
}
pub fn new_multipart(inner: R, key: [u8; 32], base_nonce: [u8; 12]) -> Self {
let first_part = 1;
let initial_nonce = derive_part_nonce(&base_nonce, first_part);
debug!("decrypt_reader: initialized multipart mode");
Self {
inner,
key,
base_nonce,
current_nonce: initial_nonce,
multipart_mode: true,
current_part: first_part,
buffer: Vec::new(),
buffer_pos: 0,
finished: false,
@@ -287,7 +319,23 @@ where
*this.header_done = false;
if typ == 0xFF {
if *this.multipart_mode {
debug!(
next_part = *this.current_part + 1,
"decrypt_reader: reached segment terminator, advancing to next part"
);
*this.current_part += 1;
*this.current_nonce = derive_part_nonce(this.base_nonce, *this.current_part);
this.ciphertext_buf.take();
*this.ciphertext_read = 0;
*this.ciphertext_len = 0;
continue;
}
*this.finished = true;
this.ciphertext_buf.take();
*this.ciphertext_read = 0;
*this.ciphertext_len = 0;
continue;
}
@@ -342,11 +390,17 @@ where
let ciphertext = &ciphertext_buf[uvarint_len as usize..];
let cipher = Aes256Gcm::new_from_slice(this.key).expect("key");
let nonce = Nonce::from_slice(this.nonce);
let nonce = Nonce::from_slice(this.current_nonce);
let plaintext = cipher
.decrypt(nonce, ciphertext)
.map_err(|e| std::io::Error::other(format!("decrypt error: {e}")))?;
debug!(
part = *this.current_part,
plaintext_len = plaintext.len(),
"decrypt_reader: decrypted chunk"
);
if plaintext.len() != plaintext_len as usize {
this.ciphertext_buf.take();
*this.ciphertext_read = 0;
@@ -407,6 +461,16 @@ where
}
}
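/// Derives the nonce for one multipart segment: the last four bytes of the
/// 96-bit base nonce act as a big-endian counter offset by the part number,
/// so no two segments of an upload share a GCM nonce. A minimal sketch of the
/// effect (illustration only, not part of this change):
///
/// ```ignore
/// let base = [0u8; 12];                                // trailing counter = 0
/// assert_eq!(&derive_part_nonce(&base, 1)[8..], &[0, 0, 0, 1]);
/// assert_eq!(&derive_part_nonce(&base, 2)[8..], &[0, 0, 0, 2]);
/// ```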
fn derive_part_nonce(base: &[u8; 12], part_number: usize) -> [u8; 12] {
let mut nonce = *base;
let mut suffix = [0u8; 4];
suffix.copy_from_slice(&nonce[8..12]);
let current = u32::from_be_bytes(suffix);
let next = current.wrapping_add(part_number as u32);
nonce[8..12].copy_from_slice(&next.to_be_bytes());
nonce
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
@@ -495,4 +559,42 @@ mod tests {
assert_eq!(&decrypted, &data);
}
#[tokio::test]
async fn test_decrypt_reader_multipart_segments() {
let mut key = [0u8; 32];
let mut base_nonce = [0u8; 12];
rand::rng().fill_bytes(&mut key);
rand::rng().fill_bytes(&mut base_nonce);
let part_one = vec![0xA5; 512 * 1024];
let part_two = vec![0x5A; 256 * 1024];
async fn encrypt_part(data: &[u8], key: [u8; 32], base_nonce: [u8; 12], part_number: usize) -> Vec<u8> {
let nonce = derive_part_nonce(&base_nonce, part_number);
let reader = BufReader::new(Cursor::new(data.to_vec()));
let mut encrypt_reader = EncryptReader::new(WarpReader::new(reader), key, nonce);
let mut encrypted = Vec::new();
encrypt_reader.read_to_end(&mut encrypted).await.unwrap();
encrypted
}
let encrypted_one = encrypt_part(&part_one, key, base_nonce, 1).await;
let encrypted_two = encrypt_part(&part_two, key, base_nonce, 2).await;
let mut combined = Vec::with_capacity(encrypted_one.len() + encrypted_two.len());
combined.extend_from_slice(&encrypted_one);
combined.extend_from_slice(&encrypted_two);
let reader = BufReader::new(Cursor::new(combined));
let mut decrypt_reader = DecryptReader::new_multipart(WarpReader::new(reader), key, base_nonce);
let mut decrypted = Vec::new();
decrypt_reader.read_to_end(&mut decrypted).await.unwrap();
let mut expected = Vec::with_capacity(part_one.len() + part_two.len());
expected.extend_from_slice(&part_one);
expected.extend_from_slice(&part_two);
assert_eq!(decrypted, expected);
}
}

View File

@@ -71,8 +71,12 @@ use rustfs_ecstore::store_api::ObjectOptions;
use rustfs_ecstore::store_api::ObjectToDelete;
use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::fileinfo::ObjectPartInfo;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_kms::DataKey;
use rustfs_kms::service_manager::get_global_encryption_service;
use rustfs_kms::types::{EncryptionMetadata, ObjectEncryptionContext};
use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
@@ -108,6 +112,7 @@ use std::sync::Arc;
use std::sync::LazyLock;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
@@ -140,6 +145,199 @@ pub struct FS {
// pub store: ECStore,
}
struct ManagedEncryptionMaterial {
data_key: DataKey,
headers: HashMap<String, String>,
kms_key_id: String,
}
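/// Builds the per-object data key plus the `x-rustfs-encryption-*` headers that
/// are persisted with the object. Falls back to the KMS service's default key
/// when the request does not name one, and records the original plaintext size
/// so responses can report it after decryption.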
async fn create_managed_encryption_material(
bucket: &str,
key: &str,
algorithm: &ServerSideEncryption,
kms_key_id: Option<String>,
original_size: i64,
) -> Result<ManagedEncryptionMaterial, ApiError> {
let Some(service) = get_global_encryption_service().await else {
return Err(ApiError::from(StorageError::other("KMS encryption service is not initialized")));
};
if !is_managed_sse(algorithm) {
return Err(ApiError::from(StorageError::other(format!(
"Unsupported server-side encryption algorithm: {}",
algorithm.as_str()
))));
}
let algorithm_str = algorithm.as_str();
let mut context = ObjectEncryptionContext::new(bucket.to_string(), key.to_string());
if original_size >= 0 {
context = context.with_size(original_size as u64);
}
let mut kms_key_candidate = kms_key_id;
if kms_key_candidate.is_none() {
kms_key_candidate = service.get_default_key_id().cloned();
}
let kms_key_to_use = kms_key_candidate
.clone()
.ok_or_else(|| ApiError::from(StorageError::other("No KMS key available for managed server-side encryption")))?;
let (data_key, encrypted_data_key) = service
.create_data_key(&kms_key_candidate, &context)
.await
.map_err(|e| ApiError::from(StorageError::other(format!("Failed to create data key: {}", e))))?;
let metadata = EncryptionMetadata {
algorithm: algorithm_str.to_string(),
key_id: kms_key_to_use.clone(),
key_version: 1,
iv: data_key.nonce.to_vec(),
tag: None,
encryption_context: context.encryption_context.clone(),
encrypted_at: Utc::now(),
original_size: if original_size >= 0 { original_size as u64 } else { 0 },
encrypted_data_key,
};
let mut headers = service.metadata_to_headers(&metadata);
headers.insert("x-rustfs-encryption-original-size".to_string(), metadata.original_size.to_string());
Ok(ManagedEncryptionMaterial {
data_key,
headers,
kms_key_id: kms_key_to_use,
})
}
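/// Recovers the object's data key from stored headers. Returns `Ok(None)` for
/// objects without the `x-rustfs-encryption-key` marker (i.e. not managed-SSE
/// encrypted), plus the recorded original plaintext size when available.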
async fn decrypt_managed_encryption_key(
bucket: &str,
key: &str,
metadata: &HashMap<String, String>,
) -> Result<Option<([u8; 32], [u8; 12], Option<i64>)>, ApiError> {
if !metadata.contains_key("x-rustfs-encryption-key") {
return Ok(None);
}
let Some(service) = get_global_encryption_service().await else {
return Err(ApiError::from(StorageError::other("KMS encryption service is not initialized")));
};
let parsed = service
.headers_to_metadata(metadata)
.map_err(|e| ApiError::from(StorageError::other(format!("Failed to parse encryption metadata: {}", e))))?;
if parsed.iv.len() != 12 {
return Err(ApiError::from(StorageError::other("Invalid encryption nonce length; expected 12 bytes")));
}
let context = ObjectEncryptionContext::new(bucket.to_string(), key.to_string());
let data_key = service
.decrypt_data_key(&parsed.encrypted_data_key, &context)
.await
.map_err(|e| ApiError::from(StorageError::other(format!("Failed to decrypt data key: {}", e))))?;
let key_bytes = data_key.plaintext_key;
let mut nonce = [0u8; 12];
nonce.copy_from_slice(&parsed.iv[..12]);
let original_size = metadata
.get("x-rustfs-encryption-original-size")
.and_then(|s| s.parse::<i64>().ok());
Ok(Some((key_bytes, nonce, original_size)))
}
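/// Mirrors the `derive_part_nonce` used by `DecryptReader::new_multipart`; the
/// two must derive identical per-part nonces or multipart decryption fails.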
fn derive_part_nonce(base: [u8; 12], part_number: usize) -> [u8; 12] {
let mut nonce = base;
let current = u32::from_be_bytes([nonce[8], nonce[9], nonce[10], nonce[11]]);
let incremented = current.wrapping_add(part_number as u32);
nonce[8..12].copy_from_slice(&incremented.to_be_bytes());
nonce
}
struct InMemoryAsyncReader {
cursor: std::io::Cursor<Vec<u8>>,
}
impl InMemoryAsyncReader {
fn new(data: Vec<u8>) -> Self {
Self {
cursor: std::io::Cursor::new(data),
}
}
}
impl AsyncRead for InMemoryAsyncReader {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let unfilled = buf.initialize_unfilled();
let bytes_read = std::io::Read::read(&mut self.cursor, unfilled)?;
buf.advance(bytes_read);
std::task::Poll::Ready(Ok(()))
}
}
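/// Decrypts a managed-SSE multipart object by splitting the ciphertext at the
/// stored part boundaries: each part is an independent AES-GCM segment stream
/// under its own derived nonce, so a single `DecryptReader` cannot span parts.
/// Note this buffers the entire plaintext in memory before returning a reader.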
async fn decrypt_multipart_managed_stream(
mut encrypted_stream: Box<dyn AsyncRead + Unpin + Send + Sync>,
parts: &[ObjectPartInfo],
key_bytes: [u8; 32],
base_nonce: [u8; 12],
) -> Result<(Box<dyn Reader>, i64), StorageError> {
let total_plain_capacity: usize = parts.iter().map(|part| part.actual_size.max(0) as usize).sum();
let mut plaintext = Vec::with_capacity(total_plain_capacity);
for part in parts {
if part.size == 0 {
continue;
}
let mut encrypted_part = vec![0u8; part.size];
tokio::io::AsyncReadExt::read_exact(&mut encrypted_stream, &mut encrypted_part)
.await
.map_err(|e| StorageError::other(format!("failed to read encrypted multipart segment {}: {}", part.number, e)))?;
let part_nonce = derive_part_nonce(base_nonce, part.number);
let cursor = std::io::Cursor::new(encrypted_part);
let mut decrypt_reader = DecryptReader::new(WarpReader::new(cursor), key_bytes, part_nonce);
tokio::io::AsyncReadExt::read_to_end(&mut decrypt_reader, &mut plaintext)
.await
.map_err(|e| StorageError::other(format!("failed to decrypt multipart segment {}: {}", part.number, e)))?;
}
let total_plain_size = plaintext.len() as i64;
let reader = Box::new(WarpReader::new(InMemoryAsyncReader::new(plaintext))) as Box<dyn Reader>;
Ok((reader, total_plain_size))
}
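/// Drops managed-encryption bookkeeping (and the advertised SSE headers) from
/// metadata copied off a source object so stale keys/IVs never leak into the
/// destination; fresh material is generated when the copy target is encrypted.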
fn strip_managed_encryption_metadata(metadata: &mut HashMap<String, String>) {
const KEYS: [&str; 7] = [
"x-amz-server-side-encryption",
"x-amz-server-side-encryption-aws-kms-key-id",
"x-rustfs-encryption-iv",
"x-rustfs-encryption-tag",
"x-rustfs-encryption-key",
"x-rustfs-encryption-context",
"x-rustfs-encryption-original-size",
];
for key in KEYS.iter() {
metadata.remove(*key);
}
}
fn is_managed_sse(algorithm: &ServerSideEncryption) -> bool {
matches!(algorithm.as_str(), "AES256" | "aws:kms")
}
impl FS {
pub fn new() -> Self {
// let store: ECStore = ECStore::new(address, endpoint_pools).await?;
@@ -367,6 +565,8 @@ impl S3 for FS {
copy_source,
bucket,
key,
server_side_encryption: requested_sse,
ssekms_key_id: requested_kms_key_id,
..
} = req.input.clone();
let (src_bucket, src_key, version_id) = match copy_source {
@@ -405,6 +605,30 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let bucket_sse_config = metadata_sys::get_sse_config(&bucket).await.ok();
let effective_sse = requested_sse.or_else(|| {
bucket_sse_config.as_ref().and_then(|(config, _)| {
config.rules.first().and_then(|rule| {
rule.apply_server_side_encryption_by_default
.as_ref()
.and_then(|sse| match sse.sse_algorithm.as_str() {
"AES256" => Some(ServerSideEncryption::from_static(ServerSideEncryption::AES256)),
"aws:kms" => Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS)),
_ => None,
})
})
})
});
let mut effective_kms_key_id = requested_kms_key_id.or_else(|| {
bucket_sse_config.as_ref().and_then(|(config, _)| {
config.rules.first().and_then(|rule| {
rule.apply_server_side_encryption_by_default
.as_ref()
.and_then(|sse| sse.kms_master_key_id.clone())
})
})
});
let h = HeaderMap::new();
let gr = store
@@ -420,6 +644,17 @@ impl S3 for FS {
let mut reader: Box<dyn Reader> = Box::new(WarpReader::new(gr.stream));
if let Some((key_bytes, nonce, original_size_opt)) =
decrypt_managed_encryption_key(&src_bucket, &src_key, &src_info.user_defined).await?
{
reader = Box::new(DecryptReader::new(reader, key_bytes, nonce));
if let Some(original) = original_size_opt {
src_info.actual_size = original;
}
}
strip_managed_encryption_metadata(&mut src_info.user_defined);
let actual_size = src_info.get_actual_size().map_err(ApiError::from)?;
let mut length = actual_size;
@@ -451,9 +686,31 @@ impl S3 for FS {
.remove(&format!("{RESERVED_METADATA_PREFIX_LOWER}compression-size"));
}
let hrd = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?;
let mut reader = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?;
src_info.put_object_reader = Some(PutObjReader::new(hrd));
if let Some(ref sse_alg) = effective_sse {
if is_managed_sse(sse_alg) {
let material =
create_managed_encryption_material(&bucket, &key, sse_alg, effective_kms_key_id.clone(), actual_size).await?;
let ManagedEncryptionMaterial {
data_key,
headers,
kms_key_id: kms_key_used,
} = material;
let key_bytes = data_key.plaintext_key;
let nonce = data_key.nonce;
src_info.user_defined.extend(headers.into_iter());
effective_kms_key_id = Some(kms_key_used.clone());
let encrypt_reader = EncryptReader::new(reader, key_bytes, nonce);
reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?;
}
}
src_info.put_object_reader = Some(PutObjReader::new(reader));
// check quota
// TODO: src metadata
@@ -479,6 +736,8 @@ impl S3 for FS {
let output = CopyObjectOutput {
copy_object_result: Some(copy_object_result),
server_side_encryption: effective_sse,
ssekms_key_id: effective_kms_key_id,
..Default::default()
};
@@ -953,6 +1212,15 @@ impl S3 for FS {
.map_err(ApiError::from)?;
let info = reader.object_info;
tracing::debug!(object_size = info.size, part_count = info.parts.len(), "GET object metadata snapshot");
for part in &info.parts {
tracing::debug!(
part_number = part.number,
part_size = part.size,
part_actual_size = part.actual_size,
"GET object part details"
);
}
let event_info = info.clone();
let content_type = {
if let Some(content_type) = &info.content_type {
@@ -993,6 +1261,8 @@ impl S3 for FS {
let mut final_stream = reader.stream;
let stored_sse_algorithm = info.user_defined.get("x-amz-server-side-encryption-customer-algorithm");
let stored_sse_key_md5 = info.user_defined.get("x-amz-server-side-encryption-customer-key-md5");
let mut managed_encryption_applied = false;
let mut managed_original_size: Option<i64> = None;
tracing::debug!(
"GET object metadata check: stored_sse_algorithm={:?}, stored_sse_key_md5={:?}, provided_sse_key={:?}",
@@ -1096,6 +1366,26 @@ impl S3 for FS {
}
}
if stored_sse_algorithm.is_none() {
if let Some((key_bytes, nonce, original_size)) =
decrypt_managed_encryption_key(&bucket, &key, &info.user_defined).await?
{
if info.parts.len() > 1 {
let (reader, plain_size) = decrypt_multipart_managed_stream(final_stream, &info.parts, key_bytes, nonce)
.await
.map_err(ApiError::from)?;
final_stream = reader;
managed_original_size = Some(plain_size);
} else {
let warp_reader = WarpReader::new(final_stream);
let decrypt_reader = DecryptReader::new(warp_reader, key_bytes, nonce);
final_stream = Box::new(decrypt_reader);
managed_original_size = original_size;
}
managed_encryption_applied = true;
}
}
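// Multipart managed objects were encrypted per part under derived nonces,
// so they cannot stream through a single DecryptReader; single-part
// objects can.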
// For SSE-C encrypted objects, use the original size instead of encrypted size
let response_content_length = if stored_sse_algorithm.is_some() {
if let Some(original_size_str) = info.user_defined.get("x-amz-server-side-encryption-customer-original-size") {
@@ -1110,21 +1400,23 @@ impl S3 for FS {
tracing::debug!("SSE-C decryption: no original size found, using content_length {}", content_length);
content_length
}
} else if managed_encryption_applied {
managed_original_size.unwrap_or(content_length)
} else {
content_length
};
tracing::info!("Final response_content_length: {}", response_content_length);
if stored_sse_algorithm.is_some() {
if stored_sse_algorithm.is_some() || managed_encryption_applied {
let limit_reader = HardLimitReader::new(Box::new(WarpReader::new(final_stream)), response_content_length);
final_stream = Box::new(limit_reader);
}
// For SSE-C and managed SSE objects, don't use bytes_stream to limit the stream
// because DecryptReader needs to read all encrypted data to produce decrypted output
let body = if stored_sse_algorithm.is_some() {
tracing::info!("SSE-C: Using unlimited stream for decryption");
let body = if stored_sse_algorithm.is_some() || managed_encryption_applied {
tracing::info!("SSE-C/managed SSE: using unlimited stream for decryption");
Some(StreamingBlob::wrap(ReaderStream::with_capacity(final_stream, DEFAULT_READ_BUFFER_SIZE)))
} else {
Some(StreamingBlob::wrap(bytes_stream(
@@ -1273,7 +1565,17 @@ impl S3 for FS {
let content_length = info.get_actual_size().map_err(ApiError::from)?;
let metadata = info.user_defined;
let metadata_map = info.user_defined.clone();
let server_side_encryption = metadata_map
.get("x-amz-server-side-encryption")
.map(|v| ServerSideEncryption::from(v.clone()));
let sse_customer_algorithm = metadata_map
.get("x-amz-server-side-encryption-customer-algorithm")
.map(|v| SSECustomerAlgorithm::from(v.clone()));
let sse_customer_key_md5 = metadata_map.get("x-amz-server-side-encryption-customer-key-md5").cloned();
let ssekms_key_id = metadata_map.get("x-amz-server-side-encryption-aws-kms-key-id").cloned();
let metadata = metadata_map;
let output = HeadObjectOutput {
content_length: Some(content_length),
@@ -1282,6 +1584,10 @@ impl S3 for FS {
e_tag: info.etag,
metadata: Some(metadata),
version_id: info.version_id.map(|v| v.to_string()),
server_side_encryption,
sse_customer_algorithm,
sse_customer_key_md5,
ssekms_key_id,
// metadata: object_metadata,
..Default::default()
};
@@ -1630,8 +1936,7 @@ impl S3 for FS {
});
tracing::debug!("TDD: effective_sse={:?} (original={:?})", effective_sse, original_sse);
let _original_kms_key_id = ssekms_key_id.clone();
let effective_kms_key_id = ssekms_key_id.or_else(|| {
let mut effective_kms_key_id = ssekms_key_id.or_else(|| {
bucket_sse_config.as_ref().and_then(|(config, _timestamp)| {
config.rules.first().and_then(|rule| {
rule.apply_server_side_encryption_by_default
@@ -1650,9 +1955,6 @@ impl S3 for FS {
}
// TDD: Store effective SSE information in metadata for GET responses
if let Some(sse) = &effective_sse {
metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string());
}
if let Some(sse_alg) = &sse_customer_algorithm {
metadata.insert(
"x-amz-server-side-encryption-customer-algorithm".to_string(),
@@ -1662,6 +1964,10 @@ impl S3 for FS {
if let Some(sse_md5) = &sse_customer_key_md5 {
metadata.insert("x-amz-server-side-encryption-customer-key-md5".to_string(), sse_md5.clone());
}
if let Some(sse) = &effective_sse {
metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string());
}
if let Some(kms_key_id) = &effective_kms_key_id {
metadata.insert("x-amz-server-side-encryption-aws-kms-key-id".to_string(), kms_key_id.clone());
}
@@ -1728,6 +2034,32 @@ impl S3 for FS {
reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?;
}
// Apply managed SSE (SSE-S3 or SSE-KMS) when requested
if sse_customer_algorithm.is_none() {
if let Some(sse_alg) = &effective_sse {
if is_managed_sse(sse_alg) {
let material =
create_managed_encryption_material(&bucket, &key, sse_alg, effective_kms_key_id.clone(), actual_size)
.await?;
let ManagedEncryptionMaterial {
data_key,
headers,
kms_key_id: kms_key_used,
} = material;
let key_bytes = data_key.plaintext_key;
let nonce = data_key.nonce;
metadata.extend(headers);
effective_kms_key_id = Some(kms_key_used.clone());
let encrypt_reader = EncryptReader::new(reader, key_bytes, nonce);
reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?;
}
}
}
let mut reader = PutObjReader::new(reader);
let mt = metadata.clone();
@@ -1860,7 +2192,7 @@ impl S3 for FS {
tracing::debug!("TDD: effective_sse for multipart={:?} (original={:?})", effective_sse, original_sse);
let _original_kms_key_id = ssekms_key_id.clone();
let effective_kms_key_id = ssekms_key_id.or_else(|| {
let mut effective_kms_key_id = ssekms_key_id.or_else(|| {
bucket_sse_config.as_ref().and_then(|(config, _timestamp)| {
config.rules.first().and_then(|rule| {
rule.apply_server_side_encryption_by_default
@@ -1871,9 +2203,6 @@ impl S3 for FS {
});
// Store effective SSE information in metadata for multipart upload
if let Some(sse) = &effective_sse {
metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string());
}
if let Some(sse_alg) = &sse_customer_algorithm {
metadata.insert(
"x-amz-server-side-encryption-customer-algorithm".to_string(),
@@ -1883,6 +2212,24 @@ impl S3 for FS {
if let Some(sse_md5) = &sse_customer_key_md5 {
metadata.insert("x-amz-server-side-encryption-customer-key-md5".to_string(), sse_md5.clone());
}
if let Some(sse) = &effective_sse {
if is_managed_sse(sse) {
let material = create_managed_encryption_material(&bucket, &key, sse, effective_kms_key_id.clone(), 0).await?;
let ManagedEncryptionMaterial {
data_key: _,
headers,
kms_key_id: kms_key_used,
} = material;
metadata.extend(headers.into_iter());
effective_kms_key_id = Some(kms_key_used.clone());
} else {
metadata.insert("x-amz-server-side-encryption".to_string(), sse.as_str().to_string());
}
}
if let Some(kms_key_id) = &effective_kms_key_id {
metadata.insert("x-amz-server-side-encryption-aws-kms-key-id".to_string(), kms_key_id.clone());
}
@@ -1961,41 +2308,84 @@ impl S3 for FS {
// let upload_id =
let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;
let size = match content_length {
Some(c) => c,
None => {
if let Some(val) = req.headers.get(AMZ_DECODED_CONTENT_LENGTH) {
match atoi::atoi::<i64>(val.as_bytes()) {
Some(x) => x,
None => return Err(s3_error!(UnexpectedContent)),
}
} else {
return Err(s3_error!(UnexpectedContent));
}
}
};
let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));
// mc cp step 4
let opts = ObjectOptions::default();
let mut size = content_length;
let mut body_stream = body.ok_or_else(|| s3_error!(IncompleteBody))?;
if size.is_none() {
if let Some(val) = req.headers.get(AMZ_DECODED_CONTENT_LENGTH) {
if let Some(x) = atoi::atoi::<i64>(val.as_bytes()) {
size = Some(x);
}
}
}
if size.is_none() {
let mut total = 0i64;
let mut buffer = bytes::BytesMut::new();
while let Some(chunk) = body_stream.next().await {
let chunk = chunk.map_err(|e| ApiError::from(StorageError::other(e.to_string())))?;
total += chunk.len() as i64;
buffer.extend_from_slice(&chunk);
}
if total <= 0 {
return Err(s3_error!(UnexpectedContent));
}
size = Some(total);
let combined = buffer.freeze();
let stream = futures::stream::once(async move { Ok::<Bytes, std::io::Error>(combined) });
body_stream = StreamingBlob::wrap(stream);
}
// Get multipart info early to check if managed encryption will be applied
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let opts = ObjectOptions::default();
let fi = store
.get_multipart_info(&bucket, &key, &upload_id, &opts)
.await
.map_err(ApiError::from)?;
// Check if managed encryption will be applied
let will_apply_managed_encryption = decrypt_managed_encryption_key(&bucket, &key, &fi.user_defined)
.await?
.is_some();
// If managed encryption will be applied and we have Content-Length, buffer the entire body
// This is necessary because encryption changes the data size, which causes Content-Length mismatches
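// (Each AES-256-GCM segment written by EncryptReader adds an 8-byte length/CRC
// header and a 16-byte auth tag, so ciphertext length never matches the
// client-declared Content-Length.)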
if will_apply_managed_encryption && size.is_some() {
let mut total = 0i64;
let mut buffer = bytes::BytesMut::new();
while let Some(chunk) = body_stream.next().await {
let chunk = chunk.map_err(|e| ApiError::from(StorageError::other(e.to_string())))?;
total += chunk.len() as i64;
buffer.extend_from_slice(&chunk);
}
if total <= 0 {
return Err(s3_error!(UnexpectedContent));
}
size = Some(total);
let combined = buffer.freeze();
let stream = futures::stream::once(async move { Ok::<Bytes, std::io::Error>(combined) });
body_stream = StreamingBlob::wrap(stream);
}
let mut size = size.ok_or_else(|| s3_error!(UnexpectedContent))?;
let body = StreamReader::new(body_stream.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));
// mc cp step 4
let is_compressible = fi
.user_defined
.contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}compression").as_str());
let reader: Box<dyn Reader> = Box::new(WarpReader::new(body));
let mut reader: Box<dyn Reader> = Box::new(WarpReader::new(body));
let actual_size = size;
@@ -2039,16 +2429,22 @@ impl S3 for FS {
}
*/
// TODO: md5 check
let reader = if is_compressible {
let hrd = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
let compress_reader = CompressReader::new(hrd, CompressionAlgorithm::default());
Box::new(HashReader::new(Box::new(compress_reader), -1, actual_size, None, false).map_err(ApiError::from)?)
} else {
Box::new(HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?)
};
let mut reader = PutObjReader::new(*reader);
if is_compressible {
let hrd = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
let compress_reader = CompressReader::new(hrd, CompressionAlgorithm::default());
reader = Box::new(compress_reader);
size = -1;
}
let mut reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
if let Some((key_bytes, base_nonce, _)) = decrypt_managed_encryption_key(&bucket, &key, &fi.user_defined).await? {
let part_nonce = derive_part_nonce(base_nonce, part_id);
let encrypt_reader = EncryptReader::new(reader, key_bytes, part_nonce);
reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?;
}
let mut reader = PutObjReader::new(reader);
let info = store
.put_object_part(&bucket, &key, &upload_id, part_id, &mut reader, &opts)
@@ -2126,7 +2522,7 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let src_info = src_reader.object_info;
let mut src_info = src_reader.object_info;
// Validate copy conditions (simplified for now)
if let Some(if_match) = copy_source_if_match {
@@ -2200,6 +2596,15 @@ impl S3 for FS {
let mut reader: Box<dyn Reader> = Box::new(WarpReader::new(src_stream));
if let Some((key_bytes, nonce, original_size_opt)) =
decrypt_managed_encryption_key(&src_bucket, &src_key, &src_info.user_defined).await?
{
reader = Box::new(DecryptReader::new(reader, key_bytes, nonce));
if let Some(original) = original_size_opt {
src_info.actual_size = original;
}
}
let actual_size = length;
let mut size = length;
@@ -2209,8 +2614,14 @@ impl S3 for FS {
size = -1;
}
// TODO: md5 check
let reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
let mut reader = HashReader::new(reader, size, actual_size, None, false).map_err(ApiError::from)?;
if let Some((key_bytes, base_nonce, _)) = decrypt_managed_encryption_key(&bucket, &key, &mp_info.user_defined).await? {
let part_nonce = derive_part_nonce(base_nonce, part_id);
let encrypt_reader = EncryptReader::new(reader, key_bytes, part_nonce);
reader = HashReader::new(Box::new(encrypt_reader), -1, actual_size, None, false).map_err(ApiError::from)?;
}
let mut reader = PutObjReader::new(reader);
// Set up destination options (inherit from multipart upload)