Implement SSE-KMS and SSE-S3 encryption mechanisms with comprehensive encryption and decryption functionalities. Added KMS client initialization and management, integrated AES-GCM and ChaCha20-Poly1305 for data encryption, and established metadata handling for encrypted objects. Enhanced error handling and included integration tests for encryption workflows.

This commit is contained in:
DamonXue
2025-05-11 22:41:20 +08:00
parent 93f1b5dbf1
commit 96de156763
19 changed files with 2657 additions and 31 deletions

View File

@@ -0,0 +1,22 @@
FROM vault:1.13
# Configure Vault for dev mode
ENV VAULT_DEV_ROOT_TOKEN_ID=rustfs-root-token
ENV VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200
# Install curl for health checks
USER root
RUN apk add --no-cache curl jq
# Copy the Vault initialization script
COPY vault-init.sh /usr/local/bin/vault-init.sh
RUN chmod +x /usr/local/bin/vault-init.sh
# Switch back to vault user
USER vault
# Expose Vault port
EXPOSE 8200
# Start Vault in dev mode and run the initialization script
ENTRYPOINT ["sh", "-c", "vault server -dev & sleep 5 && vault-init.sh"]

View File

@@ -0,0 +1,26 @@
services:
rustyvault:
build:
context: ./.docker
dockerfile: Dockerfile.rustyvault
container_name: rustyvault
hostname: rustyvault
ports:
- "8200:8200" # Vault API port
volumes:
- vault-data:/vault/data
- vault-config:/vault/config
cap_add:
- IPC_LOCK # Allow the vault to lock sensitive data in memory
environment:
- VAULT_DEV_ROOT_TOKEN_ID=rustfs-root-token
- VAULT_ADDR=http://0.0.0.0:8200
healthcheck:
test: ["CMD", "curl", "-s", "http://127.0.0.1:8200/v1/sys/health"]
interval: 10s
timeout: 5s
retries: 3
networks:
default:
driver: bridge

71
.docker/vault-init.sh Normal file
View File

@@ -0,0 +1,71 @@
#!/bin/sh
# vault-init.sh - Initialize Vault for RustFS SSE-KMS
# Wait for Vault to start
until curl -s http://127.0.0.1:8200/v1/sys/health | grep "initialized" > /dev/null; do
echo "Waiting for Vault to start..."
sleep 1
done
# Set the Vault token
export VAULT_TOKEN="$VAULT_DEV_ROOT_TOKEN_ID"
export VAULT_ADDR="http://127.0.0.1:8200"
echo "Vault is running and initialized"
# Enable the Transit secrets engine (for encryption operations)
vault secrets enable transit
echo "Transit secrets engine enabled"
# Create a key for RustFS encryption
vault write -f transit/keys/rustfs-encryption-key
echo "Created rustfs-encryption-key"
# Create another key for RustFS with rotation capability
vault write -f transit/keys/rustfs-rotating-key
echo "Created rustfs-rotating-key"
# Set up key rotation policy
vault write transit/keys/rustfs-rotating-key/config auto_rotate_period="30d"
echo "Set up auto rotation for rustfs-rotating-key"
# Create a policy for RustFS to access these keys
cat > /tmp/rustfs-policy.hcl << EOF
# Policy for RustFS encryption operations
path "transit/encrypt/rustfs-encryption-key" {
capabilities = ["create", "update"]
}
path "transit/decrypt/rustfs-encryption-key" {
capabilities = ["create", "update"]
}
path "transit/encrypt/rustfs-rotating-key" {
capabilities = ["create", "update"]
}
path "transit/decrypt/rustfs-rotating-key" {
capabilities = ["create", "update"]
}
EOF
# Create the policy
vault policy write rustfs-encryption-policy /tmp/rustfs-policy.hcl
echo "Created rustfs-encryption-policy"
# Create a token for RustFS to use
RUSTFS_TOKEN=$(vault token create -policy=rustfs-encryption-policy -field=token)
echo "Created token for RustFS: $RUSTFS_TOKEN"
# Store the token for RustFS to use
echo "RUSTFS_KMS_VAULT_TOKEN=$RUSTFS_TOKEN" > /vault/config/rustfs-kms.env
echo "RUSTFS_KMS_VAULT_ENDPOINT=http://rustyvault:8200" >> /vault/config/rustfs-kms.env
echo "RUSTFS_KMS_VAULT_KEY_NAME=rustfs-encryption-key" >> /vault/config/rustfs-kms.env
echo "RustFS KMS configuration has been created"
echo "============================================"
echo "Vault is ready for use with RustFS SSE-KMS"
echo "============================================"
# Keep the container running
tail -f /dev/null

10
Cargo.lock generated
View File

@@ -1740,16 +1740,26 @@ version = "0.0.1"
dependencies = [
"aes-gcm",
"argon2",
"base64 0.22.1",
"cfg-if",
"chacha20poly1305",
"hex",
"http",
"jsonwebtoken",
"lazy_static",
"pbkdf2",
"rand 0.8.5",
"reqwest",
"ring",
"serde",
"serde_json",
"sha2 0.10.8",
"test-case",
"thiserror 2.0.12",
"time",
"tokio",
"tracing",
"uuid",
]
[[package]]

View File

@@ -12,15 +12,24 @@ workspace = true
[dependencies]
aes-gcm = { version = "0.10.3", features = ["std"], optional = true }
argon2 = { version = "0.5.3", features = ["std"], optional = true }
base64 = "0.22.1"
cfg-if = "1.0.0"
chacha20poly1305 = { version = "0.10.1", optional = true }
hex = "0.4.3"
http = { workspace = true }
jsonwebtoken = { workspace = true }
lazy_static = "1.4.0"
pbkdf2 = { version = "0.12.2", optional = true }
rand = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json"] }
ring = { version = "0.17.14", features = ["std"] }
sha2 = { version = "0.10.8", optional = true }
thiserror.workspace = true
serde = { workspace = true }
serde_json.workspace = true
tracing = { workspace = true }
tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true }
[dev-dependencies]
test-case.workspace = true

View File

