Merge pull request #210 from rustfs/feat/admin-api

Feat/admin api
This commit is contained in:
weisd
2025-01-14 10:43:56 +08:00
committed by GitHub
13 changed files with 294 additions and 42 deletions

View File

@@ -11,7 +11,7 @@ use time::OffsetDateTime;
use crate::{
auth::UserIdentity,
policy::{Args, MappedPolicy, Policy, PolicyDoc},
policy::{Args, GroupInfo, MappedPolicy, Policy, PolicyDoc},
Error,
};
@@ -21,7 +21,7 @@ pub struct Cache {
pub user_policies: ArcSwap<CacheEntity<MappedPolicy>>,
pub sts_accounts: ArcSwap<CacheEntity<UserIdentity>>,
pub sts_policies: ArcSwap<CacheEntity<MappedPolicy>>,
pub groups: ArcSwap<CacheEntity<String>>,
pub groups: ArcSwap<CacheEntity<GroupInfo>>,
pub user_group_memeberships: ArcSwap<CacheEntity<HashSet<String>>>,
pub group_policies: ArcSwap<CacheEntity<MappedPolicy>>,
}
@@ -89,7 +89,7 @@ impl Cache {
impl CacheInner {
#[inline]
pub fn get_user<'a>(&self, user_name: &'a str) -> Option<&UserIdentity> {
pub fn get_user(&self, user_name: &str) -> Option<&UserIdentity> {
self.users.get(user_name).or_else(|| self.sts_accounts.get(user_name))
}
@@ -99,7 +99,7 @@ impl CacheInner {
/// 如果是临时用户返回Ok(Some(partent_name)))
/// 如果不是临时用户返回Ok(None)
fn is_temp_user<'a>(&self, user_name: &'a str) -> crate::Result<Option<&str>> {
fn is_temp_user(&self, user_name: &str) -> crate::Result<Option<&str>> {
let user = self
.get_user(user_name)
.ok_or_else(|| Error::NoSuchUser(user_name.to_owned()))?;
@@ -113,7 +113,7 @@ impl CacheInner {
/// 如果是临时用户返回Ok(Some(partent_name)))
/// 如果不是临时用户返回Ok(None)
fn is_service_account<'a>(&self, user_name: &'a str) -> crate::Result<Option<&str>> {
fn is_service_account(&self, user_name: &str) -> crate::Result<Option<&str>> {
let user = self
.get_user(user_name)
.ok_or_else(|| Error::NoSuchUser(user_name.to_owned()))?;
@@ -137,7 +137,7 @@ impl CacheInner {
false
}
pub fn is_allowed(&self, args: Args) -> bool {
pub fn is_allowed(&self, _args: Args) -> bool {
todo!()
}
@@ -199,7 +199,7 @@ pub struct CacheInner {
pub user_policies: G<MappedPolicy>,
pub sts_accounts: G<UserIdentity>,
pub sts_policies: G<MappedPolicy>,
pub groups: G<String>,
pub groups: G<GroupInfo>,
pub user_group_memeberships: G<HashSet<String>>,
pub group_policies: G<MappedPolicy>,
}
@@ -239,7 +239,7 @@ mod tests {
for (index, key) in (0..100).map(|x| x.to_string()).enumerate() {
let c = &cache;
f.push(async move {
Cache::add_or_update(&c, &key, &index, OffsetDateTime::now_utc());
Cache::add_or_update(c, &key, &index, OffsetDateTime::now_utc());
});
}
join_all(f).await;
@@ -259,7 +259,7 @@ mod tests {
for (index, key) in (0..100).map(|x| x.to_string()).enumerate() {
let c = &cache;
f.push(async move {
Cache::add_or_update(&c, &key, &index, OffsetDateTime::now_utc());
Cache::add_or_update(c, &key, &index, OffsetDateTime::now_utc());
});
}
join_all(f).await;
@@ -274,7 +274,7 @@ mod tests {
for (index, key) in (0..100).map(|x| x.to_string()).enumerate() {
let c = &cache;
f.push(async move {
Cache::add_or_update(&c, &key, &(index * 1000), OffsetDateTime::now_utc());
Cache::add_or_update(c, &key, &(index * 1000), OffsetDateTime::now_utc());
});
}
join_all(f).await;
@@ -294,7 +294,7 @@ mod tests {
for (index, key) in (0..100).map(|x| x.to_string()).enumerate() {
let c = &cache;
f.push(async move {
Cache::add_or_update(&c, &key, &index, OffsetDateTime::now_utc());
Cache::add_or_update(c, &key, &index, OffsetDateTime::now_utc());
});
}
join_all(f).await;
@@ -309,7 +309,7 @@ mod tests {
for key in (0..100).map(|x| x.to_string()) {
let c = &cache;
f.push(async move {
Cache::delete(&c, &key, OffsetDateTime::now_utc());
Cache::delete(c, &key, OffsetDateTime::now_utc());
});
}
join_all(f).await;

View File

@@ -17,6 +17,12 @@ pub enum Error {
#[error("user '{0}' does not exist")]
NoSuchUser(String),
#[error("group '{0}' does not exist")]
NoSuchGroup(String),
#[error("group not empty")]
GroupNotEmpty,
#[error("invalid arguments specified")]
InvalidArgument,
@@ -57,3 +63,7 @@ pub enum Error {
}
pub type Result<T> = std::result::Result<T, Error>;
pub fn is_err_no_such_user(e: &Error) -> bool {
matches!(e, Error::NoSuchUser(_))
}

View File

@@ -128,11 +128,7 @@ pub async fn delete_user(ak: &str, _notify: bool) -> crate::Result<()> {
}
pub async fn is_temp_user(ak: &str) -> crate::Result<(bool, String)> {
if let Some(user) = get()?.get_user(ak).await? {
Ok((user.credentials.is_temp(), user.credentials.parent_user))
} else {
Err(Error::NoSuchUser(ak.to_string()))
}
get()?.is_temp_user(ak).await
}
pub async fn get_user_info(ak: &str) -> crate::Result<madmin::UserInfo> {
@@ -146,3 +142,7 @@ pub async fn set_user_status(ak: &str, status: AccountStatus) -> crate::Result<O
pub async fn list_service_accounts(ak: &str) -> crate::Result<Vec<Credentials>> {
get()?.list_service_accounts(ak).await
}
pub async fn remove_users_from_group(group: &str, members: Vec<String>) -> crate::Result<OffsetDateTime> {
get()?.remove_users_from_group(group, members).await
}

View File

@@ -94,7 +94,7 @@ where
Ok(())
}
async fn notify(&self) {
async fn _notify(&self) {
self.send_chan.send(OffsetDateTime::now_utc().unix_timestamp()).await.unwrap();
}
@@ -354,7 +354,7 @@ where
Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc()))
}
pub async fn delete_user(&self, access_key: &str, utype: UserType) -> crate::Result<()> {
pub async fn delete_user(&self, access_key: &str, _utype: UserType) -> crate::Result<()> {
let users = self.cache.users.load();
if let Some(x) = users.get(access_key) {
if x.credentials.is_temp() {
@@ -363,6 +363,7 @@ where
}
// if utype == UserType::Reg {}
// TODO: Delete user from group memberships
let path = format!("config/iam/{}{}/identity.json", UserType::Reg.prefix(), access_key);
debug!("delete object: {path:?}");
@@ -467,4 +468,62 @@ where
Ok(user_entiry.update_at.unwrap_or(OffsetDateTime::now_utc()))
}
pub async fn is_temp_user(&self, access_key: &str) -> crate::Result<(bool, String)> {
let users = self.cache.users.load();
let u = match users.get(access_key) {
Some(u) => u,
None => return Err(Error::NoSuchUser(access_key.to_string())),
};
if u.credentials.is_temp() {
Ok((true, u.credentials.parent_user.clone()))
} else {
Ok((false, String::new()))
}
}
pub async fn remove_users_from_group(&self, group: &str, members: Vec<String>) -> crate::Result<OffsetDateTime> {
if group.is_empty() {
return Err(Error::InvalidArgument);
}
let users = self.cache.users.load();
let groups = self.cache.groups.load();
let group_members_cache = self.cache.user_group_memeberships.load();
for member in members.iter() {
let u = users.get(member).ok_or(Error::NoSuchUser(member.to_string()))?;
if u.credentials.is_temp() || u.credentials.is_service_account() {
return Err(Error::IAMActionNotAllowed);
}
}
let group_info = groups.get(group).ok_or(Error::NoSuchGroup(group.to_string()))?;
let mut group_members = match group_members_cache.get(group) {
Some(m) => m.clone(),
None => return Err(Error::NoSuchGroup(group.to_string())),
};
if members.is_empty() && !group_members.is_empty() {
return Err(Error::GroupNotEmpty);
}
if members.is_empty() {
group_members.clear();
} else {
for member in members.iter() {
group_members.remove(member);
}
}
let path = format!("config/iam/group/{}.json", group);
debug!("save object: {path:?}");
self.api.save_iam_config(&members, path).await?;
Cache::add_or_update(&self.cache.user_group_memeberships, group, &group_members, OffsetDateTime::now_utc());
Ok(OffsetDateTime::now_utc())
}
}

View File

@@ -23,7 +23,8 @@ pub trait Store: Clone + Send + Sync + 'static {
async fn load_all(&self, cache: &Cache) -> crate::Result<()>;
fn get_default_policyes() -> HashMap<String, PolicyDoc> {
DEFAULT_POLICIES
let default_policies = DEFAULT_POLICIES;
default_policies
.iter()
.map(|(n, p)| {
(

View File

@@ -1,7 +1,7 @@
use crate::Error;
use jsonwebtoken::{encode, Algorithm, DecodingKey, EncodingKey, Header};
use rand::{Rng, RngCore};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};
pub fn gen_access_key(length: usize) -> crate::Result<String> {
const ALPHA_NUMERIC_TABLE: [char; 36] = [

19
madmin/src/group.rs Normal file
View File

@@ -0,0 +1,19 @@
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum GroupStatus {
Enabled,
Disabled,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GroupAddRemove {
group: String,
pub members: Vec<String>,
#[serde(rename = "groupStatus")]
pub status: GroupStatus,
#[serde(rename = "isRemove")]
pub is_remove: bool,
}

View File

@@ -1,3 +1,4 @@
pub mod group;
pub mod heal_commands;
pub mod health;
pub mod info_commands;
@@ -8,5 +9,6 @@ pub mod trace;
pub mod user;
pub mod utils;
pub use group::*;
pub use info_commands::*;
pub use user::*;

View File

@@ -27,6 +27,7 @@ use iam::{auth, get_global_action_cred};
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
use s3s::header::CONTENT_TYPE;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{
dto::{AssumeRoleOutput, Credentials, Timestamp},
@@ -48,6 +49,7 @@ use tokio::{select, spawn};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
pub mod group;
pub mod service_account;
pub mod trace;
pub mod user;
@@ -353,10 +355,13 @@ impl Operation for AccountInfoHandler {
policy: bucket_policy,
};
let output = serde_json::to_string(&info)
let data = serde_json::to_vec(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -379,10 +384,13 @@ impl Operation for ServerInfoHandler {
let info = get_server_info(true).await;
let output = serde_json::to_string(&info)
let data = serde_json::to_vec(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse serverInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -412,10 +420,13 @@ impl Operation for StorageInfoHandler {
let info = store.storage_info().await;
let output = serde_json::to_string(&info)
let data = serde_json::to_vec(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -435,10 +446,13 @@ impl Operation for DataUsageInfoHandler {
s3_error!(InternalError, "load_data_usage_from_backend failed")
})?;
let output = serde_json::to_string(&info)
let data = serde_json::to_vec(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse DataUsageInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -859,10 +873,13 @@ impl Operation for ListPools {
pools_status.push(state);
}
let output = serde_json::to_string(&pools_status)
let data = serde_json::to_vec(&pools_status)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -926,10 +943,13 @@ impl Operation for StatusPool {
let pools_status = store.status(idx).await.map_err(to_s3_error)?;
let output = serde_json::to_string(&pools_status)
let data = serde_json::to_vec(&pools_status)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}

View File

@@ -0,0 +1,97 @@
use http::StatusCode;
use iam::{error::is_err_no_such_user, get_global_action_cred};
use madmin::GroupAddRemove;
use matchit::Params;
use s3s::{s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use tracing::warn;
use crate::admin::router::Operation;
pub struct ListGroups {}
#[async_trait::async_trait]
impl Operation for ListGroups {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ListGroups");
Err(s3_error!(NotImplemented))
}
}
pub struct Group {}
#[async_trait::async_trait]
impl Operation for Group {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle Group");
Err(s3_error!(NotImplemented))
}
}
pub struct UpdateGroupMembers {}
#[async_trait::async_trait]
impl Operation for UpdateGroupMembers {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle UpdateGroupMembers");
let Some(body) = req.input.bytes() else {
return Err(s3_error!(InvalidRequest, "get body failed"));
};
let args: GroupAddRemove = serde_json::from_slice(&body)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("unmarshal body err {}", e)))?;
warn!("UpdateGroupMembers args {:?}", args);
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
for member in args.members.iter() {
match iam_store.is_temp_user(member).await {
Ok((is_temp, _)) => {
if is_temp {
return Err(S3Error::with_message(
S3ErrorCode::MethodNotAllowed,
format!("can't add temp user {}", member),
));
}
get_global_action_cred()
.map(|cred| {
if cred.access_key == *member {
return Err(S3Error::with_message(
S3ErrorCode::MethodNotAllowed,
format!("can't add root {}", member),
));
}
Ok(())
})
.unwrap_or_else(|| {
Err(S3Error::with_message(S3ErrorCode::InternalError, "get global cred failed".to_string()))
})?;
}
Err(e) => {
if !is_err_no_such_user(&e) {
return Err(S3Error::with_message(S3ErrorCode::InternalError, e.to_string()));
}
}
}
}
if args.is_remove {
warn!("remove group members");
} else {
warn!("add group members");
}
Err(s3_error!(NotImplemented))
}
}
pub struct SetGroupStatus {}
#[async_trait::async_trait]
impl Operation for SetGroupStatus {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle SetGroupStatus");
Err(s3_error!(NotImplemented))
}
}

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use http::HeaderMap;
use hyper::StatusCode;
use iam::{
auth::CredentialsBuilder,
@@ -10,7 +11,7 @@ use iam::{
};
use madmin::{AddServiceAccountReq, ListServiceAccountsResp, ServiceAccountInfo};
use matchit::Params;
use s3s::{s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde_urlencoded::from_bytes;
use tracing::{debug, warn};
@@ -277,7 +278,10 @@ impl Operation for ListServiceAccount {
let data = serde_json::to_vec(&ListServiceAccountsResp { accounts })
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal users err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::from(data))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}

View File

@@ -1,8 +1,8 @@
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use iam::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq};
use matchit::Params;
use s3s::{s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use tracing::warn;
@@ -87,7 +87,10 @@ impl Operation for AddUser {
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("create_user err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::empty())))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
@@ -128,7 +131,10 @@ impl Operation for SetUserStatus {
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("set_user_status err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::empty())))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
@@ -151,7 +157,10 @@ impl Operation for ListUsers {
// let body = encrypt_data(input_cred.secret_key.expose().as_bytes(), &data)
// .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, format!("encrypt_data err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::from(data))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
@@ -188,7 +197,10 @@ impl Operation for RemoveUser {
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("delete_user err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::empty())))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
@@ -220,6 +232,9 @@ impl Operation for GetUserInfo {
let data = serde_json::to_vec(&info)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("marshal user err {}", e)))?;
Ok(S3Response::new((StatusCode::OK, Body::from(data))))
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}

View File

@@ -5,6 +5,7 @@ pub mod router;
use common::error::Result;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
group,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
user,
};
@@ -161,6 +162,30 @@ fn regist_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
AdminOperation(&user::SetUserStatus {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/groups").as_str(),
AdminOperation(&group::ListGroups {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/group").as_str(),
AdminOperation(&group::Group {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-group-status").as_str(),
AdminOperation(&group::SetGroupStatus {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/update-group-members").as_str(),
AdminOperation(&group::UpdateGroupMembers {}),
)?;
// Service accounts
r.insert(
Method::POST,