mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
refactor(admin): move KMS management handlers (#1971)
This commit is contained in:
@@ -15,55 +15,7 @@
|
||||
//! KMS admin handlers for HTTP API
|
||||
|
||||
use super::{kms_dynamic, kms_keys, kms_management};
|
||||
use crate::admin::auth::validate_admin_request;
|
||||
use crate::admin::router::{AdminOperation, Operation, S3Router};
|
||||
use crate::app::context::resolve_kms_runtime_service_manager;
|
||||
use crate::auth::{check_key_valid, get_session_token};
|
||||
use crate::server::RemoteAddr;
|
||||
use hyper::{HeaderMap, StatusCode};
|
||||
use matchit::Params;
|
||||
use rustfs_kms::init_global_kms_service_manager;
|
||||
use rustfs_policy::policy::action::{Action, AdminAction};
|
||||
use s3s::header::CONTENT_TYPE;
|
||||
use s3s::{Body, S3Request, S3Response, S3Result, s3_error};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
async fn kms_encryption_service_from_context() -> Option<std::sync::Arc<rustfs_kms::ObjectEncryptionService>> {
|
||||
let manager = match resolve_kms_runtime_service_manager() {
|
||||
Some(manager) => manager,
|
||||
None => {
|
||||
warn!("KMS service manager not initialized, initializing now as fallback");
|
||||
init_global_kms_service_manager()
|
||||
}
|
||||
};
|
||||
manager.get_encryption_service().await
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct KmsStatusResponse {
|
||||
pub backend_type: String,
|
||||
pub backend_status: String,
|
||||
pub cache_enabled: bool,
|
||||
pub cache_stats: Option<CacheStatsResponse>,
|
||||
pub default_key_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CacheStatsResponse {
|
||||
pub hit_count: u64,
|
||||
pub miss_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct KmsConfigResponse {
|
||||
pub backend: String,
|
||||
pub cache_enabled: bool,
|
||||
pub cache_max_keys: usize,
|
||||
pub cache_ttl_seconds: u64,
|
||||
pub default_key_id: Option<String>,
|
||||
}
|
||||
use crate::admin::router::{AdminOperation, S3Router};
|
||||
|
||||
pub fn register_kms_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
|
||||
kms_management::register_kms_management_route(r)?;
|
||||
@@ -71,157 +23,3 @@ pub fn register_kms_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<(
|
||||
kms_keys::register_kms_key_route(r)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get KMS service status
|
||||
pub struct KmsStatusHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsStatusHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
let backend_status = match service.health_check().await {
|
||||
Ok(true) => "healthy".to_string(),
|
||||
Ok(false) => "unhealthy".to_string(),
|
||||
Err(e) => {
|
||||
warn!("KMS health check failed: {}", e);
|
||||
"error".to_string()
|
||||
}
|
||||
};
|
||||
|
||||
let cache_stats = service.cache_stats().await.map(|(hits, misses)| CacheStatsResponse {
|
||||
hit_count: hits,
|
||||
miss_count: misses,
|
||||
});
|
||||
|
||||
let response = KmsStatusResponse {
|
||||
backend_type: "vault".to_string(), // TODO: Get from config
|
||||
backend_status,
|
||||
cache_enabled: cache_stats.is_some(),
|
||||
cache_stats,
|
||||
default_key_id: service.get_default_key_id().cloned(),
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get KMS configuration
|
||||
pub struct KmsConfigHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsConfigHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
// TODO: Get actual config from service
|
||||
let response = KmsConfigResponse {
|
||||
backend: "vault".to_string(),
|
||||
cache_enabled: true,
|
||||
cache_max_keys: 1000,
|
||||
cache_ttl_seconds: 300,
|
||||
default_key_id: service.get_default_key_id().cloned(),
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear KMS cache
|
||||
pub struct KmsClearCacheHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsClearCacheHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
match service.clear_cache().await {
|
||||
Ok(()) => {
|
||||
info!("KMS cache cleared successfully");
|
||||
let response = serde_json::json!({
|
||||
"status": "success",
|
||||
"message": "cache cleared successfully"
|
||||
});
|
||||
|
||||
let data =
|
||||
serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to clear KMS cache: {}", e);
|
||||
Err(s3_error!(InternalError, "failed to clear cache: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,11 +14,55 @@
|
||||
|
||||
//! KMS management route registration.
|
||||
|
||||
use super::kms::{KmsClearCacheHandler, KmsConfigHandler, KmsStatusHandler};
|
||||
use super::kms_keys::{CreateKeyHandler, DescribeKeyHandler, GenerateDataKeyHandler, ListKeysHandler};
|
||||
use crate::admin::router::{AdminOperation, S3Router};
|
||||
use crate::server::ADMIN_PREFIX;
|
||||
use hyper::Method;
|
||||
use crate::admin::auth::validate_admin_request;
|
||||
use crate::admin::router::{AdminOperation, Operation, S3Router};
|
||||
use crate::app::context::resolve_kms_runtime_service_manager;
|
||||
use crate::auth::{check_key_valid, get_session_token};
|
||||
use crate::server::{ADMIN_PREFIX, RemoteAddr};
|
||||
use hyper::{HeaderMap, Method, StatusCode};
|
||||
use matchit::Params;
|
||||
use rustfs_kms::init_global_kms_service_manager;
|
||||
use rustfs_policy::policy::action::{Action, AdminAction};
|
||||
use s3s::header::CONTENT_TYPE;
|
||||
use s3s::{Body, S3Request, S3Response, S3Result, s3_error};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
async fn kms_encryption_service_from_context() -> Option<std::sync::Arc<rustfs_kms::ObjectEncryptionService>> {
|
||||
let manager = match resolve_kms_runtime_service_manager() {
|
||||
Some(manager) => manager,
|
||||
None => {
|
||||
warn!("KMS service manager not initialized, initializing now as fallback");
|
||||
init_global_kms_service_manager()
|
||||
}
|
||||
};
|
||||
manager.get_encryption_service().await
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct KmsStatusResponse {
|
||||
pub backend_type: String,
|
||||
pub backend_status: String,
|
||||
pub cache_enabled: bool,
|
||||
pub cache_stats: Option<CacheStatsResponse>,
|
||||
pub default_key_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CacheStatsResponse {
|
||||
pub hit_count: u64,
|
||||
pub miss_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct KmsConfigResponse {
|
||||
pub backend: String,
|
||||
pub cache_enabled: bool,
|
||||
pub cache_max_keys: usize,
|
||||
pub cache_ttl_seconds: u64,
|
||||
pub default_key_id: Option<String>,
|
||||
}
|
||||
|
||||
pub fn register_kms_management_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
|
||||
r.insert(
|
||||
@@ -65,3 +109,157 @@ pub fn register_kms_management_route(r: &mut S3Router<AdminOperation>) -> std::i
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get KMS service status
|
||||
pub struct KmsStatusHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsStatusHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
let backend_status = match service.health_check().await {
|
||||
Ok(true) => "healthy".to_string(),
|
||||
Ok(false) => "unhealthy".to_string(),
|
||||
Err(e) => {
|
||||
warn!("KMS health check failed: {}", e);
|
||||
"error".to_string()
|
||||
}
|
||||
};
|
||||
|
||||
let cache_stats = service.cache_stats().await.map(|(hits, misses)| CacheStatsResponse {
|
||||
hit_count: hits,
|
||||
miss_count: misses,
|
||||
});
|
||||
|
||||
let response = KmsStatusResponse {
|
||||
backend_type: "vault".to_string(), // TODO: Get from config
|
||||
backend_status,
|
||||
cache_enabled: cache_stats.is_some(),
|
||||
cache_stats,
|
||||
default_key_id: service.get_default_key_id().cloned(),
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get KMS configuration
|
||||
pub struct KmsConfigHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsConfigHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
// TODO: Get actual config from service
|
||||
let response = KmsConfigResponse {
|
||||
backend: "vault".to_string(),
|
||||
cache_enabled: true,
|
||||
cache_max_keys: 1000,
|
||||
cache_ttl_seconds: 300,
|
||||
default_key_id: service.get_default_key_id().cloned(),
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear KMS cache
|
||||
pub struct KmsClearCacheHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for KmsClearCacheHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
let Some(cred) = req.credentials else {
|
||||
return Err(s3_error!(InvalidRequest, "authentication required"));
|
||||
};
|
||||
|
||||
let (cred, owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?;
|
||||
|
||||
validate_admin_request(
|
||||
&req.headers,
|
||||
&cred,
|
||||
owner,
|
||||
false,
|
||||
vec![Action::AdminAction(AdminAction::ServerInfoAdminAction)],
|
||||
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(service) = kms_encryption_service_from_context().await else {
|
||||
return Err(s3_error!(InternalError, "KMS service not initialized"));
|
||||
};
|
||||
|
||||
match service.clear_cache().await {
|
||||
Ok(()) => {
|
||||
info!("KMS cache cleared successfully");
|
||||
let response = serde_json::json!({
|
||||
"status": "success",
|
||||
"message": "cache cleared successfully"
|
||||
});
|
||||
|
||||
let data =
|
||||
serde_json::to_vec(&response).map_err(|e| s3_error!(InternalError, "failed to serialize response: {}", e))?;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to clear KMS cache: {}", e);
|
||||
Err(s3_error!(InternalError, "failed to clear cache: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user