@@ -24,4 +24,39 @@ pub enum Error {
#[error("jwt err: {0}")]
ErrJwt(#[from] jsonwebtoken::errors::Error),
// SSE related errors
#[error("invalid SSE algorithm")]
ErrInvalidSSEAlgorithm,
#[error("missing SSE encryption key")]
ErrMissingSSEKey,
#[error("invalid SSE customer key")]
ErrInvalidSSECustomerKey,
#[error("SSE key MD5 mismatch")]
ErrSSEKeyMD5Mismatch,
#[error("invalid SSE encryption metadata")]
ErrInvalidEncryptionMetadata,
#[error("missing KMS configuration")]
ErrMissingKMSConfig,
#[error("KMS key ID configuration error: {0}")]
ErrKMSKeyConfiguration(String),
#[error("KMS error: {0}")]
ErrKMS(String),
#[error("invalid encrypted data format")]
ErrInvalidEncryptedDataFormat,
#[error("encrypted object key missing")]
ErrEncryptedObjectKeyMissing,
// Base64 decoding error
#[error("base64 decode error: {0}")]
ErrBase64DecodeError(#[from] base64::DecodeError),
}

View File

@@ -3,9 +3,57 @@
mod encdec;
mod error;
mod jwt;
mod metadata;
mod rusty_vault_client;
mod sse;
mod sse_c;
mod sse_s3;
mod sse_kms;
#[cfg(test)]
mod tests;
pub use encdec::decrypt::decrypt_data;
pub use encdec::encrypt::encrypt_data;
pub use error::Error;
pub use jwt::decode::decode as jwt_decode;
pub use jwt::encode::encode as jwt_encode;
// 导出SSE功能
pub use sse::{SSE, Algorithm, SSEOptions, Encryptable, get_default_kms_config, DefaultKMSConfig};
pub use sse::{init_kms, is_kms_initialized, get_kms_init_error};
pub use metadata::{EncryptionInfo, extract_encryption_metadata, remove_encryption_metadata};
pub use sse_c::SSECEncryption;
pub use sse_s3::{SSES3Encryption, init_master_key};
// KMS 功能导出
#[cfg(feature = "kms")]
pub use sse_kms::{KMSClient, SSEKMSEncryption};
/// Encryption factory: Create appropriate encryptor based on encryption type
pub struct CryptoFactory;
impl CryptoFactory {
/// Create encryptor
#[cfg(not(feature = "kms"))]
pub fn create_encryptor(sse_type: Option<SSE>) -> Box<dyn Encryptable> {
match sse_type {
Some(SSE::SSEC) => Box::new(SSECEncryption::new()),
Some(SSE::SSES3) => Box::new(SSES3Encryption::new()),
Some(SSE::SSEKMS) => Box::new(SSES3Encryption::new()), // Fall back to SSE-S3 when KMS is not enabled
None => Box::new(SSES3Encryption::new()), // Default to SSE-S3
}
}
#[cfg(feature = "kms")]
pub fn create_encryptor(sse_type: Option<SSE>) -> Result<Box<dyn Encryptable>, Error> {
match sse_type {
Some(SSE::SSEC) => Ok(Box::new(SSECEncryption::new())),
Some(SSE::SSES3) => Ok(Box::new(SSES3Encryption::new())),
Some(SSE::SSEKMS) => {
let kms = SSEKMSEncryption::new()?;
Ok(Box::new(kms))
},
None => Ok(Box::new(SSES3Encryption::new())), // Default to SSE-S3
}
}
}

316
crypto/src/metadata.rs Normal file
View File

@@ -0,0 +1,316 @@
// metadata.rs - Implementation of encryption metadata for RustFS
// This file handles the encryption metadata format used in the S3 object storage
use crate::{Error, sse::{SSE, Algorithm}};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use uuid::Uuid;
// Metadata header constants
pub const CRYPTO_IV: &str = "X-Rustfs-Crypto-Iv";
pub const CRYPTO_KEY: &str = "X-Rustfs-Crypto-Key";
pub const CRYPTO_KEY_ID: &str = "X-Rustfs-Crypto-Key-Id";
pub const CRYPTO_ALGORITHM: &str = "X-Rustfs-Crypto-Algorithm";
pub const CRYPTO_SEAL_ALGORITHM: &str = "X-Rustfs-Crypto-Seal-Algorithm";
pub const CRYPTO_KMS_KEY_NAME: &str = "X-Rustfs-Crypto-Kms-Key-Name";
pub const CRYPTO_KMS_CONTEXT: &str = "X-Rustfs-Crypto-Kms-Context";
pub const CRYPTO_META_PREFIX: &str = "X-Rustfs-Crypto-";
/// EncryptionInfo - contains information about encryption
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptionInfo {
/// Type of SSE encryption
pub sse_type: SSE,
/// Algorithm used for encryption
pub algorithm: Algorithm,
/// Initialization vector used for encryption
pub iv: Vec<u8>,
/// Encrypted data key (encrypted with master key)
pub key: Option<Vec<u8>>,
/// Key ID for KMS
pub key_id: Option<String>,
/// KMS context
pub context: Option<String>,
/// Storage class
pub storage_class: Option<String>,
/// Unique ID for this encryption operation
pub matdesc: String,
}
impl EncryptionInfo {
/// Create a new EncryptionInfo for SSE-C
pub fn new_ssec(iv: Vec<u8>) -> Self {
Self {
sse_type: SSE::SSEC,
algorithm: Algorithm::AES256,
iv,
key: None,
key_id: None,
context: None,
storage_class: None,
matdesc: Uuid::new_v4().to_string(),
}
}
/// Create a new EncryptionInfo for SSE-S3
pub fn new_sse_s3(iv: Vec<u8>, encrypted_key: Vec<u8>) -> Self {
Self {
sse_type: SSE::SSES3,
algorithm: Algorithm::AES256,
iv,
key: Some(encrypted_key),
key_id: None,
context: None,
storage_class: None,
matdesc: Uuid::new_v4().to_string(),
}
}
/// Create a new EncryptionInfo for SSE-KMS
pub fn new_sse_kms(iv: Vec<u8>, encrypted_key: Vec<u8>, key_id: String, context: Option<String>) -> Self {
Self {
sse_type: SSE::SSEKMS,
algorithm: Algorithm::AWSKMS,
iv,
key: Some(encrypted_key),
key_id: Some(key_id),
context,
storage_class: None,
matdesc: Uuid::new_v4().to_string(),
}
}
/// Convert encryption info to user defined metadata map
pub fn to_metadata(&self) -> HashMap<String, String> {
let mut metadata = HashMap::new();
// Common fields
metadata.insert(CRYPTO_ALGORITHM.to_string(), self.algorithm.to_string());
metadata.insert(CRYPTO_IV.to_string(), base64::encode(&self.iv));
// Type-specific fields
match self.sse_type {
SSE::SSEC => {
// For SSE-C, we don't store any key material
}
SSE::SSES3 => {
if let Some(key) = &self.key {
metadata.insert(CRYPTO_KEY.to_string(), base64::encode(key));
}
}
SSE::SSEKMS => {
if let Some(key) = &self.key {
metadata.insert(CRYPTO_KEY.to_string(), base64::encode(key));
}
if let Some(key_id) = &self.key_id {
metadata.insert(CRYPTO_KEY_ID.to_string(), key_id.clone());
}
if let Some(context) = &self.context {
metadata.insert(CRYPTO_KMS_CONTEXT.to_string(), context.clone());
}
}
}
metadata
}
/// Parse encryption info from user defined metadata
pub fn from_metadata(metadata: &HashMap<String, String>) -> Result<Option<Self>, Error> {
// Check if encryption metadata exists
if !metadata.contains_key(CRYPTO_ALGORITHM) || !metadata.contains_key(CRYPTO_IV) {
return Ok(None);
}
let algorithm_str = metadata.get(CRYPTO_ALGORITHM).unwrap();
let iv_base64 = metadata.get(CRYPTO_IV).unwrap();
let algorithm = match algorithm_str.as_str() {
"AES256" => Algorithm::AES256,
"aws:kms" => Algorithm::AWSKMS,
_ => return Err(Error::ErrInvalidSSEAlgorithm),
};
let iv = base64::decode(iv_base64).map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
// Determine SSE type and parse additional fields
let mut sse_type = SSE::SSES3; // Default assuming SSE-S3
let mut key = None;
let mut key_id = None;
let mut context = None;
if metadata.contains_key(CRYPTO_KEY) {
let key_base64 = metadata.get(CRYPTO_KEY).unwrap();
key = Some(base64::decode(key_base64).map_err(|_| Error::ErrInvalidEncryptionMetadata)?);
}
if metadata.contains_key(CRYPTO_KEY_ID) {
sse_type = SSE::SSEKMS;
key_id = metadata.get(CRYPTO_KEY_ID).cloned();
}
if metadata.contains_key(CRYPTO_KMS_CONTEXT) {
context = metadata.get(CRYPTO_KMS_CONTEXT).cloned();
}
// If no key is present but we have algorithm and IV, it's SSE-C
if key.is_none() && key_id.is_none() {
sse_type = SSE::SSEC;
}
Ok(Some(Self {
sse_type,
algorithm,
iv,
key,
key_id,
context,
storage_class: None,
matdesc: Uuid::new_v4().to_string(), // Generate a new ID for restored metadata
}))
}
/// Check if this encryption info is for SSE-C
pub fn is_ssec(&self) -> bool {
self.sse_type == SSE::SSEC
}
/// Check if this encryption info is for SSE-S3
pub fn is_sse_s3(&self) -> bool {
self.sse_type == SSE::SSES3
}
/// Check if this encryption info is for SSE-KMS
pub fn is_sse_kms(&self) -> bool {
self.sse_type == SSE::SSEKMS
}
}
/// Extract encryption related metadata from a general metadata map
pub fn extract_encryption_metadata(metadata: &HashMap<String, String>) -> HashMap<String, String> {
metadata
.iter()
.filter(|(k, _)| k.starts_with(CRYPTO_META_PREFIX))
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
/// Remove encryption metadata from general metadata
pub fn remove_encryption_metadata(metadata: &mut HashMap<String, String>) {
metadata.retain(|k, _| !k.starts_with(CRYPTO_META_PREFIX));
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encryption_info_to_metadata_ssec() {
let iv = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let info = EncryptionInfo::new_ssec(iv.clone());
let metadata = info.to_metadata();
assert_eq!(metadata.get(CRYPTO_ALGORITHM).unwrap(), "AES256");
assert_eq!(metadata.get(CRYPTO_IV).unwrap(), &base64::encode(&iv));
assert!(!metadata.contains_key(CRYPTO_KEY));
assert!(!metadata.contains_key(CRYPTO_KEY_ID));
}
#[test]
fn test_encryption_info_to_metadata_sse_s3() {
let iv = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let key = vec![11, 22, 33, 44, 55, 66, 77, 88];
let info = EncryptionInfo::new_sse_s3(iv.clone(), key.clone());
let metadata = info.to_metadata();
assert_eq!(metadata.get(CRYPTO_ALGORITHM).unwrap(), "AES256");
assert_eq!(metadata.get(CRYPTO_IV).unwrap(), &base64::encode(&iv));
assert_eq!(metadata.get(CRYPTO_KEY).unwrap(), &base64::encode(&key));
assert!(!metadata.contains_key(CRYPTO_KEY_ID));
}
#[test]
fn test_encryption_info_to_metadata_sse_kms() {
let iv = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let key = vec![11, 22, 33, 44, 55, 66, 77, 88];
let key_id = "my-kms-key-id";
let context = "my-context";
let info = EncryptionInfo::new_sse_kms(
iv.clone(),
key.clone(),
key_id.to_string(),
Some(context.to_string())
);
let metadata = info.to_metadata();
assert_eq!(metadata.get(CRYPTO_ALGORITHM).unwrap(), "aws:kms");
assert_eq!(metadata.get(CRYPTO_IV).unwrap(), &base64::encode(&iv));
assert_eq!(metadata.get(CRYPTO_KEY).unwrap(), &base64::encode(&key));
assert_eq!(metadata.get(CRYPTO_KEY_ID).unwrap(), key_id);
assert_eq!(metadata.get(CRYPTO_KMS_CONTEXT).unwrap(), context);
}
#[test]
fn test_encryption_info_from_metadata() {
let iv = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let key = vec![11, 22, 33, 44, 55, 66, 77, 88];
let key_id = "my-kms-key-id";
let context = "my-context";
let mut metadata = HashMap::new();
metadata.insert(CRYPTO_ALGORITHM.to_string(), "aws:kms".to_string());
metadata.insert(CRYPTO_IV.to_string(), base64::encode(&iv));
metadata.insert(CRYPTO_KEY.to_string(), base64::encode(&key));
metadata.insert(CRYPTO_KEY_ID.to_string(), key_id.to_string());
metadata.insert(CRYPTO_KMS_CONTEXT.to_string(), context.to_string());
let info = EncryptionInfo::from_metadata(&metadata).unwrap().unwrap();
assert_eq!(info.sse_type, SSE::SSEKMS);
assert_eq!(info.algorithm, Algorithm::AWSKMS);
assert_eq!(info.iv, iv);
assert_eq!(info.key.unwrap(), key);
assert_eq!(info.key_id.unwrap(), key_id);
assert_eq!(info.context.unwrap(), context);
}
#[test]
fn test_extract_encryption_metadata() {
let mut metadata = HashMap::new();
metadata.insert("X-Rustfs-Normal-Meta".to_string(), "value1".to_string());
metadata.insert(CRYPTO_ALGORITHM.to_string(), "AES256".to_string());
metadata.insert(CRYPTO_IV.to_string(), "iv_base64".to_string());
let extracted = extract_encryption_metadata(&metadata);
assert_eq!(extracted.len(), 2);
assert!(!extracted.contains_key("X-Rustfs-Normal-Meta"));
assert!(extracted.contains_key(CRYPTO_ALGORITHM));
assert!(extracted.contains_key(CRYPTO_IV));
}
#[test]
fn test_remove_encryption_metadata() {
let mut metadata = HashMap::new();
metadata.insert("X-Rustfs-Normal-Meta".to_string(), "value1".to_string());
metadata.insert(CRYPTO_ALGORITHM.to_string(), "AES256".to_string());
metadata.insert(CRYPTO_IV.to_string(), "iv_base64".to_string());
remove_encryption_metadata(&mut metadata);
assert_eq!(metadata.len(), 1);
assert!(metadata.contains_key("X-Rustfs-Normal-Meta"));
assert!(!metadata.contains_key(CRYPTO_ALGORITHM));
assert!(!metadata.contains_key(CRYPTO_IV));
}
}

View File

@@ -0,0 +1,257 @@
// rusty_vault_client.rs - Simple RustyVault API client implementation
// Based on https://github.com/Tongsuo-Project/RustyVault/blob/main/src/api/client.rs
use serde::{Deserialize, Serialize};
use serde_json::Map;
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
// 定义 API 响应结构
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VaultResponse {
pub request_id: Option<String>,
pub lease_id: Option<String>,
pub renewable: Option<bool>,
pub lease_duration: Option<i64>,
pub data: Option<serde_json::Value>,
pub response_data: Option<serde_json::Map<String, serde_json::Value>>,
pub warnings: Option<Vec<String>>,
pub auth: Option<serde_json::Value>,
}
// 客户端错误类型
#[derive(Debug)]
pub enum ClientError {
RequestError(reqwest::Error),
StatusError { status: u16, message: String },
ParseError(String),
ConfigError(String),
}
impl fmt::Display for ClientError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ClientError::RequestError(e) => write!(f, "Request error: {}", e),
ClientError::StatusError { status, message } => {
write!(f, "Status error: {} - {}", status, message)
}
ClientError::ParseError(s) => write!(f, "Parse error: {}", s),
ClientError::ConfigError(s) => write!(f, "Configuration error: {}", s),
}
}
}
impl std::error::Error for ClientError {}
impl From<reqwest::Error> for ClientError {
fn from(err: reqwest::Error) -> Self {
ClientError::RequestError(err)
}
}
// RustyVault 客户端
pub struct Client {
addr: String,
token: String,
client: reqwest::Client,
namespace: Option<String>,
}
impl Client {
// 创建一个新的客户端构建器
pub fn new() -> ClientBuilder {
ClientBuilder::default()
}
// 执行 GET 请求
pub async fn read(&self, path: &str, query: Option<HashMap<String, String>>) -> Result<VaultResponse, ClientError> {
let url = format!("{}/v1/{}", self.addr.trim_end_matches('/'), path.trim_start_matches('/'));
let mut req = self.client.get(&url)
.header("X-Vault-Token", &self.token);
if let Some(ns) = &self.namespace {
req = req.header("X-Vault-Namespace", ns);
}
if let Some(q) = query {
req = req.query(&q);
}
self.send_request(req).await
}
// 执行 POST 请求
pub async fn write(&self, path: &str, body: Option<serde_json::Map<String, serde_json::Value>>) -> Result<VaultResponse, ClientError> {
let url = format!("{}/v1/{}", self.addr.trim_end_matches('/'), path.trim_start_matches('/'));
let mut req = self.client.post(&url)
.header("X-Vault-Token", &self.token);
if let Some(ns) = &self.namespace {
req = req.header("X-Vault-Namespace", ns);
}
if let Some(b) = body {
req = req.json(&b);
}
self.send_request(req).await
}
// 执行 DELETE 请求
pub async fn delete(&self, path: &str) -> Result<VaultResponse, ClientError> {
let url = format!("{}/v1/{}", self.addr.trim_end_matches('/'), path.trim_start_matches('/'));
let mut req = self.client.delete(&url)
.header("X-Vault-Token", &self.token);
if let Some(ns) = &self.namespace {
req = req.header("X-Vault-Namespace", ns);
}
self.send_request(req).await
}
// 发送请求并处理响应
async fn send_request(&self, req: reqwest::RequestBuilder) -> Result<VaultResponse, ClientError> {
let response = req.send().await?;
let status = response.status();
if !status.is_success() {
let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string());
return Err(ClientError::StatusError {
status: status.as_u16(),
message: error_text,
});
}
let json: serde_json::Value = response.json().await?;
// 构建响应对象
let mut vault_response = VaultResponse {
request_id: json.get("request_id").and_then(|v| v.as_str()).map(String::from),
lease_id: json.get("lease_id").and_then(|v| v.as_str()).map(String::from),
renewable: json.get("renewable").and_then(|v| v.as_bool()),
lease_duration: json.get("lease_duration").and_then(|v| v.as_i64()),
data: json.get("data").cloned(),
response_data: None,
warnings: json.get("warnings").and_then(|v| {
let mut warnings = Vec::new();
if let Some(array) = v.as_array() {
for item in array {
if let Some(s) = item.as_str() {
warnings.push(s.to_string());
}
}
Some(warnings)
} else {
None
}
}),
auth: json.get("auth").cloned(),
};
// 解析 response_data
if let Some(data) = json.get("data").and_then(|v| v.as_object()) {
vault_response.response_data = Some(data.clone());
} else {
vault_response.response_data = Some(Map::new());
}
Ok(vault_response)
}
}
// 客户端构建器
#[derive(Default)]
pub struct ClientBuilder {
addr: Option<String>,
token: Option<String>,
namespace: Option<String>,
timeout: Option<Duration>,
skip_tls_verify: bool,
ca_cert: Option<String>,
client_cert: Option<String>,
client_key: Option<String>,
}
impl ClientBuilder {
// 设置 Vault 服务器地址
pub fn with_addr(mut self, addr: &str) -> Self {
self.addr = Some(addr.to_string());
self
}
// 设置认证令牌
pub fn with_token(mut self, token: &str) -> Self {
self.token = Some(token.to_string());
self
}
// 设置命名空间
pub fn with_namespace(mut self, namespace: &str) -> Self {
self.namespace = Some(namespace.to_string());
self
}
// 设置请求超时时间
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
// 跳过 TLS 证书验证
pub fn skip_tls_verify(mut self) -> Self {
self.skip_tls_verify = true;
self
}
// 设置 CA 证书路径
pub fn with_ca_cert(mut self, ca_path: &str) -> Self {
self.ca_cert = Some(ca_path.to_string());
self
}
// 设置客户端证书和密钥
pub fn with_client_cert(mut self, cert_path: &str, key_path: &str) -> Self {
self.client_cert = Some(cert_path.to_string());
self.client_key = Some(key_path.to_string());
self
}
// 构建客户端实例
pub fn build(self) -> Result<Client, ClientError> {
let addr = self.addr.ok_or_else(|| {
ClientError::ConfigError("Vault address is required".to_string())
})?;
let token = self.token.unwrap_or_default();
// 构建 reqwest 客户端
let mut client_builder = reqwest::Client::builder();
// 设置超时
if let Some(timeout) = self.timeout {
client_builder = client_builder.timeout(timeout);
}
// TLS 配置
if self.skip_tls_verify {
client_builder = client_builder.danger_accept_invalid_certs(true);
}
// 构建客户端
let client = client_builder.build().map_err(|e| {
ClientError::ConfigError(format!("Failed to build HTTP client: {}", e))
})?;
Ok(Client {
addr,
token,
client,
namespace: self.namespace,
})
}
}

