This commit is contained in:
weisd
2025-11-03 17:39:51 +08:00
committed by GitHub
parent a7f5c4af46
commit 769778e565
5 changed files with 155 additions and 12 deletions

View File

@@ -163,6 +163,83 @@ where
Ok(())
}
pub async fn load_user(&self, access_key: &str) -> Result<()> {
let mut users_map: HashMap<String, UserIdentity> = HashMap::new();
let mut user_policy_map = HashMap::new();
let mut sts_users_map = HashMap::new();
let mut sts_policy_map = HashMap::new();
let mut policy_docs_map = HashMap::new();
let _ = self.api.load_user(access_key, UserType::Svc, &mut users_map).await;
let parent_user = users_map.get(access_key).map(|svc| svc.credentials.parent_user.clone());
if let Some(parent_user) = parent_user {
let _ = self.api.load_user(&parent_user, UserType::Reg, &mut users_map).await;
let _ = self
.api
.load_mapped_policy(&parent_user, UserType::Reg, false, &mut user_policy_map)
.await;
} else {
let _ = self.api.load_user(access_key, UserType::Reg, &mut users_map).await;
if users_map.contains_key(access_key) {
let _ = self
.api
.load_mapped_policy(access_key, UserType::Reg, false, &mut user_policy_map)
.await;
}
let _ = self.api.load_user(access_key, UserType::Sts, &mut sts_users_map).await;
let has_sts_user = sts_users_map.get(access_key);
let sts_parent = has_sts_user.map(|sts| sts.credentials.parent_user.clone());
if let Some(parent) = sts_parent {
let _ = self
.api
.load_mapped_policy(&parent, UserType::Sts, false, &mut sts_policy_map)
.await;
}
let sts_user = has_sts_user.map(|sts| sts.credentials.access_key.clone());
if let Some(ref sts) = sts_user {
if let Some(plc) = sts_policy_map.get(sts) {
for p in plc.to_slice().iter() {
if !policy_docs_map.contains_key(p) {
let _ = self.api.load_policy_doc(p, &mut policy_docs_map).await;
}
}
}
}
}
if let Some(plc) = user_policy_map.get(access_key) {
for p in plc.to_slice().iter() {
if !policy_docs_map.contains_key(p) {
let _ = self.api.load_policy_doc(p, &mut policy_docs_map).await;
}
}
}
if let Some(user) = users_map.get(access_key) {
Cache::add_or_update(&self.cache.users, access_key, user, OffsetDateTime::now_utc());
}
if let Some(user_policy) = user_policy_map.get(access_key) {
Cache::add_or_update(&self.cache.user_policies, access_key, user_policy, OffsetDateTime::now_utc());
}
if let Some(sts_user) = sts_users_map.get(access_key) {
Cache::add_or_update(&self.cache.sts_accounts, access_key, sts_user, OffsetDateTime::now_utc());
}
if let Some(sts_policy) = sts_policy_map.get(access_key) {
Cache::add_or_update(&self.cache.sts_policies, access_key, sts_policy, OffsetDateTime::now_utc());
}
if let Some(policy_doc) = policy_docs_map.get(access_key) {
Cache::add_or_update(&self.cache.policy_docs, access_key, policy_doc, OffsetDateTime::now_utc());
}
Ok(())
}
// TODO: Check if exists, whether retry is possible
#[tracing::instrument(level = "debug", skip(self))]
async fn save_iam_formatter(self: Arc<Self>) -> Result<()> {
@@ -653,7 +730,11 @@ where
Some(p) => p.clone(),
None => {
let mut m = HashMap::new();
self.api.load_mapped_policy(name, UserType::Reg, false, &mut m).await?;
if let Err(err) = self.api.load_mapped_policy(name, UserType::Reg, false, &mut m).await {
if !is_err_no_such_policy(&err) {
return Err(err);
}
}
if let Some(p) = m.get(name) {
Cache::add_or_update(&self.cache.user_policies, name, p, OffsetDateTime::now_utc());
p.clone()
@@ -662,7 +743,11 @@ where
Some(p) => p.clone(),
None => {
let mut m = HashMap::new();
self.api.load_mapped_policy(name, UserType::Sts, false, &mut m).await?;
if let Err(err) = self.api.load_mapped_policy(name, UserType::Sts, false, &mut m).await {
if !is_err_no_such_policy(&err) {
return Err(err);
}
}
if let Some(p) = m.get(name) {
Cache::add_or_update(&self.cache.sts_policies, name, p, OffsetDateTime::now_utc());
p.clone()
@@ -694,7 +779,11 @@ where
Some(p) => p.clone(),
None => {
let mut m = HashMap::new();
self.api.load_mapped_policy(group, UserType::Reg, true, &mut m).await?;
if let Err(err) = self.api.load_mapped_policy(group, UserType::Reg, true, &mut m).await {
if !is_err_no_such_policy(&err) {
return Err(err);
}
}
if let Some(p) = m.get(group) {
Cache::add_or_update(&self.cache.group_policies, group, p, OffsetDateTime::now_utc());
p.clone()
@@ -736,7 +825,11 @@ where
Some(p) => p.clone(),
None => {
let mut m = HashMap::new();
self.api.load_mapped_policy(group, UserType::Reg, true, &mut m).await?;
if let Err(err) = self.api.load_mapped_policy(group, UserType::Reg, true, &mut m).await {
if !is_err_no_such_policy(&err) {
return Err(err);
}
}
if let Some(p) = m.get(group) {
Cache::add_or_update(&self.cache.group_policies, group, p, OffsetDateTime::now_utc());
p.clone()
@@ -1038,7 +1131,7 @@ where
}
}
self.api.delete_mapped_policy(access_key, utype, false).await?;
let _ = self.api.delete_mapped_policy(access_key, utype, false).await;
Cache::delete(&self.cache.user_policies, access_key, OffsetDateTime::now_utc());
@@ -1246,6 +1339,26 @@ where
Ok(self.cache.groups.load().keys().cloned().collect())
}
pub async fn update_groups(&self) -> Result<Vec<String>> {
let mut groups_set = HashSet::new();
let mut m = HashMap::new();
self.api.load_groups(&mut m).await?;
for (group, gi) in m.iter() {
Cache::add_or_update(&self.cache.groups, group, gi, OffsetDateTime::now_utc());
groups_set.insert(group.to_string());
}
let mut m = HashMap::new();
self.api.load_mapped_policies(UserType::Reg, true, &mut m).await?;
for (group, gi) in m.iter() {
Cache::add_or_update(&self.cache.group_policies, group, gi, OffsetDateTime::now_utc());
groups_set.insert(group.to_string());
}
Ok(groups_set.into_iter().collect())
}
pub async fn remove_members_from_group(
&self,
name: &str,
@@ -1312,9 +1425,17 @@ where
}
if members.is_empty() {
self.api.delete_mapped_policy(group, UserType::Reg, true).await?;
if let Err(err) = self.api.delete_mapped_policy(group, UserType::Reg, true).await {
if !is_err_no_such_policy(&err) {
return Err(err);
}
}
self.api.delete_group_info(group).await?;
if let Err(err) = self.api.delete_group_info(group).await {
if !is_err_no_such_group(&err) {
return Err(err);
}
}
Cache::delete(&self.cache.groups, group, OffsetDateTime::now_utc());
Cache::delete(&self.cache.group_policies, group, OffsetDateTime::now_utc());

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use super::{GroupInfo, MappedPolicy, Store, UserType};
use crate::error::{Error, Result, is_err_config_not_found};
use crate::error::{Error, Result, is_err_config_not_found, is_err_no_such_group};
use crate::{
cache::{Cache, CacheEntity},
error::{is_err_no_such_policy, is_err_no_such_user},
@@ -563,7 +563,11 @@ impl Store for ObjectStore {
if let Some(item) = v.item {
let name = rustfs_utils::path::dir(&item);
self.load_group(&name, m).await?;
if let Err(err) = self.load_group(&name, m).await {
if !is_err_no_such_group(&err) {
return Err(err);
}
}
}
}
let _ = ctx.cancel();

View File

@@ -626,7 +626,17 @@ impl<T: Store> IamSys<T> {
Ok((Some(res), ok))
}
None => Ok((None, false)),
None => {
let _ = self.store.load_user(access_key).await;
if let Some(res) = self.store.get_user(access_key).await {
let ok = res.credentials.is_valid();
Ok((Some(res), ok))
} else {
Ok((None, false))
}
}
}
}
@@ -667,6 +677,10 @@ impl<T: Store> IamSys<T> {
self.store.get_group_description(group).await
}
pub async fn list_groups_load(&self) -> Result<Vec<String>> {
self.store.update_groups().await
}
pub async fn list_groups(&self) -> Result<Vec<String>> {
self.store.list_groups().await
}

View File

@@ -62,7 +62,7 @@ impl Operation for ListGroups {
let Ok(iam_store) = rustfs_iam::get() else { return Err(s3_error!(InternalError, "iam not init")) };
let groups = iam_store.list_groups().await.map_err(|e| {
let groups = iam_store.list_groups_load().await.map_err(|e| {
warn!("list groups failed, e: {:?}", e);
S3Error::with_message(S3ErrorCode::InternalError, e.to_string())
})?;

View File

@@ -31,7 +31,7 @@ use serde_json::Value;
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use time::{Duration, OffsetDateTime};
use tracing::{info, warn};
use tracing::{error, info, warn};
const ASSUME_ROLE_ACTION: &str = "AssumeRole";
const ASSUME_ROLE_VERSION: &str = "2011-06-15";
@@ -116,6 +116,10 @@ impl Operation for AssumeRoleHandle {
};
if let Err(_err) = iam_store.policy_db_get(&cred.access_key, &cred.groups).await {
error!(
"AssumeRole get policy failed, err: {:?}, access_key: {:?}, groups: {:?}",
_err, cred.access_key, cred.groups
);
return Err(s3_error!(InvalidArgument, "invalid policy arg"));
}