From e4e2fa23ce6d22adbed713fff455f6ea588e6fdd Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 22 Jan 2025 11:08:59 +0800 Subject: [PATCH] add policy api --- iam/src/manager.rs | 2 +- rustfs/src/admin/handlers.rs | 2 +- rustfs/src/admin/handlers/policy.rs | 274 ++++++++++++++++++++++++++++ rustfs/src/admin/handlers/user.rs | 10 +- rustfs/src/admin/mod.rs | 37 +++- rustfs/src/main.rs | 2 +- 6 files changed, 322 insertions(+), 5 deletions(-) create mode 100644 rustfs/src/admin/handlers/policy.rs diff --git a/iam/src/manager.rs b/iam/src/manager.rs index 7cdb2f48..d55debb5 100644 --- a/iam/src/manager.rs +++ b/iam/src/manager.rs @@ -1098,7 +1098,7 @@ where fn update_user_with_claims(&self, k: &str, u: UserIdentity) -> Result<()> { let mut u = u; - if u.credentials.session_token.is_empty() { + if !u.credentials.session_token.is_empty() { u.credentials.claims = Some(extract_jwt_claims(&u)?); } diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 52ec6018..4c3f88a2 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -1,7 +1,6 @@ use super::router::Operation; use crate::storage::error::to_s3_error; use bytes::Bytes; -use const_str::from_utf8; use ecstore::admin_server_info::get_server_info; use ecstore::bucket::policy::action::{Action, ActionSet}; use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy}; @@ -56,6 +55,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; pub mod group; +pub mod policy; pub mod service_account; pub mod trace; pub mod user; diff --git a/rustfs/src/admin/handlers/policy.rs b/rustfs/src/admin/handlers/policy.rs new file mode 100644 index 00000000..40b8c98d --- /dev/null +++ b/rustfs/src/admin/handlers/policy.rs @@ -0,0 +1,274 @@ +use std::collections::HashMap; + +use crate::admin::{router::Operation, utils::has_space_be}; +use http::{HeaderMap, StatusCode}; +use iam::{error::is_err_no_such_user, get_global_action_cred, policy::Policy, store::MappedPolicy}; +use matchit::Params; +use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result}; +use serde::Deserialize; +use serde_urlencoded::from_bytes; +use tracing::warn; + +#[derive(Debug, Deserialize, Default)] +pub struct BucketQuery { + pub bucket: String, +} + +pub struct ListCannedPolicies {} +#[async_trait::async_trait] +impl Operation for ListCannedPolicies { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle ListCannedPolicies"); + + let query = { + if let Some(query) = req.uri.query() { + let input: BucketQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + input + } else { + BucketQuery::default() + } + }; + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + let policies = iam_store.list_polices(&query.bucket).await.map_err(|e| { + warn!("list policies failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + + let kvs: HashMap = policies + .into_iter() + .filter(|(_, v)| serde_json::to_string(v).is_ok()) + .collect(); + + let body = serde_json::to_vec(&kvs).map_err(|e| s3_error!(InternalError, "marshal body failed, e: {:?}", e))?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header)) + } +} + +#[derive(Debug, Deserialize, Default)] +pub struct PolicyNameQuery { + pub name: String, +} + +pub struct AddCannedPolicy {} +#[async_trait::async_trait] +impl Operation for AddCannedPolicy { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle AddCannedPolicy"); + + let query = { + if let Some(query) = req.uri.query() { + let input: PolicyNameQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + input + } else { + PolicyNameQuery::default() + } + }; + + if query.name.is_empty() { + return Err(s3_error!(InvalidArgument, "policy name is empty")); + } + + if has_space_be(&query.name) { + return Err(s3_error!(InvalidArgument, "policy name has space")); + } + + let mut input = req.input; + let policy_bytes = match input.store_all_unlimited().await { + Ok(b) => b, + Err(e) => { + warn!("get body failed, e: {:?}", e); + return Err(s3_error!(InvalidRequest, "get body failed")); + } + }; + + let policy = Policy::parse_config(policy_bytes.as_ref()).map_err(|e| { + warn!("parse policy failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InvalidRequest, e.to_string()) + })?; + + if policy.version.is_empty() { + return Err(s3_error!(InvalidRequest, "policy version is empty")); + } + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + iam_store.set_policy(&query.name, policy).await.map_err(|e| { + warn!("set policy failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +} + +pub struct InfoCannedPolicy {} +#[async_trait::async_trait] +impl Operation for InfoCannedPolicy { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle InfoCannedPolicy"); + + let query = { + if let Some(query) = req.uri.query() { + let input: PolicyNameQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + input + } else { + PolicyNameQuery::default() + } + }; + + if query.name.is_empty() { + return Err(s3_error!(InvalidArgument, "policy name is empty")); + } + + let policies = MappedPolicy::new(&query.name).to_slice(); + if policies.len() != 1 { + return Err(s3_error!(InvalidArgument, "too many policies")); + } + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + let pd = iam_store.info_policy(&query.name).await.map_err(|e| { + warn!("info policy failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + + let body = serde_json::to_vec(&pd).map_err(|e| s3_error!(InternalError, "marshal body failed, e: {:?}", e))?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header)) + } +} + +pub struct RemoveCannedPolicy {} +#[async_trait::async_trait] +impl Operation for RemoveCannedPolicy { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle RemoveCannedPolicy"); + + let query = { + if let Some(query) = req.uri.query() { + let input: PolicyNameQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + input + } else { + PolicyNameQuery::default() + } + }; + + if query.name.is_empty() { + return Err(s3_error!(InvalidArgument, "policy name is empty")); + } + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + iam_store.delete_policy(&query.name, true).await.map_err(|e| { + warn!("delete policy failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +} + +#[derive(Debug, Deserialize, Default)] +pub struct SetPolicyForUserOrGroupQuery { + #[serde(rename = "policyName")] + pub policy_name: String, + #[serde(rename = "userOrGroup")] + pub user_or_group: String, + #[serde(rename = "isGroup")] + pub is_group: bool, +} + +pub struct SetPolicyForUserOrGroup {} +#[async_trait::async_trait] +impl Operation for SetPolicyForUserOrGroup { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle SetPolicyForUserOrGroup"); + + let query = { + if let Some(query) = req.uri.query() { + let input: SetPolicyForUserOrGroupQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + input + } else { + SetPolicyForUserOrGroupQuery::default() + } + }; + + if query.policy_name.is_empty() { + return Err(s3_error!(InvalidArgument, "policy name is empty")); + } + + if query.user_or_group.is_empty() { + return Err(s3_error!(InvalidArgument, "user or group is empty")); + } + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) }; + + if !query.is_group { + match iam_store.is_temp_user(&query.user_or_group).await { + Ok((ok, _)) => { + if ok { + return Err(s3_error!(InvalidArgument, "temp user can't set policy")); + } + } + Err(err) => { + if !is_err_no_such_user(&err) { + warn!("is temp user failed, e: {:?}", err); + return Err(S3Error::with_message(S3ErrorCode::InternalError, err.to_string())); + } + } + }; + + let Some(sys_cred) = get_global_action_cred() else { + return Err(s3_error!(InternalError, "get global action cred failed")); + }; + + if query.user_or_group == sys_cred.access_key { + return Err(s3_error!(InvalidArgument, "can't set policy for system user")); + } + } + + if !query.is_group { + if iam_store.get_user(&query.user_or_group).await.is_none() { + return Err(s3_error!(InvalidArgument, "user not exist")); + } + } else { + iam_store.get_group_description(&query.user_or_group).await.map_err(|e| { + warn!("get group description failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + } + + iam_store + .policy_db_set(&query.user_or_group, iam::store::UserType::Reg, query.is_group, &query.policy_name) + .await + .map_err(|e| { + warn!("policy db set failed, e: {:?}", e); + S3Error::with_message(S3ErrorCode::InternalError, e.to_string()) + })?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +} diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index e30b5f0f..6c960987 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -243,12 +243,20 @@ impl Operation for RemoveUser { return Err(s3_error!(InvalidArgument, "can't remove temp user")); } - let (cred, _owner) = check_key_valid(get_session_token(&req.headers), ak).await?; + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (cred, _owner) = check_key_valid(get_session_token(&req.headers), &input_cred.access_key).await?; let sys_cred = get_global_action_cred() .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "get_global_action_cred failed"))?; if ak == sys_cred.access_key || ak == cred.access_key { + warn!( + "can't remove self or system access key {}, {}, {}", + ak, sys_cred.access_key, cred.access_key + ); return Err(s3_error!(InvalidArgument, "can't remove self")); } diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 43079517..c0b5fa27 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -5,7 +5,7 @@ pub mod utils; use common::error::Result; // use ecstore::global::{is_dist_erasure, is_erasure}; use handlers::{ - group, + group, policy, service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount}, user, }; @@ -218,5 +218,40 @@ fn regist_user_route(r: &mut S3Router) -> Result<()> { AdminOperation(&AddServiceAccount {}), )?; + // list-canned-policies?bucket=xxx + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/list-canned-policies").as_str(), + AdminOperation(&policy::ListCannedPolicies {}), + )?; + + // info-canned-policy?name=xxx + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/info-canned-policy").as_str(), + AdminOperation(&policy::InfoCannedPolicy {}), + )?; + + // add-canned-policy?name=xxx + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/add-canned-policy").as_str(), + AdminOperation(&policy::AddCannedPolicy {}), + )?; + + // remove-canned-policy?name=xxx + r.insert( + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/remove-canned-policy").as_str(), + AdminOperation(&policy::RemoveCannedPolicy {}), + )?; + + // set-user-or-group-policy?policyName=xxx&userOrGroup=xxx&isGroup=xxx + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/set-user-or-group-policy").as_str(), + AdminOperation(&policy::SetPolicyForUserOrGroup {}), + )?; + Ok(()) } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 83eac1c2..f9f09b22 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -169,7 +169,7 @@ async fn run(opt: config::Opt) -> Result<()> { let hybrid_service = TowerToHyperService::new( tower::ServiceBuilder::new() - .layer(CorsLayer::very_permissive()) + .layer(CorsLayer::permissive()) .service(hybrid(hyper_service, rpc_service)), );