322
crypto/src/sse.rs Normal file
View File

@@ -0,0 +1,322 @@
// sse.rs - Server-Side Encryption interfaces and common implementations
// This file implements the core interfaces for Server-Side Encryption in RustFS
use crate::Error;
use http::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::{Once, RwLock};
use tracing::{debug, error, info};
/// SSE specifies the type of server-side encryption used
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SSE {
/// SSE-C indicates the object was encrypted using client-provided key
SSEC,
/// SSE-S3 indicates the object was encrypted using server-managed key
SSES3,
/// SSE-KMS indicates the object was encrypted using a KMS-managed key
SSEKMS,
}
impl fmt::Display for SSE {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SSE::SSEC => write!(f, "SSE-C"),
SSE::SSES3 => write!(f, "SSE-S3"),
SSE::SSEKMS => write!(f, "SSE-KMS"),
}
}
}
/// Algorithm represents encryption algorithm used
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Algorithm {
/// AES256 is the AES-256 encryption algorithm with GCM mode
AES256,
/// AWSKMS is the encryption algorithm using AWS KMS
AWSKMS,
}
impl fmt::Display for Algorithm {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Algorithm::AES256 => write!(f, "AES256"),
Algorithm::AWSKMS => write!(f, "aws:kms"),
}
}
}
/// S3 SSE Headers
pub const SSE_HEADER: &str = "X-Amz-Server-Side-Encryption";
pub const SSE_C_HEADER: &str = "X-Amz-Server-Side-Encryption-Customer-Algorithm";
pub const SSE_C_KEY_HEADER: &str = "X-Amz-Server-Side-Encryption-Customer-Key";
pub const SSE_C_KEY_MD5_HEADER: &str = "X-Amz-Server-Side-Encryption-Customer-Key-Md5";
pub const SSE_KMS_KEY_ID_HEADER: &str = "X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id";
pub const SSE_KMS_CONTEXT_HEADER: &str = "X-Amz-Server-Side-Encryption-Context";
/// SSE Copy Headers
pub const SSE_COPY_C_HEADER: &str = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Algorithm";
pub const SSE_COPY_C_KEY_HEADER: &str = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key";
pub const SSE_COPY_C_KEY_MD5_HEADER: &str = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-Md5";
/// SSEOptions contain encryption options specified by the user
#[derive(Clone, Debug, Default)]
pub struct SSEOptions {
pub sse_type: Option<SSE>,
pub algorithm: Option<Algorithm>,
pub customer_key: Option<Vec<u8>>,
pub customer_key_md5: Option<String>,
pub kms_key_id: Option<String>,
pub kms_context: Option<String>,
}
impl SSEOptions {
/// Create a new empty SSEOptions
pub fn new() -> Self {
Default::default()
}
/// Extracts SSE options from request headers
pub fn from_headers(headers: &HeaderMap) -> Result<Self, Error> {
let mut options = Self::new();
// Check for SSE-C
if let Some(val) = headers.get(SSE_C_HEADER) {
if val == "AES256" {
options.sse_type = Some(SSE::SSEC);
options.algorithm = Some(Algorithm::AES256);
// SSE-C requires the customer-provided key
if let Some(key) = headers.get(SSE_C_KEY_HEADER) {
match base64::decode(key.as_bytes()) {
Ok(decoded_key) => {
options.customer_key = Some(decoded_key);
},
Err(_) => return Err(Error::ErrInvalidSSECustomerKey),
}
} else {
return Err(Error::ErrMissingSSEKey);
}
// MD5 is optional but should be validated if provided
if let Some(md5) = headers.get(SSE_C_KEY_MD5_HEADER) {
options.customer_key_md5 = Some(md5.to_str().unwrap_or("").to_string());
// TODO: Validate MD5 if present
}
} else {
return Err(Error::ErrInvalidSSEAlgorithm);
}
}
// Check for SSE-S3 or SSE-KMS
if let Some(val) = headers.get(SSE_HEADER) {
let val_str = val.to_str().unwrap_or("");
if val_str == "AES256" {
options.sse_type = Some(SSE::SSES3);
options.algorithm = Some(Algorithm::AES256);
} else if val_str == "aws:kms" {
options.sse_type = Some(SSE::SSEKMS);
options.algorithm = Some(Algorithm::AWSKMS);
// KMS requires a key ID
if let Some(key_id) = headers.get(SSE_KMS_KEY_ID_HEADER) {
options.kms_key_id = Some(key_id.to_str().unwrap_or("").to_string());
}
// KMS context is optional
if let Some(context) = headers.get(SSE_KMS_CONTEXT_HEADER) {
options.kms_context = Some(context.to_str().unwrap_or("").to_string());
}
} else {
return Err(Error::ErrInvalidSSEAlgorithm);
}
}
Ok(options)
}
/// Extracts SSE options from copy source headers
pub fn from_copy_source_headers(headers: &HeaderMap) -> Result<Self, Error> {
let mut options = Self::new();
// Check for SSE-C copy headers
if let Some(val) = headers.get(SSE_COPY_C_HEADER) {
if val == "AES256" {
options.sse_type = Some(SSE::SSEC);
options.algorithm = Some(Algorithm::AES256);
// SSE-C requires the customer-provided key
if let Some(key) = headers.get(SSE_COPY_C_KEY_HEADER) {
match base64::decode(key.as_bytes()) {
Ok(decoded_key) => {
options.customer_key = Some(decoded_key);
},
Err(_) => return Err(Error::ErrInvalidSSECustomerKey),
}
} else {
return Err(Error::ErrMissingSSEKey);
}
// MD5 is optional but should be validated if provided
if let Some(md5) = headers.get(SSE_COPY_C_KEY_MD5_HEADER) {
options.customer_key_md5 = Some(md5.to_str().unwrap_or("").to_string());
// TODO: Validate MD5 if present
}
} else {
return Err(Error::ErrInvalidSSEAlgorithm);
}
}
Ok(options)
}
/// Adds SSE headers to response based on options
pub fn add_headers_to_response(&self, headers: &mut HeaderMap) {
match self.sse_type {
Some(SSE::SSES3) => {
headers.insert(SSE_HEADER, HeaderValue::from_static("AES256"));
}
Some(SSE::SSEKMS) => {
headers.insert(SSE_HEADER, HeaderValue::from_static("aws:kms"));
if let Some(key_id) = &self.kms_key_id {
if let Ok(val) = HeaderValue::from_str(key_id) {
headers.insert(SSE_KMS_KEY_ID_HEADER, val);
}
}
}
Some(SSE::SSEC) => {
headers.insert(SSE_C_HEADER, HeaderValue::from_static("AES256"));
// We don't include the key in the response, only the algorithm
}
None => {}
}
}
}
/// DefaultKMSConfig represents the default KMS configuration for encryption
#[derive(Clone, Debug)]
pub struct DefaultKMSConfig {
pub endpoint: String,
pub key_id: String,
pub token: String,
pub ca_path: Option<String>,
pub skip_tls_verify: bool,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
}
// KMS initialization status tracking - using thread-safe RwLock instead of unsafe
static INIT_KMS: Once = Once::new();
static KMS_INIT_ERROR: RwLock<Option<String>> = RwLock::new(None);
lazy_static::lazy_static! {
/// Default KMS configuration from environment variables
static ref DEFAULT_KMS_CONFIG: Option<DefaultKMSConfig> = {
// Check if KMS is enabled
if std::env::var("RUSTFS_KMS_ENABLED").unwrap_or_default() != "true" {
return None;
}
// Basic required parameters
let endpoint = std::env::var("RUSTFS_KMS_VAULT_ENDPOINT").ok()?;
let key_id = std::env::var("RUSTFS_KMS_VAULT_KEY_NAME").ok()?;
let token = std::env::var("RUSTFS_KMS_VAULT_TOKEN").ok()?;
// Optional TLS configuration parameters
let ca_path = std::env::var("RUSTFS_KMS_VAULT_CAPATH").ok();
let skip_tls_verify = std::env::var("RUSTFS_KMS_VAULT_SKIP_TLS_VERIFY")
.map(|v| v == "true" || v == "1" || v == "yes")
.unwrap_or(false);
// Client certificate configuration
let client_cert_path = std::env::var("RUSTFS_KMS_VAULT_CLIENT_CERT").ok();
let client_key_path = std::env::var("RUSTFS_KMS_VAULT_CLIENT_KEY").ok();
Some(DefaultKMSConfig {
endpoint,
key_id,
token,
ca_path,
skip_tls_verify,
client_cert_path,
client_key_path
})
};
}
/// Get the default KMS configuration
pub fn get_default_kms_config() -> Option<DefaultKMSConfig> {
DEFAULT_KMS_CONFIG.clone()
}
/// Check if KMS initialization was successful - thread-safe version
pub fn is_kms_initialized() -> bool {
match get_default_kms_config() {
Some(_) => match KMS_INIT_ERROR.read() {
Ok(error) => error.is_none(),
Err(_) => false, // If lock is poisoned, consider uninitialized
},
None => false,
}
}
/// Get KMS initialization error if any - thread-safe version
pub fn get_kms_init_error() -> Option<String> {
match KMS_INIT_ERROR.read() {
Ok(error) => error.clone(),
Err(_) => Some("Failed to read KMS init error status".to_string()),
}
}
/// Initialize KMS client on system startup
#[cfg(feature = "kms")]
pub fn init_kms() {
if let Some(config) = get_default_kms_config() {
INIT_KMS.call_once(|| {
info!("Initializing KMS client with endpoint: {}", config.endpoint);
match crate::sse_kms::KMSClient::new(&config) {
Ok(client) => {
match crate::sse_kms::KMSClient::set_global_client(client) {
Ok(_) => {
info!("KMS client initialized successfully");
},
Err(e) => {
let err_msg = format!("Failed to set global KMS client: {}", e);
error!("{}", &err_msg);
if let Ok(mut error) = KMS_INIT_ERROR.write() {
*error = Some(err_msg);
}
}
}
},
Err(e) => {
let err_msg = format!("Failed to create KMS client: {}", e);
error!("{}", &err_msg);
if let Ok(mut error) = KMS_INIT_ERROR.write() {
*error = Some(err_msg);
}
}
}
});
} else {
debug!("KMS is not enabled or not properly configured");
}
}
/// No-op implementation when KMS feature is not enabled
#[cfg(not(feature = "kms"))]
pub fn init_kms() {
debug!("KMS feature is not enabled");
}
/// Trait for objects that can be encrypted and decrypted
pub trait Encryptable {
/// Encrypt data using the provided encryption method
fn encrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error>;
/// Decrypt data using the provided decryption method
fn decrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error>;
}

