Merge pull request #528 from rustfs/feat/import-iam

Feat/import iam
This commit is contained in:
weisd
2025-07-01 11:50:35 +08:00
committed by GitHub
11 changed files with 323 additions and 39 deletions

View File

@@ -26,7 +26,7 @@ pub use rules::BucketNotificationConfig;
use std::io::IsTerminal;
pub use target::Target;
use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
/// Initialize the tracing log system
///

View File

@@ -1,5 +1,5 @@
use super::pattern;
use crate::arn::{ArnError, TargetIDError, ARN};
use crate::arn::{ARN, ArnError, TargetIDError};
use crate::event::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

View File

@@ -1,6 +1,5 @@
/// process related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor =

View File

@@ -30,9 +30,7 @@ use crate::client::{
transition_api::{ReaderImpl, TransitionClient, UploadInfo},
utils::{is_amz_header, is_minio_header, is_rustfs_header, is_standard_header, is_storageclass_header},
};
use rustfs_utils::{
crypto::base64_encode,
};
use rustfs_utils::crypto::base64_encode;
#[derive(Debug, Clone)]
pub struct AdvancedPutOptions {

View File

@@ -1549,6 +1549,7 @@ where
);
}
}
UserType::None => {}
}
Ok(())

View File

@@ -59,6 +59,7 @@ pub enum UserType {
Svc,
Sts,
Reg,
None,
}
impl UserType {
@@ -67,6 +68,7 @@ impl UserType {
UserType::Svc => "service-accounts/",
UserType::Sts => "sts/",
UserType::Reg => "users/",
UserType::None => "",
}
}
}

View File

@@ -445,6 +445,7 @@ impl Store for ObjectStore {
UserType::Reg => IAM_CONFIG_USERS_PREFIX.as_str(),
UserType::Svc => IAM_CONFIG_SERVICE_ACCOUNTS_PREFIX.as_str(),
UserType::Sts => IAM_CONFIG_STS_PREFIX.as_str(),
UserType::None => "",
};
let (ctx_tx, ctx_rx) = broadcast::channel(1);

View File

