diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs
index d0fa376f..da069849 100644
--- a/crates/notify/src/lib.rs
+++ b/crates/notify/src/lib.rs
@@ -26,7 +26,7 @@
 pub use rules::BucketNotificationConfig;
 use std::io::IsTerminal;
 pub use target::Target;
-use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
+use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
 
 /// Initialize the tracing log system
 ///
diff --git a/crates/notify/src/rules/xml_config.rs b/crates/notify/src/rules/xml_config.rs
index b1f6f471..ea995ca9 100644
--- a/crates/notify/src/rules/xml_config.rs
+++ b/crates/notify/src/rules/xml_config.rs
@@ -1,5 +1,5 @@
 use super::pattern;
-use crate::arn::{ArnError, TargetIDError, ARN};
+use crate::arn::{ARN, ArnError, TargetIDError};
 use crate::event::EventName;
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
diff --git a/crates/obs/src/metrics/system_process.rs b/crates/obs/src/metrics/system_process.rs
index a4cc9b9e..f021aabe 100644
--- a/crates/obs/src/metrics/system_process.rs
+++ b/crates/obs/src/metrics/system_process.rs
@@ -1,6 +1,5 @@
 /// process related metric descriptors
-use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
-
+use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
 lazy_static::lazy_static! {
     pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor =
diff --git a/ecstore/src/client/api_put_object.rs b/ecstore/src/client/api_put_object.rs
index 7e304f50..e71e6d0e 100644
--- a/ecstore/src/client/api_put_object.rs
+++ b/ecstore/src/client/api_put_object.rs
@@ -30,9 +30,7 @@ use crate::client::{
     transition_api::{ReaderImpl, TransitionClient, UploadInfo},
     utils::{is_amz_header, is_minio_header, is_rustfs_header, is_standard_header, is_storageclass_header},
 };
-use rustfs_utils::{
-    crypto::base64_encode,
-};
+use rustfs_utils::crypto::base64_encode;
 
 #[derive(Debug, Clone)]
 pub struct AdvancedPutOptions {
diff --git a/iam/src/manager.rs b/iam/src/manager.rs
index 9551f455..53d81436 100644
--- a/iam/src/manager.rs
+++ b/iam/src/manager.rs
@@ -1549,6 +1549,7 @@ where
                 );
             }
         }
+        UserType::None => {}
     }
 
     Ok(())
diff --git a/iam/src/store.rs b/iam/src/store.rs
index 54bc25ed..04650ca6 100644
--- a/iam/src/store.rs
+++ b/iam/src/store.rs
@@ -59,6 +59,7 @@ pub enum UserType {
     Svc,
     Sts,
     Reg,
+    None,
 }
 
 impl UserType {
@@ -67,6 +68,7 @@
             UserType::Svc => "service-accounts/",
             UserType::Sts => "sts/",
             UserType::Reg => "users/",
+            UserType::None => "",
         }
     }
 }
diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs
index e623cd46..533ef6f0 100644
--- a/iam/src/store/object.rs
+++ b/iam/src/store/object.rs
@@ -445,6 +445,7 @@ impl Store for ObjectStore {
             UserType::Reg => IAM_CONFIG_USERS_PREFIX.as_str(),
             UserType::Svc => IAM_CONFIG_SERVICE_ACCOUNTS_PREFIX.as_str(),
             UserType::Sts => IAM_CONFIG_STS_PREFIX.as_str(),
+            UserType::None => "",
         };
 
         let (ctx_tx, ctx_rx) = broadcast::channel(1);
diff --git a/iam/src/sys.rs b/iam/src/sys.rs
index e7a663f3..00fc2302 100644
--- a/iam/src/sys.rs
+++ b/iam/src/sys.rs
@@ -416,7 +416,7 @@ impl IamSys {
         extract_jwt_claims(&u)
     }
 
-    pub async fn delete_service_account(&self, access_key: &str) -> Result<()> {
+    pub async fn delete_service_account(&self, access_key: &str, _notify: bool) -> Result<()> {
         let Some(u) = self.store.get_user(access_key).await else {
             return Ok(());
         };
diff --git a/rustfs/src/admin/handlers/service_account.rs b/rustfs/src/admin/handlers/service_account.rs
index e676bf43..c0e2013d 100644
--- a/rustfs/src/admin/handlers/service_account.rs
+++ b/rustfs/src/admin/handlers/service_account.rs
@@ -576,7 +576,7 @@ impl Operation for DeleteServiceAccount {
             }
         }
 
-        iam_store.delete_service_account(&query.access_key).await.map_err(|e| {
+        iam_store.delete_service_account(&query.access_key, true).await.map_err(|e| {
             debug!("delete service account failed, e: {:?}", e);
             s3_error!(InternalError, "delete service account failed")
         })?;
diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs
index bb164bbd..d8f60151 100644
--- a/rustfs/src/admin/handlers/user.rs
+++ b/rustfs/src/admin/handlers/user.rs
@@ -4,10 +4,13 @@ use crate::{
 };
 use ecstore::global::get_global_action_cred;
 use http::{HeaderMap, StatusCode};
-use iam::store::UserType;
+use iam::{
+    store::{GroupInfo, MappedPolicy, UserType},
+    sys::NewServiceAccountOpts,
+};
 use madmin::{
-    AccountStatus, AddOrUpdateUserReq,
-    user::{SRSessionPolicy, SRSvcAccCreate},
+    AccountStatus, AddOrUpdateUserReq, IAMEntities, IAMErrEntities, IAMErrEntity, IAMErrPolicyEntity,
+    user::{ImportIAMResult, SRSessionPolicy, SRSvcAccCreate},
 };
 use matchit::Params;
 use policy::policy::{
@@ -443,7 +446,7 @@ impl Operation for ExportIam {
                         .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
                 }
                 ALL_GROUPS_FILE => {
-                    let mut groups = HashMap::new();
+                    let mut groups: HashMap<String, GroupInfo> = HashMap::new();
                     iam_store
                         .load_groups(&mut groups)
                         .await
@@ -514,7 +517,7 @@
                         .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
                 }
                 USER_POLICY_MAPPINGS_FILE => {
-                    let mut user_policy_mappings = HashMap::new();
+                    let mut user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
                     iam_store
                         .load_mapped_policys(UserType::Reg, false, &mut user_policy_mappings)
                         .await
@@ -546,7 +549,7 @@
                         .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
                 }
                 STS_USER_POLICY_MAPPINGS_FILE => {
-                    let mut sts_user_policy_mappings = HashMap::new();
+                    let mut sts_user_policy_mappings: HashMap<String, MappedPolicy> = HashMap::new();
                     iam_store
                         .load_mapped_policys(UserType::Sts, false, &mut sts_user_policy_mappings)
                         .await
@@ -600,6 +603,11 @@ impl Operation for ImportIam {
         let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
 
+        let skipped = IAMEntities::default();
+        let mut removed = IAMEntities::default();
+        let mut added = IAMEntities::default();
+        let mut failed = IAMErrEntities::default();
+
         {
             let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_POLICIES_FILE]);
             let file_content = match zip_reader.by_name(file_path.as_str()) {
@@ -619,17 +627,19 @@
                     .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
                 for (name, policy) in policies {
                     if policy.id.is_empty() {
-                        iam_store
-                            .delete_policy(&name, true)
-                            .await
-                            .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                        let res = iam_store.delete_policy(&name, true).await;
+                        removed.policies.push(name.clone());
+                        if let Err(e) = res {
+                            return Err(s3_error!(InternalError, "delete policy failed, name: {name}, err: {e}"));
+                        }
                         continue;
                     }
-                    iam_store
-                        .set_policy(&name, policy)
-                        .await
-                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    let res = iam_store.set_policy(&name, policy).await;
+                    added.policies.push(name.clone());
+                    if let Err(e) = res {
+                        return Err(s3_error!(InternalError, "set policy failed, name: {name}, err: {e}"));
+                    }
                 }
             }
         }
@@ -668,17 +678,290 @@
                         return Err(s3_error!(InvalidArgument, "has space be"));
                     }
 
-                    iam_store
-                        .create_user(&ak, &req)
-                        .await
-                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    if let Err(e) = iam_store.create_user(&ak, &req).await {
+                        failed.users.push(IAMErrEntity {
+                            name: ak.clone(),
+                            error: e.to_string(),
+                        });
+                    } else {
+                        added.users.push(ak.clone());
+                    }
                 }
             }
         }
 
+        {
+            let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_GROUPS_FILE]);
+            let file_content = match zip_reader.by_name(file_path.as_str()) {
+                Err(ZipError::FileNotFound) => None,
+                Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
+                Ok(file) => {
+                    let mut file = file;
+                    let mut file_content = Vec::new();
+                    file.read_to_end(&mut file_content)
+                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    Some(file_content)
+                }
+            };
+
+            if let Some(file_content) = file_content {
+                let groups: HashMap<String, GroupInfo> = serde_json::from_slice(&file_content)
+                    .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                for (group_name, group_info) in groups {
+                    if let Err(e) = iam_store.get_group_description(&group_name).await {
+                        if matches!(e, iam::error::Error::NoSuchGroup(_)) || has_space_be(&group_name) {
+                            return Err(s3_error!(InvalidArgument, "group not found or has space be"));
+                        }
+                    }
+
+                    if let Err(e) = iam_store.add_users_to_group(&group_name, group_info.members.clone()).await {
+                        failed.groups.push(IAMErrEntity {
+                            name: group_name.clone(),
+                            error: e.to_string(),
+                        });
+                    } else {
+                        added.groups.push(group_name.clone());
+                    }
+                }
+            }
+        }
+
+        {
+            let file_path = path_join_buf(&[IAM_ASSETS_DIR, ALL_SVC_ACCTS_FILE]);
+            let file_content = match zip_reader.by_name(file_path.as_str()) {
+                Err(ZipError::FileNotFound) => None,
+                Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
+                Ok(file) => {
+                    let mut file = file;
+                    let mut file_content = Vec::new();
+                    file.read_to_end(&mut file_content)
+                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    Some(file_content)
+                }
+            };
+
+            if let Some(file_content) = file_content {
+                let svc_accts: HashMap<String, SRSvcAccCreate> = serde_json::from_slice(&file_content)
+                    .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                for (ak, req) in svc_accts {
+                    if skipped.service_accounts.contains(&ak) {
+                        continue;
+                    }
+
+                    let sp = if let Some(ps) = req.session_policy.as_str() {
+                        let sp = policy::policy::Policy::parse_config(ps.as_bytes())
+                            .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                        Some(sp)
+                    } else {
+                        None
+                    };
+
+                    if has_space_be(&ak) {
+                        return Err(s3_error!(InvalidArgument, "has space be {ak}"));
+                    }
+
+                    let mut update = true;
+
+                    if let Err(e) = iam_store.get_service_account(&req.access_key).await {
+                        if !matches!(e, iam::error::Error::NoSuchServiceAccount(_)) {
+                            return Err(s3_error!(InvalidArgument, "failed to get service account {ak} {e}"));
+                        }
+                        update = false;
+                    }
+
+                    if update {
+                        iam_store.delete_service_account(&req.access_key, true).await.map_err(|e| {
+                            S3Error::with_message(
+                                S3ErrorCode::InternalError,
+                                format!("failed to delete service account {ak} {e}"),
+                            )
+                        })?;
+                    }
+
+                    let opts = NewServiceAccountOpts {
+                        session_policy: sp,
+                        access_key: ak.clone(),
+                        secret_key: req.secret_key,
+                        name: Some(req.name),
+                        description: Some(req.description),
+                        expiration: req.expiration,
+                        allow_site_replicator_account: false,
+                        claims: Some(req.claims),
+                    };
+
+                    let groups = if req.groups.is_empty() { None } else { Some(req.groups) };
+
+                    if let Err(e) = iam_store.new_service_account(&req.parent, groups, opts).await {
+                        failed.service_accounts.push(IAMErrEntity {
+                            name: ak.clone(),
+                            error: e.to_string(),
+                        });
+                    } else {
+                        added.service_accounts.push(ak.clone());
+                    }
+                }
+            }
+        }
+
+        {
+            let file_path = path_join_buf(&[IAM_ASSETS_DIR, USER_POLICY_MAPPINGS_FILE]);
+            let file_content = match zip_reader.by_name(file_path.as_str()) {
+                Err(ZipError::FileNotFound) => None,
+                Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
+                Ok(file) => {
+                    let mut file = file;
+                    let mut file_content = Vec::new();
+                    file.read_to_end(&mut file_content)
+                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    Some(file_content)
+                }
+            };
+
+            if let Some(file_content) = file_content {
+                let user_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
+                    .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                for (user_name, policies) in user_policy_mappings {
+                    let has_temp = match iam_store.is_temp_user(&user_name).await {
+                        Ok((has_temp, _)) => has_temp,
+                        Err(e) => {
+                            if !matches!(e, iam::error::Error::NoSuchUser(_)) {
+                                return Err(s3_error!(InternalError, "is temp user failed, name: {user_name}, err: {e}"));
+                            }
+                            false
+                        }
+                    };
+
+                    if has_temp {
+                        return Err(s3_error!(InvalidArgument, "can't set policy for temp user {user_name}"));
+                    }
+
+                    if let Err(e) = iam_store
+                        .policy_db_set(&user_name, UserType::Reg, false, &policies.policies)
+                        .await
+                    {
+                        failed.user_policies.push(IAMErrPolicyEntity {
+                            name: user_name.clone(),
+                            error: e.to_string(),
+                            policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        });
+                    } else {
+                        added.user_policies.push(HashMap::from([(
+                            user_name.clone(),
+                            policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        )]));
+                    }
+                }
+            }
+        }
+
+        {
+            let file_path = path_join_buf(&[IAM_ASSETS_DIR, GROUP_POLICY_MAPPINGS_FILE]);
+            let file_content = match zip_reader.by_name(file_path.as_str()) {
+                Err(ZipError::FileNotFound) => None,
+                Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
+                Ok(file) => {
+                    let mut file = file;
+                    let mut file_content = Vec::new();
+                    file.read_to_end(&mut file_content)
+                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    Some(file_content)
+                }
+            };
+
+            if let Some(file_content) = file_content {
+                let group_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
+                    .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                for (group_name, policies) in group_policy_mappings {
+                    if skipped.groups.contains(&group_name) {
+                        continue;
+                    }
+
+                    if let Err(e) = iam_store
+                        .policy_db_set(&group_name, UserType::None, true, &policies.policies)
+                        .await
+                    {
+                        failed.group_policies.push(IAMErrPolicyEntity {
+                            name: group_name.clone(),
+                            error: e.to_string(),
+                            policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        });
+                    } else {
+                        added.group_policies.push(HashMap::from([(
+                            group_name.clone(),
+                            policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        )]));
+                    }
+                }
+            }
+        }
+
+        {
+            let file_path = path_join_buf(&[IAM_ASSETS_DIR, STS_USER_POLICY_MAPPINGS_FILE]);
+            let file_content = match zip_reader.by_name(file_path.as_str()) {
+                Err(ZipError::FileNotFound) => None,
+                Err(_) => return Err(s3_error!(InvalidRequest, "get file failed")),
+                Ok(file) => {
+                    let mut file = file;
+                    let mut file_content = Vec::new();
+                    file.read_to_end(&mut file_content)
+                        .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                    Some(file_content)
+                }
+            };
+
+            if let Some(file_content) = file_content {
+                let sts_user_policy_mappings: HashMap<String, MappedPolicy> = serde_json::from_slice(&file_content)
+                    .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+                for (user_name, policies) in sts_user_policy_mappings {
+                    if skipped.users.contains(&user_name) {
+                        continue;
+                    }
+
+                    let has_temp = match iam_store.is_temp_user(&user_name).await {
+                        Ok((has_temp, _)) => has_temp,
+                        Err(e) => {
+                            if !matches!(e, iam::error::Error::NoSuchUser(_)) {
+                                return Err(s3_error!(InternalError, "is temp user failed, name: {user_name}, err: {e}"));
+                            }
+                            false
+                        }
+                    };
+
+                    if has_temp {
+                        return Err(s3_error!(InvalidArgument, "can't set policy for temp user {user_name}"));
+                    }
+
+                    if let Err(e) = iam_store
+                        .policy_db_set(&user_name, UserType::Sts, false, &policies.policies)
+                        .await
+                    {
+                        failed.sts_policies.push(IAMErrPolicyEntity {
+                            name: user_name.clone(),
+                            error: e.to_string(),
+                            policies: policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        });
+                    } else {
+                        added.sts_policies.push(HashMap::from([(
+                            user_name.clone(),
+                            policies.policies.split(',').map(|s| s.to_string()).collect(),
+                        )]));
+                    }
+                }
+            }
+        }
+
+        let ret = ImportIAMResult {
+            skipped,
+            removed,
+            added,
+            failed,
+        };
+
+        let body = serde_json::to_vec(&ret).map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
+
         let mut header = HeaderMap::new();
         header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
-        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
+        Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), header))
     }
 }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 17ba383d..4fe33d16 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -16,8 +16,8 @@
 use bytes::Bytes;
 use chrono::DateTime;
 use chrono::Utc;
 use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
-use datafusion::arrow::json::writer::JsonArray;
 use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
+use datafusion::arrow::json::writer::JsonArray;
 // use ecstore::store_api::RESERVED_METADATA_PREFIX;
 use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
@@ -35,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags;
 use ecstore::bucket::tagging::encode_tags;
 use ecstore::bucket::utils::serialize;
 use ecstore::bucket::versioning_sys::BucketVersioningSys;
+use ecstore::cmd::bucket_replication::ReplicationStatusType;
+use ecstore::cmd::bucket_replication::ReplicationType;
 use ecstore::cmd::bucket_replication::get_must_replicate_options;
 use ecstore::cmd::bucket_replication::must_replicate;
 use ecstore::cmd::bucket_replication::schedule_replication;
-use ecstore::cmd::bucket_replication::ReplicationStatusType;
-use ecstore::cmd::bucket_replication::ReplicationType;
-use ecstore::compress::is_compressible;
 use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
+use ecstore::compress::is_compressible;
 use ecstore::error::StorageError;
 use ecstore::new_object_layer_fn;
 use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -60,11 +60,11 @@ use futures::StreamExt;
 use http::HeaderMap;
 use lazy_static::lazy_static;
 use policy::auth;
-use policy::policy::action::Action;
-use policy::policy::action::S3Action;
 use policy::policy::BucketPolicy;
 use policy::policy::BucketPolicyArgs;
 use policy::policy::Validator;
+use policy::policy::action::Action;
+use policy::policy::action::S3Action;
 use query::instance::make_rustfsms;
 use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
 use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -73,23 +73,23 @@ use rustfs_rio::EtagReader;
 use rustfs_rio::HashReader;
 use rustfs_rio::Reader;
 use rustfs_rio::WarpReader;
-use rustfs_utils::path::path_join_buf;
 use rustfs_utils::CompressionAlgorithm;
+use rustfs_utils::path::path_join_buf;
 use rustfs_zip::CompressionFormat;
-use s3s::dto::*;
-use s3s::s3_error;
+use s3s::S3;
 use s3s::S3Error;
 use s3s::S3ErrorCode;
 use s3s::S3Result;
-use s3s::S3;
+use s3s::dto::*;
+use s3s::s3_error;
 use s3s::{S3Request, S3Response};
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
-use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
+use time::format_description::well_known::Rfc3339;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_tar::Archive;
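
Note on the ImportIam change above: the handler now answers with a serialized madmin ImportIAMResult (skipped/removed/added/failed) instead of an empty body. Below is a minimal client-side sketch of how such a payload could be consumed. The struct shapes are illustrative stand-ins inferred from the field accesses in this patch (the authoritative types are madmin's ImportIAMResult, IAMEntities, and friends), and the JSON field casing is an assumption decided by the server's serde attributes.

    // Hypothetical client-side mirror of the ImportIam JSON response.
    // Field names follow the Rust identifiers used in the patch; the real
    // wire names come from the madmin crate's serde derives.
    use serde::Deserialize;
    use std::collections::HashMap;

    #[derive(Debug, Default, Deserialize)]
    struct Entities {
        #[serde(default)]
        policies: Vec<String>,
        #[serde(default)]
        users: Vec<String>,
        #[serde(default)]
        groups: Vec<String>,
        #[serde(default)]
        service_accounts: Vec<String>,
        #[serde(default)]
        user_policies: Vec<HashMap<String, Vec<String>>>,
        #[serde(default)]
        group_policies: Vec<HashMap<String, Vec<String>>>,
        #[serde(default)]
        sts_policies: Vec<HashMap<String, Vec<String>>>,
    }

    #[derive(Debug, Default, Deserialize)]
    struct ImportResult {
        #[serde(default)]
        skipped: Entities,
        #[serde(default)]
        removed: Entities,
        #[serde(default)]
        added: Entities,
        // Failure details (name/error/policies per entity) are left untyped here.
        #[serde(default)]
        failed: serde_json::Value,
    }

    fn main() -> Result<(), serde_json::Error> {
        // Example payload in the assumed casing.
        let body = r#"{"removed":{"policies":["old-policy"]},"added":{"users":["alice"]},"failed":{}}"#;
        let res: ImportResult = serde_json::from_str(body)?;
        println!("added users: {:?}", res.added.users);
        println!("removed policies: {:?}", res.removed.policies);
        Ok(())
    }

The `#[serde(default)]` attributes let the sketch tolerate whichever sections the server omits, which matches how the handler only populates the buckets it actually touches.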