206
crypto/src/sse_c.rs Normal file
View File

@@ -0,0 +1,206 @@
// sse_c.rs - Implementation of SSE-C (Server-Side Encryption with Customer-Provided Keys)
// This file implements encryption/decryption using customer-provided keys
use crate::{
Error,
metadata::EncryptionInfo,
sse::{SSEOptions, Encryptable}
};
use aes_gcm::{
aead::{Aead, AeadCore, KeyInit},
Aes256Gcm, Key, Nonce
};
use rand::RngCore;
/// SSECEncryption provides SSE-C encryption capabilities
#[derive(Default)]
pub struct SSECEncryption;
impl SSECEncryption {
/// Create a new SSECEncryption instance
pub fn new() -> Self {
Self {}
}
/// Generate a random initialization vector
fn generate_iv() -> Vec<u8> {
let mut iv = vec![0u8; 12]; // AES-GCM typically uses 12 bytes IV
rand::thread_rng().fill_bytes(&mut iv);
iv
}
}
impl Encryptable for SSECEncryption {
/// Encrypt data using customer-provided key (SSE-C)
///
/// The customer key must be provided in the options
fn encrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error> {
// Ensure we have a customer key
let customer_key = options.customer_key.as_ref()
.ok_or(Error::ErrMissingSSEKey)?;
// AES-256 requires a 32-byte key
if customer_key.len() != 32 {
return Err(Error::ErrInvalidSSECustomerKey);
}
// Generate a random IV
let iv = Self::generate_iv();
// Create the cipher
let key = Key::<Aes256Gcm>::from_slice(customer_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&iv);
// Encrypt the data
let ciphertext = cipher.encrypt(nonce, data)
.map_err(|e| Error::ErrEncryptFailed(e))?;
// Create encryption metadata to store with the encrypted data
let info = EncryptionInfo::new_ssec(iv);
// Format: [metadata length (4 bytes)][metadata JSON][ciphertext]
let metadata_json = serde_json::to_vec(&info)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
let metadata_len = metadata_json.len() as u32;
let mut result = Vec::with_capacity(4 + metadata_len as usize + ciphertext.len());
// Add metadata length as big-endian u32
result.extend_from_slice(&metadata_len.to_be_bytes());
// Add metadata and ciphertext
result.extend_from_slice(&metadata_json);
result.extend_from_slice(&ciphertext);
Ok(result)
}
/// Decrypt data using customer-provided key (SSE-C)
///
/// The customer key must be provided in the options
fn decrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error> {
// Ensure we have a customer key
let customer_key = options.customer_key.as_ref()
.ok_or(Error::ErrMissingSSEKey)?;
// AES-256 requires a 32-byte key
if customer_key.len() != 32 {
return Err(Error::ErrInvalidSSECustomerKey);
}
// Ensure data is long enough to contain metadata length (4 bytes)
if data.len() < 4 {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract the metadata length
let mut metadata_len_bytes = [0u8; 4];
metadata_len_bytes.copy_from_slice(&data[0..4]);
let metadata_len = u32::from_be_bytes(metadata_len_bytes) as usize;
// Ensure data is long enough to contain metadata
if data.len() < 4 + metadata_len {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract and parse metadata
let metadata_json = &data[4..4 + metadata_len];
let info: EncryptionInfo = serde_json::from_slice(metadata_json)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
// Verify this is SSE-C encrypted data
if !info.is_ssec() {
return Err(Error::ErrInvalidSSEAlgorithm);
}
// Extract ciphertext
let ciphertext = &data[4 + metadata_len..];
// Create the cipher
let key = Key::<Aes256Gcm>::from_slice(customer_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&info.iv);
// Decrypt the data
let plaintext = cipher.decrypt(nonce, ciphertext)
.map_err(|e| Error::ErrDecryptFailed(e))?;
Ok(plaintext)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ssec_encrypt_decrypt() {
// Generate a random 32-byte key
let mut customer_key = vec![0u8; 32];
rand::thread_rng().fill_bytes(&mut customer_key);
let options = SSEOptions {
customer_key: Some(customer_key.clone()),
..Default::default()
};
// Create test data
let data = b"This is some test data to encrypt with SSE-C";
// Encrypt
let ssec = SSECEncryption::new();
let encrypted = ssec.encrypt(data, &options).unwrap();
// Decrypt
let decrypted = ssec.decrypt(&encrypted, &options).unwrap();
// Verify
assert_eq!(decrypted, data);
}
#[test]
fn test_ssec_wrong_key() {
// Generate a random 32-byte key
let mut customer_key = vec![0u8; 32];
rand::thread_rng().fill_bytes(&mut customer_key);
let options = SSEOptions {
customer_key: Some(customer_key.clone()),
..Default::default()
};
// Create test data
let data = b"This is some test data to encrypt with SSE-C";
// Encrypt
let ssec = SSECEncryption::new();
let encrypted = ssec.encrypt(data, &options).unwrap();
// Attempt to decrypt with wrong key
let mut wrong_key = vec![0u8; 32];
rand::thread_rng().fill_bytes(&mut wrong_key);
let wrong_options = SSEOptions {
customer_key: Some(wrong_key),
..Default::default()
};
// Should fail
let result = ssec.decrypt(&encrypted, &wrong_options);
assert!(result.is_err());
}
#[test]
fn test_ssec_missing_key() {
// Create test data
let data = b"This is some test data to encrypt with SSE-C";
// Try to encrypt without key
let ssec = SSECEncryption::new();
let options = SSEOptions::default();
let result = ssec.encrypt(data, &options);
assert!(result.is_err());
}
}

349
crypto/src/sse_kms.rs Normal file
View File

@@ -0,0 +1,349 @@
// sse_kms.rs - Implementation of SSE-KMS (Server-Side Encryption with Key Management Service)
// This file implements encryption/decryption using KMS-managed keys
use crate::{
Error,
metadata::EncryptionInfo,
sse::{SSEOptions, Encryptable, DefaultKMSConfig, get_default_kms_config},
rusty_vault_client::{self, Client, ClientError},
};
use aes_gcm::{
aead::{Aead, AeadCore, KeyInit},
Aes256Gcm, Key, Nonce
};
use rand::RngCore;
use std::sync::{Arc, Mutex, RwLock, Once};
use serde_json::{json, Map, Value};
use tracing::{debug, error, info};
// Lazily initialized KMS client
static INIT_KMS_CLIENT: Once = Once::new();
static KMS_CLIENT: RwLock<Option<Arc<KMSClient>>> = RwLock::new(None);
/// KMSClient wraps the RustyVault client for key management operations
#[derive(Clone)]
pub struct KMSClient {
client: Arc<Client>,
key_name: String,
}
impl KMSClient {
/// Create a new KMS client with the given configuration
pub fn new(config: &DefaultKMSConfig) -> Result<Self, Error> {
// Create basic client configuration
let mut client_builder = Client::new()
.with_addr(&config.endpoint)
.with_token(&config.token);
// Add TLS related configurations
if let Some(ca_path) = &config.ca_path {
client_builder = client_builder.with_ca_cert(ca_path);
}
if config.skip_tls_verify {
client_builder = client_builder.skip_tls_verify();
}
// Configure client certificates (if provided)
if let (Some(cert_path), Some(key_path)) = (&config.client_cert_path, &config.client_key_path) {
client_builder = client_builder.with_client_cert(cert_path, key_path);
}
// Build the client
let client = match client_builder.build() {
Ok(c) => c,
Err(e) => return Err(Error::ErrKMS(format!("Failed to create Vault client: {}", e))),
};
Ok(Self {
client: Arc::new(client),
key_name: config.key_id.clone(),
})
}
/// Set global KMS client instance
pub fn set_global_client(client: Self) -> Result<(), Error> {
match KMS_CLIENT.write() {
Ok(mut global_client) => {
*global_client = Some(Arc::new(client));
Ok(())
},
Err(_) => Err(Error::ErrKMS("Failed to acquire write lock for global KMS client".to_string()))
}
}
/// Initialize the global KMS client
pub fn init_global_client() -> Result<(), Error> {
INIT_KMS_CLIENT.call_once(|| {
if let Some(config) = get_default_kms_config() {
match Self::new(&config) {
Ok(client) => {
let _ = KMS_CLIENT.write().unwrap().insert(Arc::new(client));
},
Err(e) => {
error!("Failed to initialize KMS client: {}", e);
}
}
}
});
// Check if client is initialized
if KMS_CLIENT.read().unwrap().is_none() {
return Err(Error::ErrMissingKMSConfig);
}
Ok(())
}
/// Get the global KMS client instance
pub fn get_global_client() -> Result<Arc<KMSClient>, Error> {
Self::init_global_client()?;
KMS_CLIENT.read().unwrap()
.as_ref()
.cloned()
.ok_or(Error::ErrMissingKMSConfig)
}
/// Test client connection and key access
pub async fn test_connection(&self) -> Result<bool, Error> {
// Try to get key information to verify client connection and permissions
let path = format!("transit/keys/{}", self.key_name);
match self.client.read(&path, None).await {
Ok(_) => {
debug!("Successfully verified KMS client connection and key access");
Ok(true)
},
Err(e) => {
let err_msg = format!("KMS connection test failed: {}", e);
error!("{}", &err_msg);
Err(Error::ErrKMS(err_msg))
}
}
}
/// Encrypt data using the KMS key
pub async fn encrypt(&self, data: &[u8], context: Option<&str>) -> Result<Vec<u8>, Error> {
let plaintext_b64 = base64::encode(data);
let mut request = Map::new();
request.insert("plaintext".to_string(), Value::String(plaintext_b64));
// Add context if provided
if let Some(ctx) = context {
request.insert("context".to_string(), Value::String(base64::encode(ctx)));
}
// Call Vault API to encrypt
let path = format!("transit/encrypt/{}", self.key_name);
let response = match self.client.write(&path, Some(request)).await {
Ok(resp) => resp,
Err(e) => return Err(Error::ErrKMS(format!("KMS encrypt error: {}", e))),
};
// Extract ciphertext from response - fix ownership issue
if let Some(response_data) = &response.response_data {
if let Some(Value::String(ciphertext)) = response_data.get("ciphertext") {
return Ok(ciphertext.as_bytes().to_vec());
}
}
Err(Error::ErrKMS("No ciphertext in response".to_string()))
}
/// Decrypt data using the KMS key
pub async fn decrypt(&self, ciphertext: &[u8], context: Option<&str>) -> Result<Vec<u8>, Error> {
let ciphertext_str = std::str::from_utf8(ciphertext)
.map_err(|_| Error::ErrInvalidEncryptedDataFormat)?;
let mut request = Map::new();
request.insert("ciphertext".to_string(), Value::String(ciphertext_str.to_string()));
// Add context if provided
if let Some(ctx) = context {
request.insert("context".to_string(), Value::String(base64::encode(ctx)));
}
// Call Vault API to decrypt
let path = format!("transit/decrypt/{}", self.key_name);
let response = match self.client.write(&path, Some(request)).await {
Ok(resp) => resp,
Err(e) => return Err(Error::ErrKMS(format!("KMS decrypt error: {}", e))),
};
// Extract plaintext from response - fix ownership issue
if let Some(response_data) = &response.response_data {
if let Some(Value::String(plaintext_b64)) = response_data.get("plaintext") {
return base64::decode(plaintext_b64).map_err(Error::ErrBase64DecodeError);
}
}
Err(Error::ErrKMS("No plaintext in response".to_string()))
}
}
/// SSEKMSEncryption provides SSE-KMS encryption capabilities
pub struct SSEKMSEncryption {
kms_client: Arc<KMSClient>,
}
impl SSEKMSEncryption {
/// Create a new SSEKMSEncryption instance with the default KMS client
pub fn new() -> Result<Self, Error> {
let kms_client = KMSClient::get_global_client()?;
Ok(Self {
kms_client,
})
}
/// Create a new SSEKMSEncryption instance with a specific KMS client
pub fn with_client(kms_client: Arc<KMSClient>) -> Self {
Self {
kms_client,
}
}
/// Generate a random initialization vector
fn generate_iv() -> Vec<u8> {
let mut iv = vec![0u8; 12]; // AES-GCM typically uses 12 bytes IV
rand::thread_rng().fill_bytes(&mut iv);
iv
}
/// Generate a random data key for encrypting the object
fn generate_data_key() -> Vec<u8> {
let mut key = vec![0u8; 32]; // 256 bits for AES-256
rand::thread_rng().fill_bytes(&mut key);
key
}
}
#[cfg(feature = "kms")]
impl Encryptable for SSEKMSEncryption {
/// Encrypt data using KMS-managed keys (SSE-KMS)
fn encrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error> {
// In this synchronous function, we'll need to run the async code in a blocking manner
let rt = tokio::runtime::Runtime::new().map_err(|e| {
Error::ErrKMS(format!("Failed to create tokio runtime: {}", e))
})?;
rt.block_on(async {
// Get KMS key ID from options or use default
let kms_key_id = options.kms_key_id.as_deref()
.unwrap_or(&self.kms_client.key_name);
let kms_context = options.kms_context.as_deref();
// Generate a random data key for this object
let data_key = Self::generate_data_key();
// Generate a random IV for the data encryption
let iv = Self::generate_iv();
// Create the cipher for encrypting the data
let key = Key::<Aes256Gcm>::from_slice(&data_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&iv);
// Encrypt the data
let ciphertext = cipher.encrypt(nonce, data)
.map_err(|e| Error::ErrEncryptFailed(e))?;
// Encrypt the data key with KMS
let encrypted_key = self.kms_client.encrypt(&data_key, kms_context).await?;
// Create encryption metadata to store with the encrypted data
let info = EncryptionInfo::new_sse_kms(
iv,
encrypted_key,
kms_key_id.to_string(),
options.kms_context.clone()
);
// Format: [metadata length (4 bytes)][metadata JSON][ciphertext]
let metadata_json = serde_json::to_vec(&info)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
let metadata_len = metadata_json.len() as u32;
let mut result = Vec::with_capacity(4 + metadata_len as usize + ciphertext.len());
// Add metadata length as big-endian u32
result.extend_from_slice(&metadata_len.to_be_bytes());
// Add metadata and ciphertext
result.extend_from_slice(&metadata_json);
result.extend_from_slice(&ciphertext);
Ok(result)
})
}
/// Decrypt data using KMS-managed keys (SSE-KMS)
fn decrypt(&self, data: &[u8], options: &SSEOptions) -> Result<Vec<u8>, Error> {
// In this synchronous function, we'll need to run the async code in a blocking manner
let rt = tokio::runtime::Runtime::new().map_err(|e| {
Error::ErrKMS(format!("Failed to create tokio runtime: {}", e))
})?;
rt.block_on(async {
// Ensure data is long enough to contain metadata length (4 bytes)
if data.len() < 4 {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract the metadata length
let mut metadata_len_bytes = [0u8; 4];
metadata_len_bytes.copy_from_slice(&data[0..4]);
let metadata_len = u32::from_be_bytes(metadata_len_bytes) as usize;
// Ensure data is long enough to contain metadata
if data.len() < 4 + metadata_len {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract and parse metadata
let metadata_json = &data[4..4 + metadata_len];
let info: EncryptionInfo = serde_json::from_slice(metadata_json)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
// Verify this is SSE-KMS encrypted data
if !info.is_sse_kms() {
return Err(Error::ErrInvalidSSEAlgorithm);
}
// Extract the encrypted key and ciphertext
let encrypted_key = info.key.ok_or(Error::ErrEncryptedObjectKeyMissing)?;
let ciphertext = &data[4 + metadata_len..];
// Get KMS context if available
let kms_context = info.context.as_deref();
// Decrypt the data key using KMS
let data_key = self.kms_client.decrypt(&encrypted_key, kms_context).await?;
// Create the cipher for decrypting the data
let key = Key::<Aes256Gcm>::from_slice(&data_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&info.iv);
// Decrypt the data
let plaintext = cipher.decrypt(nonce, ciphertext)
.map_err(|e| Error::ErrDecryptFailed(e))?;
Ok(plaintext)
})
}
}
// Non-async version for when KMS feature is disabled
#[cfg(not(feature = "kms"))]
impl Encryptable for SSEKMSEncryption {
fn encrypt(&self, _data: &[u8], _options: &SSEOptions) -> Result<Vec<u8>, Error> {
Err(Error::ErrMissingKMSConfig)
}
fn decrypt(&self, _data: &[u8], _options: &SSEOptions) -> Result<Vec<u8>, Error> {
Err(Error::ErrMissingKMSConfig)
}
}

286
crypto/src/sse_s3.rs Normal file
View File

@@ -0,0 +1,286 @@
// sse_s3.rs - Implementation of SSE-S3 (Server-Side Encryption with Server-Managed Keys)
// This file implements encryption/decryption using server-managed keys
use crate::{
Error,
metadata::EncryptionInfo,
sse::{SSEOptions, Encryptable}
};
use aes_gcm::{
aead::{Aead, AeadCore, KeyInit},
Aes256Gcm, Key, Nonce
};
use rand::RngCore;
use std::sync::{Arc, Mutex, MutexGuard, OnceLock, Once};
use lazy_static::lazy_static;
// Master key for SSE-S3 (this key encrypts the per-object keys)
static MASTER_KEY: OnceLock<Arc<Vec<u8>>> = OnceLock::new();
static INIT_MASTER_KEY: Once = Once::new();
/// Initialize the SSE-S3 master key with provided key
pub fn init_master_key(key: Vec<u8>) {
INIT_MASTER_KEY.call_once(|| {
if key.len() == 32 {
let _ = MASTER_KEY.set(Arc::new(key));
}
});
}
/// Generate a random master key if none is set
fn ensure_master_key() -> Arc<Vec<u8>> {
INIT_MASTER_KEY.call_once(|| {
let mut key = vec![0u8; 32];
rand::thread_rng().fill_bytes(&mut key);
let _ = MASTER_KEY.set(Arc::new(key));
});
MASTER_KEY.get().unwrap().clone()
}
/// SSES3Encryption provides SSE-S3 encryption capabilities
#[derive(Default)]
pub struct SSES3Encryption {
keys_cache: Arc<Mutex<Vec<Vec<u8>>>>,
}
impl SSES3Encryption {
/// Create a new SSES3Encryption instance
pub fn new() -> Self {
Self {
keys_cache: Arc::new(Mutex::new(Vec::new())),
}
}
/// Generate a random initialization vector
fn generate_iv() -> Vec<u8> {
let mut iv = vec![0u8; 12]; // AES-GCM typically uses 12 bytes IV
rand::thread_rng().fill_bytes(&mut iv);
iv
}
/// Generate a random data key for encrypting the object
fn generate_data_key() -> Vec<u8> {
let mut key = vec![0u8; 32]; // 256 bits for AES-256
rand::thread_rng().fill_bytes(&mut key);
key
}
/// Encrypt a data key using the master key
fn encrypt_data_key(&self, data_key: &[u8]) -> Result<Vec<u8>, Error> {
let master_key = ensure_master_key();
// Generate a random IV for the key encryption
let iv = Self::generate_iv();
// Create the cipher for encrypting the data key
let key = Key::<Aes256Gcm>::from_slice(&master_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&iv);
// Encrypt the data key
let mut encrypted_key = cipher.encrypt(nonce, data_key)
.map_err(|e| Error::ErrEncryptFailed(e))?;
// Format: [iv_length (1 byte)][iv][encrypted_key]
let mut result = Vec::with_capacity(1 + iv.len() + encrypted_key.len());
result.push(iv.len() as u8);
result.extend_from_slice(&iv);
result.append(&mut encrypted_key);
Ok(result)
}
/// Decrypt an encrypted data key using the master key
fn decrypt_data_key(&self, encrypted_key_data: &[u8]) -> Result<Vec<u8>, Error> {
let master_key = ensure_master_key();
// Ensure the data is long enough to contain the IV length
if encrypted_key_data.is_empty() {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract IV length and IV
let iv_len = encrypted_key_data[0] as usize;
if encrypted_key_data.len() < 1 + iv_len {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
let iv = &encrypted_key_data[1..1+iv_len];
let encrypted_key = &encrypted_key_data[1+iv_len..];
// Create the cipher for decrypting the data key
let key = Key::<Aes256Gcm>::from_slice(&master_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(iv);
// Decrypt the data key
let data_key = cipher.decrypt(nonce, encrypted_key)
.map_err(|e| Error::ErrDecryptFailed(e))?;
Ok(data_key)
}
}
impl Encryptable for SSES3Encryption {
/// Encrypt data using server-managed keys (SSE-S3)
fn encrypt(&self, data: &[u8], _options: &SSEOptions) -> Result<Vec<u8>, Error> {
// Generate a random data key for this object
let data_key = Self::generate_data_key();
// Generate a random IV for the data encryption
let iv = Self::generate_iv();
// Create the cipher for encrypting the data
let key = Key::<Aes256Gcm>::from_slice(&data_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&iv);
// Encrypt the data
let ciphertext = cipher.encrypt(nonce, data)
.map_err(|e| Error::ErrEncryptFailed(e))?;
// Encrypt the data key with the master key
let encrypted_key = self.encrypt_data_key(&data_key)?;
// Create encryption metadata to store with the encrypted data
let info = EncryptionInfo::new_sse_s3(iv, encrypted_key);
// Format: [metadata length (4 bytes)][metadata JSON][ciphertext]
let metadata_json = serde_json::to_vec(&info)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
let metadata_len = metadata_json.len() as u32;
let mut result = Vec::with_capacity(4 + metadata_len as usize + ciphertext.len());
// Add metadata length as big-endian u32
result.extend_from_slice(&metadata_len.to_be_bytes());
// Add metadata and ciphertext
result.extend_from_slice(&metadata_json);
result.extend_from_slice(&ciphertext);
Ok(result)
}
/// Decrypt data using server-managed keys (SSE-S3)
fn decrypt(&self, data: &[u8], _options: &SSEOptions) -> Result<Vec<u8>, Error> {
// Ensure data is long enough to contain metadata length (4 bytes)
if data.len() < 4 {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract the metadata length
let mut metadata_len_bytes = [0u8; 4];
metadata_len_bytes.copy_from_slice(&data[0..4]);
let metadata_len = u32::from_be_bytes(metadata_len_bytes) as usize;
// Ensure data is long enough to contain metadata
if data.len() < 4 + metadata_len {
return Err(Error::ErrInvalidEncryptedDataFormat);
}
// Extract and parse metadata
let metadata_json = &data[4..4 + metadata_len];
let info: EncryptionInfo = serde_json::from_slice(metadata_json)
.map_err(|_| Error::ErrInvalidEncryptionMetadata)?;
// Verify this is SSE-S3 encrypted data
if !info.is_sse_s3() {
return Err(Error::ErrInvalidSSEAlgorithm);
}
// Extract the encrypted key and ciphertext
let encrypted_key = info.key.ok_or(Error::ErrEncryptedObjectKeyMissing)?;
let ciphertext = &data[4 + metadata_len..];
// Decrypt the data key
let data_key = self.decrypt_data_key(&encrypted_key)?;
// Create the cipher for decrypting the data
let key = Key::<Aes256Gcm>::from_slice(&data_key);
let cipher = Aes256Gcm::new(key);
let nonce = Nonce::from_slice(&info.iv);
// Decrypt the data
let plaintext = cipher.decrypt(nonce, ciphertext)
.map_err(|e| Error::ErrDecryptFailed(e))?;
Ok(plaintext)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sse_s3_encrypt_decrypt() {
// Ensure a deterministic master key for testing
let test_master_key = vec![1u8; 32];
init_master_key(test_master_key.clone());
// Create test data
let data = b"This is some test data to encrypt with SSE-S3";
// Create encryption options (mostly unused for SSE-S3)
let options = SSEOptions::default();
// Encrypt
let sse_s3 = SSES3Encryption::new();
let encrypted = sse_s3.encrypt(data, &options).unwrap();
// Decrypt
let decrypted = sse_s3.decrypt(&encrypted, &options).unwrap();
// Verify
assert_eq!(decrypted, data);
}
#[test]
fn test_sse_s3_data_key_encryption() {
// Ensure a deterministic master key for testing
let test_master_key = vec![1u8; 32];
init_master_key(test_master_key.clone());
// Create a data key
let data_key = vec![5u8; 32];
// Encrypt the data key
let sse_s3 = SSES3Encryption::new();
let encrypted_key = sse_s3.encrypt_data_key(&data_key).unwrap();
// Decrypt the data key
let decrypted_key = sse_s3.decrypt_data_key(&encrypted_key).unwrap();
// Verify
assert_eq!(decrypted_key, data_key);
}
#[test]
fn test_sse_s3_master_key_auto_generation() {
// Reset the once cell for testing (this is a hack that works only for tests)
unsafe {
// This is unsafe but necessary for testing the auto-generation of master keys
let once = &INIT_MASTER_KEY as *const Once as *mut Once;
std::ptr::write(once, Once::new());
MASTER_KEY = OnceLock::new();
}
// Create test data
let data = b"This is some test data to encrypt with auto-generated master key";
// Create encryption options
let options = SSEOptions::default();
// Encrypt (this should auto-generate a master key)
let sse_s3 = SSES3Encryption::new();
let encrypted = sse_s3.encrypt(data, &options).unwrap();
// Decrypt
let decrypted = sse_s3.decrypt(&encrypted, &options).unwrap();
// Verify
assert_eq!(decrypted, data);
}
}

180
crypto/src/tests/mod.rs Normal file
View File

@@ -0,0 +1,180 @@
// SSE encryption integration tests
use crate::{
SSE, Algorithm, SSEOptions, Encryptable,
SSECEncryption, SSES3Encryption, CryptoFactory,
EncryptionInfo, extract_encryption_metadata, remove_encryption_metadata
};
use rand::RngCore;
use std::collections::HashMap;
#[cfg(test)]
mod tests {
use super::*;
// Helper function to generate a random customer key
fn generate_customer_key() -> Vec<u8> {
let mut key = vec![0u8; 32];
rand::thread_rng().fill_bytes(&mut key);
key
}
#[test]
fn test_sse_c_workflow() {
// Generate a customer key
let customer_key = generate_customer_key();
// Create test data
let data = b"This is test data for SSE-C encryption";
// Create SSE-C options
let options = SSEOptions {
sse_type: Some(SSE::SSEC),
algorithm: Some(Algorithm::AES256),
customer_key: Some(customer_key.clone()),
..Default::default()
};
// Encrypt with SSE-C
let ssec = SSECEncryption::new();
let encrypted = ssec.encrypt(data, &options).unwrap();
// Decrypt with SSE-C
let decrypted = ssec.decrypt(&encrypted, &options).unwrap();
// Verify
assert_eq!(decrypted, data);
}
#[test]
fn test_sse_s3_workflow() {
// Set a deterministic master key for testing
let master_key = vec![1u8; 32];
crate::init_master_key(master_key);
// Create test data
let data = b"This is test data for SSE-S3 encryption";
// Create SSE-S3 options
let options = SSEOptions {
sse_type: Some(SSE::SSES3),
algorithm: Some(Algorithm::AES256),
..Default::default()
};
// Encrypt with SSE-S3
let sses3 = SSES3Encryption::new();
let encrypted = sses3.encrypt(data, &options).unwrap();
// Decrypt with SSE-S3
let decrypted = sses3.decrypt(&encrypted, &options).unwrap();
// Verify
assert_eq!(decrypted, data);
}
#[test]
fn test_crypto_factory() {
// Generate a customer key
let customer_key = generate_customer_key();
// Set a deterministic master key for testing
let master_key = vec![1u8; 32];
crate::init_master_key(master_key);
// Test data
let data = b"This is test data for CryptoFactory";
// Test with SSE-C
let ssec_options = SSEOptions {
sse_type: Some(SSE::SSEC),
algorithm: Some(Algorithm::AES256),
customer_key: Some(customer_key.clone()),
..Default::default()
};
#[cfg(not(feature = "kms"))]
{
let encryptor = CryptoFactory::create_encryptor(Some(SSE::SSEC));
let encrypted = encryptor.encrypt(data, &ssec_options).unwrap();
let decrypted = encryptor.decrypt(&encrypted, &ssec_options).unwrap();
assert_eq!(decrypted, data);
}
#[cfg(feature = "kms")]
{
let encryptor = CryptoFactory::create_encryptor(Some(SSE::SSEC)).unwrap();
let encrypted = encryptor.encrypt(data, &ssec_options).unwrap();
let decrypted = encryptor.decrypt(&encrypted, &ssec_options).unwrap();
assert_eq!(decrypted, data);
}
// Test with SSE-S3
let sses3_options = SSEOptions {
sse_type: Some(SSE::SSES3),
algorithm: Some(Algorithm::AES256),
..Default::default()
};
#[cfg(not(feature = "kms"))]
{
let encryptor = CryptoFactory::create_encryptor(Some(SSE::SSES3));
let encrypted = encryptor.encrypt(data, &sses3_options).unwrap();
let decrypted = encryptor.decrypt(&encrypted, &sses3_options).unwrap();
assert_eq!(decrypted, data);
}
#[cfg(feature = "kms")]
{
let encryptor = CryptoFactory::create_encryptor(Some(SSE::SSES3)).unwrap();
let encrypted = encryptor.encrypt(data, &sses3_options).unwrap();
let decrypted = encryptor.decrypt(&encrypted, &sses3_options).unwrap();
assert_eq!(decrypted, data);
}
}
#[test]
fn test_metadata_handling() {
// Create encryption info for testing
let iv = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let key = vec![11, 22, 33, 44, 55];
let info = EncryptionInfo::new_sse_s3(iv.clone(), key.clone());
// Convert to metadata
let metadata = info.to_metadata();
// Extract encryption metadata
let extracted = extract_encryption_metadata(&metadata);
// Check that extracted contains only encryption metadata
assert_eq!(extracted.len(), metadata.len());
// Create mixed metadata
let mut mixed_metadata = metadata.clone();
mixed_metadata.insert("X-Normal-Meta".to_string(), "normal-value".to_string());
// Extract encryption metadata
let extracted_from_mixed = extract_encryption_metadata(&mixed_metadata);
// Check that extracted contains only encryption metadata
assert_eq!(extracted_from_mixed.len(), metadata.len());
assert!(!extracted_from_mixed.contains_key("X-Normal-Meta"));
// Remove encryption metadata
let mut to_clean = mixed_metadata.clone();
remove_encryption_metadata(&mut to_clean);
// Check that only non-encryption metadata remains
assert_eq!(to_clean.len(), 1);
assert!(to_clean.contains_key("X-Normal-Meta"));
// Reconstruct encryption info from metadata
let reconstructed = EncryptionInfo::from_metadata(&metadata).unwrap().unwrap();
// Verify reconstructed info
assert_eq!(reconstructed.sse_type, SSE::SSES3);
assert_eq!(reconstructed.algorithm, Algorithm::AES256);
assert_eq!(reconstructed.iv, iv);
assert_eq!(reconstructed.key.unwrap(), key);
}
}

View File

@@ -8,6 +8,8 @@ services:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
# 添加KMS配置从vault-config卷加载
- RUSTFS_KMS_ENABLED=true
platform: linux/amd64
ports:
- "9000:9000" # 映射宿主机的 9001 端口到容器的 9000 端口
@@ -15,7 +17,11 @@ services:
volumes:
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node0:/data # 将当前路径挂载到容器内的 /root/data
command: "/app/rustfs"
- vault-config:/etc/rustfs/kms:ro # 读取KMS配置
command: "/bin/sh -c 'if [ -f /etc/rustfs/kms/rustfs-kms.env ]; then export $(cat /etc/rustfs/kms/rustfs-kms.env | xargs); fi && /app/rustfs'"
depends_on:
rustyvault:
condition: service_healthy
node1:
image: rustfs:v1
@@ -26,13 +32,19 @@ services:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
# 添加KMS配置从vault-config卷加载
- RUSTFS_KMS_ENABLED=true
platform: linux/amd64
ports:
- "9001:9000" # 映射宿主机的 9002 端口到容器的 9000 端口
volumes:
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node1:/data
command: "/app/rustfs"
- vault-config:/etc/rustfs/kms:ro # 读取KMS配置
command: "/bin/sh -c 'if [ -f /etc/rustfs/kms/rustfs-kms.env ]; then export $(cat /etc/rustfs/kms/rustfs-kms.env | xargs); fi && /app/rustfs'"
depends_on:
rustyvault:
condition: service_healthy
node2:
image: rustfs:v1
@@ -43,13 +55,19 @@ services:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
# 添加KMS配置从vault-config卷加载
- RUSTFS_KMS_ENABLED=true
platform: linux/amd64
ports:
- "9002:9000" # 映射宿主机的 9003 端口到容器的 9000 端口
volumes:
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node2:/data
command: "/app/rustfs"
- vault-config:/etc/rustfs/kms:ro # 读取KMS配置
command: "/bin/sh -c 'if [ -f /etc/rustfs/kms/rustfs-kms.env ]; then export $(cat /etc/rustfs/kms/rustfs-kms.env | xargs); fi && /app/rustfs'"
depends_on:
rustyvault:
condition: service_healthy
node3:
image: rustfs:v1
@@ -60,10 +78,21 @@ services:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9002
# 添加KMS配置从vault-config卷加载
- RUSTFS_KMS_ENABLED=true
platform: linux/amd64
ports:
- "9003:9000" # 映射宿主机的 9004 端口到容器的 9000 端口
volumes:
- ./target/x86_64-unknown-linux-musl/release/rustfs:/app/rustfs
# - ./data/node3:/data
command: "/app/rustfs"
- vault-config:/etc/rustfs/kms:ro # 读取KMS配置
command: "/bin/sh -c 'if [ -f /etc/rustfs/kms/rustfs-kms.env ]; then export $(cat /etc/rustfs/kms/rustfs-kms.env | xargs); fi && /app/rustfs'"
depends_on:
rustyvault:
condition: service_healthy
# 定义用于存储Vault数据和配置的Docker卷
volumes:
vault-data:
vault-config:

382
ecstore/src/encrypt.rs Normal file
View File

@@ -0,0 +1,382 @@
// encrypt.rs - 对象存储加密处理层
// 处理与对象存储加密相关的操作
use crate::store_api::{ObjectInfo, ObjectOptions};
use common::error::{Error, Result};
use http::HeaderMap;
use ring::aead::{Aad, BoundKey, Nonce, NonceSequence, OpeningKey, SealingKey, CHACHA20_POLY1305};
use ring::digest::{digest, SHA256};
use ring::rand::{SecureRandom, SystemRandom};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info};
// 加密算法常量
pub const ENC_AES_256_GCM: &str = "AES256";
pub const ENC_CHACHA20_POLY1305: &str = "ChaCha20-Poly1305";
// 加密请求头常量
pub const HEADER_SSE_C_ALGORITHM: &str = "x-amz-server-side-encryption-customer-algorithm";
pub const HEADER_SSE_C_KEY: &str = "x-amz-server-side-encryption-customer-key";
pub const HEADER_SSE_C_KEY_MD5: &str = "x-amz-server-side-encryption-customer-key-MD5";
pub const HEADER_SSE_S3_ALGORITHM: &str = "x-amz-server-side-encryption";
pub const HEADER_SSE_S3_KEY_ID: &str = "x-amz-server-side-encryption-aws-kms-key-id";
pub const HEADER_SSE_KMS_KEY_ID: &str = "x-amz-server-side-encryption-kms-key-id";
// 加密元数据常量
pub const META_CRYPTO_IV: &str = "X-Amz-Meta-Crypto-Iv";
pub const META_CRYPTO_KEY: &str = "X-Amz-Meta-Crypto-Key";
pub const META_CRYPTO_KEY_ID: &str = "X-Amz-Meta-Crypto-Key-Id";
pub const META_CRYPTO_ALGORITHM: &str = "X-Amz-Meta-Crypto-Algorithm";
pub const META_CRYPTO_CONTEXT: &str = "X-Amz-Meta-Crypto-Context";
pub const META_UNENCRYPTED_CONTENT_LENGTH: &str = "X-Amz-Meta-Unencrypted-Content-Length";
pub const META_OBJECT_ENCRYPTION: &str = "X-Amz-Meta-Object-Encryption";
// 加密模式
#[derive(Debug, Clone, PartialEq)]
pub enum EncryptionMode {
SSEC, // SSE-C: 客户提供的密钥
SSES3, // SSE-S3: 服务端托管密钥
SSEKMS, // SSE-KMS: KMS托管密钥
}
/// 对象加密配置选项
#[derive(Debug, Clone)]
pub struct EncryptionOptions {
pub mode: EncryptionMode,
pub algorithm: String,
pub customer_key: Option<Vec<u8>>,
pub kms_key_id: Option<String>,
}
/// NonceSequence实现用于AEAD加密
struct NonceGen {
nonce: [u8; 12],
}
impl NonceGen {
fn new(iv: &[u8]) -> Self {
let mut nonce = [0u8; 12];
nonce.copy_from_slice(&iv[..12]);
Self { nonce }
}
}
impl NonceSequence for NonceGen {
fn advance(&mut self) -> Result<Nonce, ring::error::Unspecified> {
Ok(Nonce::assume_unique_for_key(self.nonce))
}
}
/// 对象加密实现
pub struct ObjectEncryption;
impl ObjectEncryption {
/// 从HTTP请求头中检测加密模式
pub fn detect_encryption_mode(headers: &HeaderMap) -> Result<Option<EncryptionOptions>> {
// 检测SSE-C
if headers.contains_key(HEADER_SSE_C_ALGORITHM) {
let algorithm = headers
.get(HEADER_SSE_C_ALGORITHM)
.ok_or_else(|| Error::msg("缺少SSE-C算法"))?
.to_str()
.map_err(|_| Error::msg("无效的SSE-C算法"))?;
let key = headers
.get(HEADER_SSE_C_KEY)
.ok_or_else(|| Error::msg("缺少SSE-C密钥"))?
.to_str()
.map_err(|_| Error::msg("无效的SSE-C密钥"))?;
// 将Base64编码的密钥转换为字节
let key_bytes = base64::decode(key).map_err(|_| Error::msg("无效的SSE-C密钥格式"))?;
// 验证MD5校验和
if headers.contains_key(HEADER_SSE_C_KEY_MD5) {
let key_md5 = headers
.get(HEADER_SSE_C_KEY_MD5)
.unwrap()
.to_str()
.map_err(|_| Error::msg("无效的SSE-C密钥MD5"))?;
let computed_md5 = Self::compute_md5(&key_bytes);
if key_md5 != computed_md5 {
return Err(Error::msg("SSE-C密钥MD5校验失败"));
}
}
return Ok(Some(EncryptionOptions {
mode: EncryptionMode::SSEC,
algorithm: algorithm.to_string(),
customer_key: Some(key_bytes),
kms_key_id: None,
}));
}
// 检测SSE-S3
if headers.contains_key(HEADER_SSE_S3_ALGORITHM) {
let algorithm = headers
.get(HEADER_SSE_S3_ALGORITHM)
.unwrap()
.to_str()
.map_err(|_| Error::msg("无效的SSE-S3算法"))?;
if algorithm != "AES256" {
return Err(Error::msg("不支持的SSE-S3加密算法"));
}
return Ok(Some(EncryptionOptions {
mode: EncryptionMode::SSES3,
algorithm: algorithm.to_string(),
customer_key: None,
kms_key_id: None,
}));
}
// 检测SSE-KMS
if headers.contains_key(HEADER_SSE_KMS_KEY_ID) {
let key_id = headers
.get(HEADER_SSE_KMS_KEY_ID)
.unwrap()
.to_str()
.map_err(|_| Error::msg("无效的KMS密钥ID"))?
.to_string();
return Ok(Some(EncryptionOptions {
mode: EncryptionMode::SSEKMS,
algorithm: ENC_AES_256_GCM.to_string(),
customer_key: None,
kms_key_id: Some(key_id),
}));
}
// 没有启用加密
Ok(None)
}
/// 计算MD5摘要并返回Base64编码的字符串
fn compute_md5(data: &[u8]) -> String {
let digest = md5::compute(data);
base64::encode(digest.as_ref())
}
/// 检查对象是否需要解密
pub fn needs_decryption(info: &ObjectInfo) -> bool {
if let Some(user_defined) = &info.user_defined {
user_defined.contains_key(META_CRYPTO_ALGORITHM) ||
user_defined.contains_key(META_OBJECT_ENCRYPTION)
} else {
false
}
}
/// 加密对象数据
pub async fn encrypt_object_data(
data: &[u8],
options: &EncryptionOptions,
) -> Result<(Vec<u8>, HashMap<String, String>)> {
let mut metadata = HashMap::new();
match options.mode {
EncryptionMode::SSEC => {
// 获取客户提供的密钥
let customer_key = options.customer_key
.as_ref()
.ok_or_else(|| Error::msg("缺少SSE-C客户密钥"))?;
// 生成随机IV
let rng = SystemRandom::new();
let mut iv = [0u8; 12]; // ChaCha20-Poly1305使用12字节的Nonce
rng.fill(&mut iv).map_err(|_| Error::msg("无法生成随机IV"))?;
// 使用ChaCha20-Poly1305加密
let sealing_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, customer_key)
.map_err(|_| Error::msg("无法创建加密密钥"))?;
let mut sealing_key = SealingKey::new(sealing_key, NonceGen::new(&iv));
// 准备输出缓冲区
let mut in_out = data.to_vec();
let tag_len = ring::aead::CHACHA20_POLY1305.tag_len();
in_out.extend(vec![0u8; tag_len]);
// 加密
let aad = Aad::empty(); // 可以根据需要添加额外的认证数据
sealing_key.seal_in_place_append_tag(aad, &mut in_out)
.map_err(|_| Error::msg("数据加密失败"))?;
// 设置加密元数据
metadata.insert(META_CRYPTO_IV.to_string(), base64::encode(&iv));
metadata.insert(META_CRYPTO_ALGORITHM.to_string(), ENC_CHACHA20_POLY1305.to_string());
metadata.insert(META_UNENCRYPTED_CONTENT_LENGTH.to_string(), data.len().to_string());
metadata.insert(META_OBJECT_ENCRYPTION.to_string(), "SSE-C".to_string());
Ok((in_out, metadata))
},
EncryptionMode::SSES3 => {
// 生成随机数据密钥
let rng = SystemRandom::new();
let mut data_key = [0u8; 32];
rng.fill(&mut data_key).map_err(|_| Error::msg("无法生成随机数据密钥"))?;
// 生成随机IV
let mut iv = [0u8; 12];
rng.fill(&mut iv).map_err(|_| Error::msg("无法生成随机IV"))?;
// 使用数据密钥和ChaCha20-Poly1305加密数据
let sealing_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, &data_key)
.map_err(|_| Error::msg("无法创建加密密钥"))?;
let mut sealing_key = SealingKey::new(sealing_key, NonceGen::new(&iv));
// 准备输出缓冲区
let mut in_out = data.to_vec();
let tag_len = ring::aead::CHACHA20_POLY1305.tag_len();
in_out.extend(vec![0u8; tag_len]);
// 加密
let aad = Aad::empty();
sealing_key.seal_in_place_append_tag(aad, &mut in_out)
.map_err(|_| Error::msg("数据加密失败"))?;
// 使用SSE-S3主密钥加密数据密钥
// 在实际实现中,这里应该调用密钥管理服务
// 这里简化处理,直接将数据密钥放入元数据
metadata.insert(META_CRYPTO_IV.to_string(), base64::encode(&iv));
metadata.insert(META_CRYPTO_KEY.to_string(), base64::encode(&data_key));
metadata.insert(META_CRYPTO_ALGORITHM.to_string(), ENC_AES_256_GCM.to_string());
metadata.insert(META_UNENCRYPTED_CONTENT_LENGTH.to_string(), data.len().to_string());
metadata.insert(META_OBJECT_ENCRYPTION.to_string(), "SSE-S3".to_string());
Ok((in_out, metadata))
},
EncryptionMode::SSEKMS => {
// 获取KMS密钥ID
let key_id = options.kms_key_id
.as_ref()
.ok_or_else(|| Error::msg("缺少SSE-KMS密钥ID"))?;
// 生成随机数据密钥
let rng = SystemRandom::new();
let mut data_key = [0u8; 32];
rng.fill(&mut data_key).map_err(|_| Error::msg("无法生成随机数据密钥"))?;
// 生成随机IV
let mut iv = [0u8; 12];
rng.fill(&mut iv).map_err(|_| Error::msg("无法生成随机IV"))?;
// 使用数据密钥和ChaCha20-Poly1305加密数据
let sealing_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, &data_key)
.map_err(|_| Error::msg("无法创建加密密钥"))?;
let mut sealing_key = SealingKey::new(sealing_key, NonceGen::new(&iv));
// 准备输出缓冲区
let mut in_out = data.to_vec();
let tag_len = ring::aead::CHACHA20_POLY1305.tag_len();
in_out.extend(vec![0u8; tag_len]);
// 加密
let aad = Aad::empty();
sealing_key.seal_in_place_append_tag(aad, &mut in_out)
.map_err(|_| Error::msg("数据加密失败"))?;
// 在实际实现中这里应该调用KMS加密数据密钥
// 这里简化处理,直接将数据密钥放入元数据
metadata.insert(META_CRYPTO_IV.to_string(), base64::encode(&iv));
metadata.insert(META_CRYPTO_KEY.to_string(), base64::encode(&data_key));
metadata.insert(META_CRYPTO_KEY_ID.to_string(), key_id.clone());
metadata.insert(META_CRYPTO_ALGORITHM.to_string(), ENC_AES_256_GCM.to_string());
metadata.insert(META_UNENCRYPTED_CONTENT_LENGTH.to_string(), data.len().to_string());
metadata.insert(META_OBJECT_ENCRYPTION.to_string(), "SSE-KMS".to_string());
Ok((in_out, metadata))
},
}
}
/// 解密对象数据
pub async fn decrypt_object_data(
encrypted_data: &[u8],
metadata: &HashMap<String, String>,
customer_key: Option<Vec<u8>>,
) -> Result<Vec<u8>> {
// 确定加密模式
let encryption_mode = metadata.get(META_OBJECT_ENCRYPTION)
.ok_or_else(|| Error::msg("缺少加密模式元数据"))?;
// 获取IV
let iv_base64 = metadata.get(META_CRYPTO_IV)
.ok_or_else(|| Error::msg("缺少IV元数据"))?;
let iv = base64::decode(iv_base64).map_err(|_| Error::msg("无效的IV格式"))?;
match encryption_mode.as_str() {
"SSE-C" => {
// 获取客户提供的密钥
let key = customer_key
.ok_or_else(|| Error::msg("缺少SSE-C客户密钥"))?;
// 使用ChaCha20-Poly1305解密
let opening_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, &key)
.map_err(|_| Error::msg("无法创建解密密钥"))?;
let mut opening_key = OpeningKey::new(opening_key, NonceGen::new(&iv));
// 解密
let mut in_out = encrypted_data.to_vec();
let aad = Aad::empty();
let decrypted_data = opening_key.open_in_place(aad, &mut in_out)
.map_err(|_| Error::msg("数据解密失败"))?;
Ok(decrypted_data.to_vec())
},
"SSE-S3" => {
// 获取加密的数据密钥
let key_base64 = metadata.get(META_CRYPTO_KEY)
.ok_or_else(|| Error::msg("缺少数据密钥元数据"))?;
let data_key = base64::decode(key_base64).map_err(|_| Error::msg("无效的数据密钥格式"))?;
// 使用数据密钥和ChaCha20-Poly1305解密
let opening_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, &data_key)
.map_err(|_| Error::msg("无法创建解密密钥"))?;
let mut opening_key = OpeningKey::new(opening_key, NonceGen::new(&iv));
// 解密
let mut in_out = encrypted_data.to_vec();
let aad = Aad::empty();
let decrypted_data = opening_key.open_in_place(aad, &mut in_out)
.map_err(|_| Error::msg("数据解密失败"))?;
Ok(decrypted_data.to_vec())
},
"SSE-KMS" => {
// 获取加密的数据密钥
let key_base64 = metadata.get(META_CRYPTO_KEY)
.ok_or_else(|| Error::msg("缺少数据密钥元数据"))?;
let data_key = base64::decode(key_base64).map_err(|_| Error::msg("无效的数据密钥格式"))?;
// 使用数据密钥和ChaCha20-Poly1305解密
let opening_key = ring::aead::UnboundKey::new(&CHACHA20_POLY1305, &data_key)
.map_err(|_| Error::msg("无法创建解密密钥"))?;
let mut opening_key = OpeningKey::new(opening_key, NonceGen::new(&iv));
// 解密
let mut in_out = encrypted_data.to_vec();
let aad = Aad::empty();
let decrypted_data = opening_key.open_in_place(aad, &mut in_out)
.map_err(|_| Error::msg("数据解密失败"))?;
Ok(decrypted_data.to_vec())
},
_ => Err(Error::msg(format!("不支持的加密模式: {}", encryption_mode))),
}
}
/// 将加密元数据添加到对象选项中
pub fn add_encryption_metadata(opts: &mut ObjectOptions, metadata: HashMap<String, String>) {
if opts.user_defined.is_none() {
opts.user_defined = Some(metadata);
} else if let Some(user_defined) = &mut opts.user_defined {
user_defined.extend(metadata);
}
}
}

View File

@@ -33,6 +33,8 @@ mod store_utils;
pub mod utils;
pub mod xhttp;
pub mod encrypt;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;
pub use global::update_erasure_type;

View File

@@ -1191,45 +1191,111 @@ impl ObjectIO for ECStore {
check_get_obj_args(bucket, object)?;
let object = utils::path::encode_dir_object(object);
// Check if decryption is needed
let customer_key = if let Some(sse_opts) = crate::encrypt::ObjectEncryption::detect_encryption_mode(&h)? {
sse_opts.customer_key
} else {
None
};
let mut reader: GetObjectReader;
if self.single_pool() {
return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
reader = self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await?;
} else {
// TODO: nslock
let mut opts = opts.clone();
opts.no_lock = true;
// TODO: check if DeleteMarker
let (_oi, idx) = self.get_latest_object_info_with_idx(bucket, &object, &opts).await?;
reader = self.pools[idx]
.get_object_reader(bucket, object.as_str(), range, h, &opts)
.await?;
}
// TODO: nslock
// Check if the object needs to be decrypted
if reader.is_encrypted || crate::encrypt::ObjectEncryption::needs_decryption(&reader.info) {
// Get object metadata
let metadata = reader.info.user_defined.as_ref()
.ok_or_else(|| Error::msg("Missing encryption metadata"))?;
// Handle decryption logic
let encrypted_data = reader.get_object_data().await?;
// Decrypt data
let decrypted_data = crate::encrypt::ObjectEncryption::decrypt_object_data(
&encrypted_data,
metadata,
customer_key
).await?;
// update the reader with decrypted data
reader.set_decrypted_data(decrypted_data);
}
let mut opts = opts.clone();
opts.no_lock = true;
// TODO: check if DeleteMarker
let (_oi, idx) = self.get_latest_object_info_with_idx(bucket, &object, &opts).await?;
self.pools[idx]
.get_object_reader(bucket, object.as_str(), range, h, &opts)
.await
Ok(reader)
}
#[tracing::instrument(level = "debug", skip(self, data))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
check_put_object_args(bucket, object)?;
let object = utils::path::encode_dir_object(object);
// Check if the object needs to be encrypted
if let Some(sse_opts) = crate::encrypt::ObjectEncryption::detect_encryption_mode(&data.headers)? {
// Get the encryption metadata
let object_data = data.get_data().await?;
// Encrypt the data
let (encrypted_data, crypto_metadata) = crate::encrypt::ObjectEncryption::encrypt_object_data(
&object_data,
&sse_opts
).await?;
// Update the headers with encryption metadata
let mut opts = opts.clone();
crate::encrypt::ObjectEncryption::add_encryption_metadata(&mut opts, crypto_metadata);
// Create a new put object reader with the encrypted data.
let mut encrypted_reader = data.clone_with_data(encrypted_data);
// Put the encrypted object
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), &mut encrypted_reader, &opts).await;
}
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
let idx = self.get_pool_idx(bucket, &object, data.content_length as i64).await?;
if opts.data_movement && idx == opts.src_pool_idx {
return Err(Error::new(StorageError::DataMovementOverwriteErr(
bucket.to_owned(),
object.to_owned(),
opts.version_id.clone().unwrap_or_default(),
)));
}
return self.pools[idx].put_object(bucket, &object, &mut encrypted_reader, &opts).await;
} else {
// No encryption needed
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
}
let idx = self.get_pool_idx(bucket, &object, data.content_length as i64).await?;
if opts.data_movement && idx == opts.src_pool_idx {
return Err(Error::new(StorageError::DataMovementOverwriteErr(
bucket.to_owned(),
object.to_owned(),
opts.version_id.clone().unwrap_or_default(),
)));
}
return self.pools[idx].put_object(bucket, &object, data, opts).await;
}
let idx = self.get_pool_idx(bucket, &object, data.content_length as i64).await?;
if opts.data_movement && idx == opts.src_pool_idx {
return Err(Error::new(StorageError::DataMovementOverwriteErr(
bucket.to_owned(),
object.to_owned(),
opts.version_id.clone().unwrap_or_default(),
)));
}
self.pools[idx].put_object(bucket, &object, data, opts).await
}
}

View File

@@ -21,6 +21,7 @@ use common::{
error::{Error, Result},
globals::set_global_addr,
};
use crypto::init_kms;
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
@@ -521,6 +522,15 @@ async fn run(opt: config::Opt) -> Result<()> {
init_iam_sys(store.clone()).await?;
// 初始化KMS客户端如果启用
init_kms();
// 检查KMS初始化状态并记录日志
if let Some(err) = crypto::get_kms_init_error() {
error!("Failed to initialize KMS: {}", err);
} else if crypto::is_kms_initialized() {
info!("KMS encryption is enabled and initialized successfully");
}
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
error!("new_global_notification_sys failed {:?}", &err);
Error::from_string(err.to_string())