The APIs exported by the SSE module have been refactored.

This commit is contained in:
reatang
2026-01-12 01:29:09 +08:00
parent d6b9f9557f
commit 6bbf8b8650
3 changed files with 395 additions and 67 deletions

1
Cargo.lock generated
View File

@@ -7890,6 +7890,7 @@ dependencies = [
name = "rustfs"
version = "0.0.5"
dependencies = [
"aes-gcm 0.11.0-rc.2",
"astral-tokio-tar",
"async-trait",
"atoi",

View File

@@ -95,6 +95,7 @@ serde_urlencoded = { workspace = true }
rustls = { workspace = true }
subtle = { workspace = true }
rustls-pemfile = { workspace = true }
aes-gcm = { workspace = true }
# Time and Date
chrono = { workspace = true }
@@ -128,6 +129,7 @@ urlencoding = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }
libc = { workspace = true }
rand = { workspace = true }
# Observability and Metrics
metrics = { workspace = true }

View File

@@ -72,8 +72,13 @@
//! }
//! ```
use aes_gcm::{
Aes256Gcm, Key, Nonce,
aead::{Aead, KeyInit},
};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use chrono::Utc;
use rand::RngCore;
use rustfs_ecstore::error::StorageError;
use rustfs_filemeta::ObjectPartInfo;
use rustfs_kms::{
@@ -235,9 +240,17 @@ pub async fn apply_encryption(request: EncryptionRequest<'_>) -> Result<Option<E
if let (Some(algorithm), Some(key), Some(key_md5)) =
(request.sse_customer_algorithm, request.sse_customer_key, request.sse_customer_key_md5)
{
return apply_ssec_encryption_material(request.bucket, request.key, algorithm, key, key_md5, request.content_size, request.part_number)
.await
.map(Some);
return apply_ssec_encryption_material(
request.bucket,
request.key,
algorithm,
key,
key_md5,
request.content_size,
request.part_number,
)
.await
.map(Some);
}
// Priority 2: Managed SSE (SSE-S3 or SSE-KMS)
@@ -292,7 +305,10 @@ pub async fn apply_encryption(request: EncryptionRequest<'_>) -> Result<Option<E
/// ```
pub async fn apply_decryption(request: DecryptionRequest<'_>) -> Result<Option<DecryptionMaterial>, ApiError> {
// Check for SSE-C encryption
if request.metadata.contains_key("x-amz-server-side-encryption-customer-algorithm") {
if request
.metadata
.contains_key("x-amz-server-side-encryption-customer-algorithm")
{
let (key, key_md5) = match (request.sse_customer_key, request.sse_customer_key_md5) {
(Some(k), Some(md5)) => (k, md5),
_ => {
@@ -349,14 +365,8 @@ async fn apply_ssec_encryption_material(
// Build metadata
let mut metadata = HashMap::new();
metadata.insert(
"x-amz-server-side-encryption-customer-algorithm".to_string(),
validated.algorithm.clone(),
);
metadata.insert(
"x-amz-server-side-encryption-customer-key-md5".to_string(),
validated.key_md5.clone(),
);
metadata.insert("x-amz-server-side-encryption-customer-algorithm".to_string(), validated.algorithm.clone());
metadata.insert("x-amz-server-side-encryption-customer-key-md5".to_string(), validated.key_md5.clone());
metadata.insert(
"x-amz-server-side-encryption-customer-original-size".to_string(),
content_size.to_string(),
@@ -483,7 +493,10 @@ async fn apply_managed_encryption_material(
};
let mut metadata = service.metadata_to_headers(&encryption_metadata);
metadata.insert("x-rustfs-encryption-original-size".to_string(), encryption_metadata.original_size.to_string());
metadata.insert(
"x-rustfs-encryption-original-size".to_string(),
encryption_metadata.original_size.to_string(),
);
// Handle part-specific nonce if needed
let nonce = if let Some(part_num) = part_number {
@@ -595,97 +608,162 @@ pub struct SsecParams {
}
// ============================================================================
// 测试用自定义SSE CMK的实现 (SSE-S3 / SSE-KMS) Definitions
// Test purpose, custom SSE CMK implementation (SSE-S3 / SSE-KMS) Definitions
// ============================================================================
// 定义一个通过 bucket 和 key 获取 SSE DEK 的特性
// Define a trait to get SSE DEK by bucket and key
trait SseDekProvider {
/// Generate an SSE DEK
async fn generate_sse_dek(
&self,
bucket: &str,
key: &str,
kms_key_id: &str,
) -> Result<(DataKey, Vec<u8>), ApiError>;
async fn generate_sse_dek(&self, bucket: &str, key: &str, kms_key_id: &str) -> Result<(DataKey, Vec<u8>), ApiError>;
/// Decrypt an SSE DEK (returns only plaintext key, nonce should be read from metadata)
async fn decrypt_sse_dek(
&self,
encrypted_dek: &[u8],
kms_key_id: &str,
) -> Result<[u8; 32], ApiError>;
async fn decrypt_sse_dek(&self, encrypted_dek: &[u8], kms_key_id: &str) -> Result<[u8; 32], ApiError>;
}
// 实现一个通过 bucket 和 keyobject key获取 SSE DEK 测试用途的实现
// Implement a simple SSE DEK provider for testing purposes
struct SimpleSseDekProvider {
cmk_ids: HashMap<String, String>,
cmk_ids: HashMap<String, [u8; 32]>,
}
// __RUSTFS_SSE_SIMPLE_CMK_ID 格式为:key-id1:key1,key-id2:key2,...
// __RUSTFS_SSE_SIMPLE_CMK_ID format: key-id1:base64_key1,key-id2:base64_key2,...
impl SimpleSseDekProvider {
pub fn new() -> Self {
let cmk_id = std::env::var("__RUSTFS_SSE_SIMPLE_CMK_ID").unwrap_or_default();
let cmk_ids = cmk_id.split(',').map(|s| s.split(':').collect()).collect();
let cmk_id = std::env::var("__RUSTFS_SSE_SIMPLE_CMK_ID").unwrap_or_else(|_| "".to_string());
let cmk_ids: HashMap<String, [u8; 32]> = cmk_id
.split(',')
.filter_map(|pair| {
let parts: Vec<&str> = pair.split(':').collect();
if parts.len() == 2 {
let key_name = parts[0];
match BASE64_STANDARD.decode(parts[1]) {
Ok(v) => {
let decoded_len = v.len();
match v.try_into() {
Ok(arr) => {
println!("✓ Successfully loaded CMK: {}", key_name);
Some((key_name.to_string(), arr))
}
Err(_) => {
eprintln!(
"✗ Failed to load CMK '{}': decoded key is not 32 bytes (got {} bytes)",
key_name, decoded_len
);
None
}
}
}
Err(e) => {
eprintln!("✗ Failed to load CMK '{}': invalid base64 encoding: {}", key_name, e);
None
}
}
} else {
eprintln!("✗ Invalid CMK format in '{}': expected 'name:base64_key'", pair);
None
}
})
.collect();
if cmk_ids.is_empty() {
eprintln!("⚠️ WARNING: No valid CMK keys loaded! All encryption operations will fail.");
}
Self { cmk_ids }
}
// 简单的加密DEK仅用于测试不做实际加密
fn encrypt_dek(dek: [u8; 32]) -> Vec<u8> {
let mut encrypted_dek = vec![0u8; 32];
encrypted_dek.copy_from_slice(&dek);
encrypted_dek
// Simple encryption of DEK
fn encrypt_dek(dek: [u8; 32], cmk_value: [u8; 32]) -> Result<String, ApiError> {
// Use AES-256-GCM to encrypt DEK
let key = Key::<Aes256Gcm>::try_from(cmk_value).map_err(|_| ApiError::from(StorageError::other("Invalid key length")))?;
let cipher = Aes256Gcm::new(&key);
let nonce = Nonce::from([0u8; 12]);
let ciphertext = cipher
.encrypt(&nonce, dek.as_slice())
.map_err(|_| ApiError::from(StorageError::other("Failed to encrypt DEK")))?;
// nonce:ciphertext
Ok(format!("{}:{}", BASE64_STANDARD.encode(nonce), BASE64_STANDARD.encode(ciphertext)))
}
// 简单的解密DEK仅用于测试不做实际解密
fn decrypt_dek(encrypted_dek: &[u8]) -> [u8; 32] {
let mut dek = [0u8; 32];
dek.copy_from_slice(encrypted_dek);
dek
// Simple decryption of DEK
fn decrypt_dek(encrypted_dek: &str, cmk_value: [u8; 32]) -> Result<[u8; 32], ApiError> {
let parts: Vec<&str> = encrypted_dek.split(':').collect();
if parts.len() != 2 {
return Err(ApiError::from(StorageError::other("Invalid encrypted DEK format")));
}
let nonce_vec = BASE64_STANDARD
.decode(parts[0])
.map_err(|_| ApiError::from(StorageError::other("Invalid nonce format")))?;
let ciphertext = BASE64_STANDARD
.decode(parts[1])
.map_err(|_| ApiError::from(StorageError::other("Invalid ciphertext format")))?;
let key = Key::<Aes256Gcm>::try_from(cmk_value).map_err(|_| ApiError::from(StorageError::other("Invalid key length")))?;
let cipher = Aes256Gcm::new(&key);
let nonce_array: [u8; 12] = nonce_vec
.try_into()
.map_err(|_| ApiError::from(StorageError::other("Invalid nonce length")))?;
let nonce = Nonce::from(nonce_array);
let plaintext = cipher
.decrypt(&nonce, ciphertext.as_slice())
.map_err(|e| ApiError::from(StorageError::other(format!("Failed to decrypt DEK: {e}"))))?;
let dek: [u8; 32] = plaintext
.try_into()
.map_err(|_| ApiError::from(StorageError::other("Decrypted DEK has invalid length")))?;
Ok(dek)
}
}
impl SseDekProvider for SimpleSseDekProvider {
async fn generate_sse_dek(&self, bucket: &str, key: &str, kms_key_id: &str) -> Result<(DataKey, Vec<u8>), ApiError> {
// 通过一个配置项获取 CMK ID
let _cmk_id = self
async fn generate_sse_dek(&self, _bucket: &str, _key: &str, kms_key_id: &str) -> Result<(DataKey, Vec<u8>), ApiError> {
let cmk_value = self
.cmk_ids
.get(kms_key_id)
.and_then(|s: &Vec<&str>| s.get(1).copied())
.ok_or_else(|| ApiError::from(StorageError::other(format!("CMK ID not found: {}", kms_key_id))))?;
.ok_or_else(|| ApiError::from(StorageError::other(format!("CMK ID not found: {}", kms_key_id))))?
.clone();
// 随机生成一个32字节的数组作为数据密钥
// Generate a 32-byte array as data key
let mut dek = [0u8; 32];
use rand::RngCore;
rand::thread_rng().fill_bytes(&mut dek);
rand::rng().fill_bytes(&mut dek);
// 随机生成一个12字节的数组作为IV
// Generate a 12-byte array as IV
let mut nonce = [0u8; 12];
rand::thread_rng().fill_bytes(&mut nonce);
rand::rng().fill_bytes(&mut nonce);
// 加密数据密钥
let encrypted_dek = Self::encrypt_dek(dek);
// Encrypt data key
let encrypted_dek = Self::encrypt_dek(dek, cmk_value)?;
// 返回数据密钥和IV
// Return data key and IV
Ok((
DataKey {
plaintext_key: dek,
nonce,
},
encrypted_dek,
encrypted_dek.into_bytes(),
))
}
async fn decrypt_sse_dek(&self, encrypted_dek: &[u8], kms_key_id: &str) -> Result<[u8; 32], ApiError> {
// 通过一个配置项获取 CMK ID
let _cmk_id = self
// Verify CMK ID exists (for testing purposes)
let cmk_value = self
.cmk_ids
.get(kms_key_id)
.and_then(|s: &Vec<&str>| s.get(1).copied())
.ok_or_else(|| ApiError::from(StorageError::other(format!("CMK ID not found: {}", kms_key_id))))?;
.ok_or_else(|| ApiError::from(StorageError::other(format!("CMK ID not found: {}", kms_key_id))))?
.clone();
// 解密数据密钥
Ok(Self::decrypt_dek(encrypted_dek))
// Decrypt data key
let encrypted_dek_str = std::str::from_utf8(encrypted_dek)
.map_err(|_| ApiError::from(StorageError::other("Invalid UTF-8 in encrypted DEK")))?;
let dek = Self::decrypt_dek(encrypted_dek_str, cmk_value)?;
Ok(dek)
}
}
@@ -840,13 +918,13 @@ pub fn derive_part_nonce(base: [u8; 12], part_number: usize) -> [u8; 12] {
nonce
}
/// In-memory async reader for decrypted multipart data
pub(crate) struct InMemoryAsyncReader {
/// In-memory async reader for decrypted multipart data
pub struct InMemoryAsyncReader {
cursor: std::io::Cursor<Vec<u8>>,
}
impl InMemoryAsyncReader {
pub(crate) fn new(data: Vec<u8>) -> Self {
pub fn new(data: Vec<u8>) -> Self {
Self {
cursor: std::io::Cursor::new(data),
}
@@ -903,10 +981,10 @@ pub async fn decrypt_multipart_managed_stream(
}
let part_data = &encrypted_data[offset..offset + part_size];
let part_nonce = derive_part_nonce(base_nonce, part_info.part_number);
let part_nonce = derive_part_nonce(base_nonce, part_info.number);
let mut decrypted_part = Vec::with_capacity(part_size);
let cursor = std::io::Cursor::new(part_data);
let cursor = WarpReader::new(std::io::Cursor::new(part_data.to_vec()));
let decrypt_reader = DecryptReader::new(cursor, key_bytes, part_nonce);
let mut decrypt_reader = Box::pin(decrypt_reader);
@@ -918,7 +996,7 @@ pub async fn decrypt_multipart_managed_stream(
let all_decrypted = decrypted_parts.concat();
let total_size = all_decrypted.len() as i64;
let reader: Box<dyn Reader> = Box::new(InMemoryAsyncReader::new(all_decrypted));
let reader: Box<dyn Reader> = Box::new(WarpReader::new(std::io::Cursor::new(all_decrypted)));
Ok((reader, total_size))
}
@@ -1173,4 +1251,251 @@ mod tests {
let result = verify_ssec_key_match("provided_md5", None);
assert!(result.is_err());
}
// ============================================================================
// Integration Tests - Encrypt/Decrypt with SimpleSseDekProvider
// ============================================================================
#[tokio::test]
async fn test_simple_sse_dek_provider_encrypt_decrypt() {
use std::io::Cursor;
use tokio::io::AsyncReadExt;
// 1. Setup: Create SimpleSseDekProvider with test CMK
let provider = SimpleSseDekProvider::new();
// 2. Generate a data encryption key
let bucket = "test-bucket";
let key = "test-key";
let kms_key_id = "test-key"; // Must match the key in SimpleSseDekProvider::new()
let (data_key, _encrypted_dek) = provider
.generate_sse_dek(bucket, key, kms_key_id)
.await
.expect("Failed to generate DEK");
// 3. Prepare test data (明文)
let plaintext = b"Hello, World! This is a test message for encryption and decryption.";
println!("Original plaintext: {:?}", String::from_utf8_lossy(plaintext));
println!("Plaintext length: {} bytes", plaintext.len());
// 4. Encrypt with EncryptReader (wrap Cursor with WarpReader)
let plaintext_reader = WarpReader::new(Cursor::new(plaintext.to_vec()));
let mut encrypt_reader = EncryptReader::new(plaintext_reader, data_key.plaintext_key, data_key.nonce);
// Read encrypted data
let mut encrypted_data = Vec::new();
encrypt_reader
.read_to_end(&mut encrypted_data)
.await
.expect("Failed to read encrypted data");
println!("Encrypted data length: {} bytes", encrypted_data.len());
println!(
"First 16 bytes of encrypted data: {:02x?}",
&encrypted_data[..16.min(encrypted_data.len())]
);
// Verify encrypted data is different from plaintext
assert_ne!(
&encrypted_data[..plaintext.len()],
plaintext,
"Encrypted data should be different from plaintext"
);
// 5. Decrypt with DecryptReader (wrap Cursor with WarpReader)
let encrypted_reader = WarpReader::new(Cursor::new(encrypted_data));
let mut decrypt_reader = DecryptReader::new(encrypted_reader, data_key.plaintext_key, data_key.nonce);
// Read decrypted data
let mut decrypted_data = Vec::new();
decrypt_reader
.read_to_end(&mut decrypted_data)
.await
.expect("Failed to read decrypted data");
println!("Decrypted data: {:?}", String::from_utf8_lossy(&decrypted_data));
println!("Decrypted length: {} bytes", decrypted_data.len());
// 6. Verify decrypted data matches original plaintext
assert_eq!(decrypted_data, plaintext, "Decrypted data should match original plaintext");
println!("✅ Encryption/Decryption test passed!");
}
#[tokio::test]
async fn test_simple_sse_dek_provider_encrypt_decrypt_large_data() {
use std::io::Cursor;
use tokio::io::AsyncReadExt;
// Test with larger data to ensure streaming works correctly
let provider = SimpleSseDekProvider::new();
let bucket = "test-bucket";
let key = "test-key-large";
let kms_key_id = "test-key";
let (data_key, _encrypted_dek) = provider
.generate_sse_dek(bucket, key, kms_key_id)
.await
.expect("Failed to generate DEK");
// Create 1MB of test data
let plaintext_size = 1024 * 1024; // 1MB
let plaintext: Vec<u8> = (0..plaintext_size).map(|i| (i % 256) as u8).collect();
println!("Testing with {} bytes of data", plaintext.len());
// Encrypt (wrap with WarpReader)
let plaintext_reader = WarpReader::new(Cursor::new(plaintext.clone()));
let mut encrypt_reader = EncryptReader::new(plaintext_reader, data_key.plaintext_key, data_key.nonce);
let mut encrypted_data = Vec::new();
encrypt_reader
.read_to_end(&mut encrypted_data)
.await
.expect("Failed to encrypt large data");
println!("Encrypted {} bytes to {} bytes", plaintext.len(), encrypted_data.len());
// Decrypt (wrap with WarpReader)
let encrypted_reader = WarpReader::new(Cursor::new(encrypted_data));
let mut decrypt_reader = DecryptReader::new(encrypted_reader, data_key.plaintext_key, data_key.nonce);
let mut decrypted_data = Vec::new();
decrypt_reader
.read_to_end(&mut decrypted_data)
.await
.expect("Failed to decrypt large data");
// Verify
assert_eq!(decrypted_data.len(), plaintext.len(), "Decrypted size should match original");
assert_eq!(decrypted_data, plaintext, "Decrypted data should match original plaintext");
println!("✅ Large data encryption/decryption test passed!");
}
#[tokio::test]
async fn test_simple_sse_dek_provider_different_nonces() {
use std::io::Cursor;
use tokio::io::AsyncReadExt;
// Verify that different nonces produce different ciphertext
let provider = SimpleSseDekProvider::new();
let bucket = "test-bucket";
let key = "test-key";
let kms_key_id = "test-key";
// Generate two different keys (with different nonces)
let (data_key1, _) = provider
.generate_sse_dek(bucket, key, kms_key_id)
.await
.expect("Failed to generate DEK 1");
let (data_key2, _) = provider
.generate_sse_dek(bucket, key, kms_key_id)
.await
.expect("Failed to generate DEK 2");
// Verify nonces are different
assert_ne!(data_key1.nonce, data_key2.nonce, "Different keys should have different nonces");
// Same plaintext
let plaintext = b"Same plaintext";
// Encrypt with first key (wrap with WarpReader)
let reader1 = WarpReader::new(Cursor::new(plaintext.to_vec()));
let mut encrypt_reader1 = EncryptReader::new(reader1, data_key1.plaintext_key, data_key1.nonce);
let mut encrypted1 = Vec::new();
encrypt_reader1.read_to_end(&mut encrypted1).await.unwrap();
// Encrypt with second key (wrap with WarpReader)
let reader2 = WarpReader::new(Cursor::new(plaintext.to_vec()));
let mut encrypt_reader2 = EncryptReader::new(reader2, data_key2.plaintext_key, data_key2.nonce);
let mut encrypted2 = Vec::new();
encrypt_reader2.read_to_end(&mut encrypted2).await.unwrap();
// Verify ciphertexts are different (due to different nonces/keys)
assert_ne!(
encrypted1, encrypted2,
"Same plaintext with different nonces should produce different ciphertext"
);
println!("✅ Different nonces produce different ciphertext - test passed!");
}
#[tokio::test]
async fn test_simple_sse_dek_provider_decrypt_with_encrypted_dek() {
use std::io::Cursor;
use tokio::io::AsyncReadExt;
// Test the full cycle: generate -> encrypt DEK -> decrypt DEK -> use for data encryption
let provider = SimpleSseDekProvider::new();
let bucket = "test-bucket";
let key = "test-key";
let kms_key_id = "test-key";
// 1. Generate DEK and get encrypted DEK
let (data_key, encrypted_dek) = provider
.generate_sse_dek(bucket, key, kms_key_id)
.await
.expect("Failed to generate DEK");
let original_plaintext_key = data_key.plaintext_key;
let original_nonce = data_key.nonce;
// 2. Simulate storing encrypted_dek and nonce in metadata
// In real scenario, nonce would be stored separately in metadata
// 3. Later, decrypt the DEK
let decrypted_plaintext_key = provider
.decrypt_sse_dek(&encrypted_dek, kms_key_id)
.await
.expect("Failed to decrypt DEK");
// 4. Verify decrypted key matches original
assert_eq!(
decrypted_plaintext_key, original_plaintext_key,
"Decrypted DEK should match original plaintext key"
);
// 5. Use decrypted key to encrypt/decrypt data
let plaintext = b"Test data with decrypted DEK";
// Encrypt with original key (wrap with WarpReader)
let reader = WarpReader::new(Cursor::new(plaintext.to_vec()));
let mut encrypt_reader = EncryptReader::new(reader, original_plaintext_key, original_nonce);
let mut encrypted_data = Vec::new();
encrypt_reader.read_to_end(&mut encrypted_data).await.unwrap();
// Decrypt with recovered key (simulating GET operation) (wrap with WarpReader)
let reader = WarpReader::new(Cursor::new(encrypted_data));
let mut decrypt_reader = DecryptReader::new(
reader,
decrypted_plaintext_key,
original_nonce, // In real scenario, read from metadata
);
let mut decrypted_data = Vec::new();
decrypt_reader.read_to_end(&mut decrypted_data).await.unwrap();
// Verify
assert_eq!(decrypted_data, plaintext, "Data decrypted with recovered key should match original");
println!("✅ Full cycle (generate -> encrypt DEK -> decrypt DEK -> decrypt data) test passed!");
}
#[test]
fn test_encryption_type_enum() {
// Test EncryptionType enum
assert_eq!(EncryptionType::SseS3, EncryptionType::SseS3);
assert_eq!(EncryptionType::SseKms, EncryptionType::SseKms);
assert_eq!(EncryptionType::SseC, EncryptionType::SseC);
assert_ne!(EncryptionType::SseS3, EncryptionType::SseKms);
// Test Debug format
let debug_str = format!("{:?}", EncryptionType::SseKms);
assert!(debug_str.contains("SseKms"));
}
}