@@ -416,7 +416,7 @@ impl<T: Store> IamSys<T> {
extract_jwt_claims(&u)
}
pub async fn delete_service_account(&self, access_key: &str) -> Result<()> {
pub async fn delete_service_account(&self, access_key: &str, _notify: bool) -> Result<()> {
let Some(u) = self.store.get_user(access_key).await else {
return Ok(());
};

View File

@@ -576,7 +576,7 @@ impl Operation for DeleteServiceAccount {
}
}
iam_store.delete_service_account(&query.access_key).await.map_err(|e| {
iam_store.delete_service_account(&query.access_key, true).await.map_err(|e| {
debug!("delete service account failed, e: {:?}", e);
s3_error!(InternalError, "delete service account failed")
})?;

View File

@@ -4,10 +4,13 @@ use crate::{
};
use ecstore::global::get_global_action_cred;
use http::{HeaderMap, StatusCode};
use iam::store::UserType;
use iam::{
store::{GroupInfo, MappedPolicy, UserType},
sys::NewServiceAccountOpts,
};
use madmin::{
AccountStatus, AddOrUpdateUserReq,
user::{SRSessionPolicy, SRSvcAccCreate},
AccountStatus, AddOrUpdateUserReq, IAMEntities, IAMErrEntities, IAMErrEntity, IAMErrPolicyEntity,
user::{ImportIAMResult, SRSessionPolicy, SRSvcAccCreate},
};
use matchit::Params;
use policy::policy::{
@@ -443,7 +446,7 @@ impl Operation for ExportIam {
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
ALL_GROUPS_FILE => {
let mut groups = HashMap::new();
let mut groups: HashMap<String, GroupInfo> = HashMap::new();
iam_store
.load_groups(&mut groups)
.await
@@ -514,7 +517,7 @@ impl Operation for ExportIam {
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
USER_POLICY_MAPPINGS_FILE => {
let mut user_policy_mappings = HashMap::new();
let mut user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
iam_store
.load_mapped_policys(UserType::Reg, false, &mut user_policy_mappings)
.await
@@ -546,7 +549,7 @@ impl Operation for ExportIam {
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
}
STS_USER_POLICY_MAPPINGS_FILE => {
let mut sts_user_policy_mappings = HashMap::new();
let mut sts_user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
iam_store
.load_mapped_policys(UserType::Sts, false, &mut sts_user_policy_mappings)
.await
@@ -600,6 +603,11 @@ impl Operation for ImportIam {
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let skipped = IAMEntities::default();
let mut removed = IAMEntities::default();
let mut added = IAMEntities::default();
let mut failed = IAMErrEntities::default();
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_POLICIES_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
@@ -619,17 +627,19 @@ impl Operation for ImportIam {
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (name, policy) in policies {
if policy.id.is_empty() {
iam_store
.delete_policy(&name, true)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let res = iam_store.delete_policy(&name, true).await;
removed.policies.push(name.clone());
if let Err(e) = res {
return Err(s3_error!(InternalError, "delete policy failed, name: {name}, err: {e}"));
}
continue;
}
iam_store
.set_policy(&name, policy)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let res = iam_store.set_policy(&name, policy).await;
added.policies.push(name.clone());
if let Err(e) = res {
return Err(s3_error!(InternalError, "set policy failed, name: {name}, err: {e}"));
}
}
}
}
@@ -668,17 +678,290 @@ impl Operation for ImportIam {
return Err(s3_error!(InvalidArgument, "has space be"));
}
iam_store
.create_user(&ak, &req)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
if let Err(e) = iam_store.create_user(&ak, &req).await {
failed.users.push(IAMErrEntity {
name: ak.clone(),
error: e.to_string(),
});
} else {
added.users.push(ak.clone());
}
}
}
}
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_GROUPS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let groups: HashMap<String, GroupInfo> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (group_name, group_info) in groups {
if let Err(e) = iam_store.get_group_description(&group_name).await {
if matches!(e, iam::error::Error::NoSuchGroup(_)) || has_space_be(&group_name) {
return Err(s3_error!(InvalidArgument, "group not found or has space be"));
}
}
if let Err(e) = iam_store.add_users_to_group(&group_name, group_info.members.clone()).await {
failed.groups.push(IAMErrEntity {
name: group_name.clone(),
error: e.to_string(),
});
} else {
added.groups.push(group_name.clone());
}
}
}
}
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_SVC_ACCTS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let svc_accts: HashMap<String, SRSvcAccCreate> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (ak, req) in svc_accts {
if skipped.service_accounts.contains(&ak) {
continue;
}
let sp = if let Some(ps) = req.session_policy.as_str() {
let sp = policy::policy::Policy::parse_config(ps.as_bytes())
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(sp)
} else {
None
};
if has_space_be(&ak) {
return Err(s3_error!(InvalidArgument, "has space be {ak}"));
}
let mut update = true;
if let Err(e) = iam_store.get_service_account(&req.access_key).await {
if !matches!(e, iam::error::Error::NoSuchServiceAccount(_)) {
return Err(s3_error!(InvalidArgument, "failed to get service account {ak} {e}"));
}
update = false;
}
if update {
iam_store.delete_service_account(&req.access_key, true).await.map_err(|e| {
S3Error::with_message(
S3ErrorCode::InternalError,
format!("failed to delete service account {ak} {e}"),
)
})?;
}
let opts = NewServiceAccountOpts {
session_policy: sp,
access_key: ak.clone(),
secret_key: req.secret_key,
name: Some(req.name),
description: Some(req.description),
expiration: req.expiration,
allow_site_replicator_account: false,
claims: Some(req.claims),
};
let groups = if req.groups.is_empty() { None } else { Some(req.groups) };
if let Err(e) = iam_store.new_service_account(&req.parent, groups, opts).await {
failed.service_accounts.push(IAMErrEntity {
name: ak.clone(),
error: e.to_string(),
});
} else {
added.service_accounts.push(ak.clone());
}
}
}
}
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, USER_POLICY_MAPPINGS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let user_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (user_name, policies) in user_policy_mappings {
let has_temp = match iam_store.is_temp_user(&user_name).await {
Ok((has_temp, _)) => has_temp,
Err(e) => {
if !matches!(e, iam::error::Error::NoSuchUser(_)) {
return Err(s3_error!(InternalError, "is temp user failed, name: {user_name}, err: {e}"));
}
false
}
};
if has_temp {
return Err(s3_error!(InvalidArgument, "can't set policy for temp user {user_name}"));
}
if let Err(e) = iam_store
.policy_db_set(&user_name, UserType::Reg, false, &policies.policies)
.await
{
failed.user_policies.push(IAMErrPolicyEntity {
name: user_name.clone(),
error: e.to_string(),
policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
});
} else {
added.user_policies.push(HashMap::from([(
user_name.clone(),
policies.policies.split(',').map(|s| s.to_string()).collect(),
)]));
}
}
}
}
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, GROUP_POLICY_MAPPINGS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let group_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (group_name, policies) in group_policy_mappings {
if skipped.groups.contains(&group_name) {
continue;
}
if let Err(e) = iam_store
.policy_db_set(&group_name, UserType::None, true, &policies.policies)
.await
{
failed.group_policies.push(IAMErrPolicyEntity {
name: group_name.clone(),
error: e.to_string(),
policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
});
} else {
added.group_policies.push(HashMap::from([(
group_name.clone(),
policies.policies.split(',').map(|s| s.to_string()).collect(),
)]));
}
}
}
}
{
let file_path = path_join_buf(&[IAM_ASSETS_DIR, STS_USER_POLICY_MAPPINGS_FILE]);
let file_content = match zip_reader.by_name(file_path.as_str()) {
Err(ZipError::FileNotFound) => None,
Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
Ok(file) => {
let mut file = file;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
Some(file_content)
}
};
if let Some(file_content) = file_content {
let sts_user_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
for (user_name, policies) in sts_user_policy_mappings {
if skipped.users.contains(&user_name) {
continue;
}
let has_temp = match iam_store.is_temp_user(&user_name).await {
Ok((has_temp, _)) => has_temp,
Err(e) => {
if !matches!(e, iam::error::Error::NoSuchUser(_)) {
return Err(s3_error!(InternalError, "is temp user failed, name: {user_name}, err: {e}"));
}
false
}
};
if has_temp {
return Err(s3_error!(InvalidArgument, "can't set policy for temp user {user_name}"));
}
if let Err(e) = iam_store
.policy_db_set(&user_name, UserType::Sts, false, &policies.policies)
.await
{
failed.sts_policies.push(IAMErrPolicyEntity {
name: user_name.clone(),
error: e.to_string(),
policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
});
} else {
added.sts_policies.push(HashMap::from([(
user_name.clone(),
policies.policies.split(',').map(|s| s.to_string()).collect(),
)]));
}
}
}
}
let ret = ImportIAMResult {
skipped,
removed,
added,
failed,
};
let body = serde_json::to_vec(&ret).map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header))
}
}

View File

@@ -16,8 +16,8 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
@@ -35,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::utils::serialize;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::compress::is_compressible;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::compress::is_compressible;
use ecstore::error::StorageError;
use ecstore::new_object_layer_fn;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -60,11 +60,11 @@ use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -73,23 +73,23 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_utils::path::path_join_buf;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;