add policy api

This commit is contained in:
weisd
2025-01-22 11:08:59 +08:00
parent 9bfd259b03
commit e4e2fa23ce
6 changed files with 322 additions and 5 deletions

View File

@@ -1098,7 +1098,7 @@ where
fn update_user_with_claims(&self, k: &str, u: UserIdentity) -> Result<()> {
let mut u = u;
if u.credentials.session_token.is_empty() {
if !u.credentials.session_token.is_empty() {
u.credentials.claims = Some(extract_jwt_claims(&u)?);
}

View File

@@ -1,7 +1,6 @@
use super::router::Operation;
use crate::storage::error::to_s3_error;
use bytes::Bytes;
use const_str::from_utf8;
use ecstore::admin_server_info::get_server_info;
use ecstore::bucket::policy::action::{Action, ActionSet};
use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy};
@@ -56,6 +55,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
pub mod group;
pub mod policy;
pub mod service_account;
pub mod trace;
pub mod user;

View File

@@ -0,0 +1,274 @@
use std::collections::HashMap;
use crate::admin::{router::Operation, utils::has_space_be};
use http::{HeaderMap, StatusCode};
use iam::{error::is_err_no_such_user, get_global_action_cred, policy::Policy, store::MappedPolicy};
use matchit::Params;
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use tracing::warn;
#[derive(Debug, Deserialize, Default)]
pub struct BucketQuery {
pub bucket: String,
}
pub struct ListCannedPolicies {}
#[async_trait::async_trait]
impl Operation for ListCannedPolicies {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ListCannedPolicies");
let query = {
if let Some(query) = req.uri.query() {
let input: BucketQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
input
} else {
BucketQuery::default()
}
};
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
let policies = iam_store.list_polices(&query.bucket).await.map_err(|e| {
warn!("list policies failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
let kvs: HashMap<String, Policy> = policies
.into_iter()
.filter(|(_, v)| serde_json::to_string(v).is_ok())
.collect();
let body = serde_json::to_vec(&kvs).map_err(|e| s3_error!(InternalError, "marshal body failed, e: {:?}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header))
}
}
#[derive(Debug, Deserialize, Default)]
pub struct PolicyNameQuery {
pub name: String,
}
pub struct AddCannedPolicy {}
#[async_trait::async_trait]
impl Operation for AddCannedPolicy {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle AddCannedPolicy");
let query = {
if let Some(query) = req.uri.query() {
let input: PolicyNameQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
input
} else {
PolicyNameQuery::default()
}
};
if query.name.is_empty() {
return Err(s3_error!(InvalidArgument, "policy name is empty"));
}
if has_space_be(&query.name) {
return Err(s3_error!(InvalidArgument, "policy name has space"));
}
let mut input = req.input;
let policy_bytes = match input.store_all_unlimited().await {
Ok(b) => b,
Err(e) => {
warn!("get body failed, e: {:?}", e);
return Err(s3_error!(InvalidRequest, "get body failed"));
}
};
let policy = Policy::parse_config(policy_bytes.as_ref()).map_err(|e| {
warn!("parse policy failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InvalidRequest, e.to_string())
})?;
if policy.version.is_empty() {
return Err(s3_error!(InvalidRequest, "policy version is empty"));
}
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
iam_store.set_policy(&query.name, policy).await.map_err(|e| {
warn!("set policy failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
pub struct InfoCannedPolicy {}
#[async_trait::async_trait]
impl Operation for InfoCannedPolicy {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle InfoCannedPolicy");
let query = {
if let Some(query) = req.uri.query() {
let input: PolicyNameQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
input
} else {
PolicyNameQuery::default()
}
};
if query.name.is_empty() {
return Err(s3_error!(InvalidArgument, "policy name is empty"));
}
let policies = MappedPolicy::new(&query.name).to_slice();
if policies.len() != 1 {
return Err(s3_error!(InvalidArgument, "too many policies"));
}
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
let pd = iam_store.info_policy(&query.name).await.map_err(|e| {
warn!("info policy failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
let body = serde_json::to_vec(&pd).map_err(|e| s3_error!(InternalError, "marshal body failed, e: {:?}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header))
}
}
pub struct RemoveCannedPolicy {}
#[async_trait::async_trait]
impl Operation for RemoveCannedPolicy {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RemoveCannedPolicy");
let query = {
if let Some(query) = req.uri.query() {
let input: PolicyNameQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
input
} else {
PolicyNameQuery::default()
}
};
if query.name.is_empty() {
return Err(s3_error!(InvalidArgument, "policy name is empty"));
}
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
iam_store.delete_policy(&query.name, true).await.map_err(|e| {
warn!("delete policy failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
#[derive(Debug, Deserialize, Default)]
pub struct SetPolicyForUserOrGroupQuery {
#[serde(rename = "policyName")]
pub policy_name: String,
#[serde(rename = "userOrGroup")]
pub user_or_group: String,
#[serde(rename = "isGroup")]
pub is_group: bool,
}
pub struct SetPolicyForUserOrGroup {}
#[async_trait::async_trait]
impl Operation for SetPolicyForUserOrGroup {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle SetPolicyForUserOrGroup");
let query = {
if let Some(query) = req.uri.query() {
let input: SetPolicyForUserOrGroupQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
input
} else {
SetPolicyForUserOrGroupQuery::default()
}
};
if query.policy_name.is_empty() {
return Err(s3_error!(InvalidArgument, "policy name is empty"));
}
if query.user_or_group.is_empty() {
return Err(s3_error!(InvalidArgument, "user or group is empty"));
}
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
if !query.is_group {
match iam_store.is_temp_user(&query.user_or_group).await {
Ok((ok, _)) => {
if ok {
return Err(s3_error!(InvalidArgument, "temp user can't set policy"));
}
}
Err(err) => {
if !is_err_no_such_user(&err) {
warn!("is temp user failed, e: {:?}", err);
return Err(S3Error::with_message(S3ErrorCode::InternalError, err.to_string()));
}
}
};
let Some(sys_cred) = get_global_action_cred() else {
return Err(s3_error!(InternalError, "get global action cred failed"));
};
if query.user_or_group == sys_cred.access_key {
return Err(s3_error!(InvalidArgument, "can't set policy for system user"));
}
}
if !query.is_group {
if iam_store.get_user(&query.user_or_group).await.is_none() {
return Err(s3_error!(InvalidArgument, "user not exist"));
}
} else {
iam_store.get_group_description(&query.user_or_group).await.map_err(|e| {
warn!("get group description failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
}
iam_store
.policy_db_set(&query.user_or_group, iam::store::UserType::Reg, query.is_group, &query.policy_name)
.await
.map_err(|e| {
warn!("policy db set failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}

View File

@@ -243,12 +243,20 @@ impl Operation for RemoveUser {
return Err(s3_error!(InvalidArgument, "can't remove temp user"));
}
let (cred, _owner) = check_key_valid(get_session_token(&req.headers), ak).await?;
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, _owner) = check_key_valid(get_session_token(&req.headers), &input_cred.access_key).await?;
let sys_cred = get_global_action_cred()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "get_global_action_cred failed"))?;
if ak == sys_cred.access_key || ak == cred.access_key {
warn!(
"can't remove self or system access key {}, {}, {}",
ak, sys_cred.access_key, cred.access_key
);
return Err(s3_error!(InvalidArgument, "can't remove self"));
}

View File

@@ -5,7 +5,7 @@ pub mod utils;
use common::error::Result;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
group,
group, policy,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
user,
};
@@ -218,5 +218,40 @@ fn regist_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
AdminOperation(&AddServiceAccount {}),
)?;
// list-canned-policies?bucket=xxx
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-canned-policies").as_str(),
AdminOperation(&policy::ListCannedPolicies {}),
)?;
// info-canned-policy?name=xxx
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/info-canned-policy").as_str(),
AdminOperation(&policy::InfoCannedPolicy {}),
)?;
// add-canned-policy?name=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/add-canned-policy").as_str(),
AdminOperation(&policy::AddCannedPolicy {}),
)?;
// remove-canned-policy?name=xxx
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-canned-policy").as_str(),
AdminOperation(&policy::RemoveCannedPolicy {}),
)?;
// set-user-or-group-policy?policyName=xxx&userOrGroup=xxx&isGroup=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-user-or-group-policy").as_str(),
AdminOperation(&policy::SetPolicyForUserOrGroup {}),
)?;
Ok(())
}

View File

@@ -169,7 +169,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(CorsLayer::very_permissive())
.layer(CorsLayer::permissive())
.service(hybrid(hyper_service, rpc_service)),
);