From 0c435c6a05924bcd8f997004431d6b27e447ac8e Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 10 Apr 2025 14:57:54 +0800 Subject: [PATCH 1/4] add admin policy check for user operation --- Cargo.lock | 1 + ecstore/src/config/storageclass.rs | 8 +- ecstore/src/disk/local.rs | 9 +- ecstore/src/heal/heal_ops.rs | 5 +- ecstore/src/peer.rs | 7 +- ecstore/src/set_disk.rs | 15 +- ecstore/src/store_list_objects.rs | 12 +- iam/src/store/object.rs | 8 +- iam/src/sys.rs | 12 + madmin/Cargo.toml | 1 + madmin/src/user.rs | 40 +++ policy/src/policy/action.rs | 237 +++++++++++++++++- policy/src/policy/policy.rs | 4 +- rustfs/Cargo.toml | 15 +- rustfs/src/admin/handlers.rs | 224 ++++++++++++++--- .../admin/handlers/{policy.rs => policys.rs} | 3 +- rustfs/src/admin/handlers/service_account.rs | 159 ++++++++++-- rustfs/src/admin/handlers/user.rs | 71 ++++-- rustfs/src/admin/mod.rs | 13 +- 19 files changed, 699 insertions(+), 145 deletions(-) rename rustfs/src/admin/handlers/{policy.rs => policys.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index 68b1892c..60bd0acb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4862,6 +4862,7 @@ dependencies = [ "common", "humantime", "hyper", + "s3s", "serde", "serde_json", "time", diff --git a/ecstore/src/config/storageclass.rs b/ecstore/src/config/storageclass.rs index 3859fcb5..edb2095f 100644 --- a/ecstore/src/config/storageclass.rs +++ b/ecstore/src/config/storageclass.rs @@ -189,13 +189,7 @@ pub fn lookup_config(kvs: &KVS, set_drive_count: usize) -> Result { validate_parity_inner(standard.parity, rrs.parity, set_drive_count)?; - let optimize = { - if let Ok(ev) = env::var(OPTIMIZE_ENV) { - Some(ev) - } else { - None - } - }; + let optimize = { env::var(OPTIMIZE_ENV).ok() }; let inline_block = { if let Ok(ev) = env::var(INLINE_BLOCK_ENV) { diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index e07cf989..93681d91 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1232,7 +1232,7 @@ impl DiskAPI for LocalDisk { .join(path) .join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string())) .join(format!("part.{}", part.number)); - let err = match self + let err = (self .bitrot_verify( &part_path, erasure.shard_file_size(part.size), @@ -1240,11 +1240,8 @@ impl DiskAPI for LocalDisk { &checksum_info.hash, erasure.shard_size(erasure.block_size), ) - .await - { - Ok(_) => None, - Err(err) => Some(err), - }; + .await) + .err(); resp.results[i] = conv_part_err_to_int(&err); if resp.results[i] == CHECK_PART_UNKNOWN { if let Some(err) = err { diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index d5c4544f..6f461457 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -406,10 +406,7 @@ impl HealSequence { async fn traverse_and_heal(h: Arc) { let buckets_only = false; - let result = match Self::heal_items(h.clone(), buckets_only).await { - Ok(_) => None, - Err(err) => Some(err), - }; + let result = (Self::heal_items(h.clone(), buckets_only).await).err(); let _ = h.traverse_and_heal_done_tx.read().await.send(result).await; } diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 5328cbc9..f352121f 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -80,12 +80,7 @@ impl S3PeerSys { let mut futures = Vec::with_capacity(self.clients.len()); for client in self.clients.iter() { // client_clon - futures.push(async move { - match client.get_bucket_info(bucket, &BucketOptions::default()).await { - Ok(_) => None, - Err(err) => Some(err), - } - }); + futures.push(async move { (client.get_bucket_info(bucket, &BucketOptions::default()).await).err() }); } let errs = join_all(futures).await; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 754681fe..04b3254a 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -1230,7 +1230,7 @@ impl SetDisks { let shallow_versions: Vec> = metadata_shallow_versions.iter().flatten().cloned().collect(); - let read_quorum = (fileinfos.len() + 1) / 2; + let read_quorum = fileinfos.len().div_ceil(2); let versions = merge_file_meta_versions(read_quorum, false, 1, &shallow_versions); let meta = FileMeta { versions, @@ -5085,7 +5085,7 @@ fn is_object_dang_ling( }); if !valid_meta.is_valid() { - let data_blocks = (meta_arr.len() + 1) / 2; + let data_blocks = meta_arr.len().div_ceil(2); if not_found_parts_errs > data_blocks { return Ok(valid_meta); } @@ -5098,7 +5098,7 @@ fn is_object_dang_ling( } if valid_meta.deleted { - let data_blocks = (errs.len() + 1) / 2; + let data_blocks = errs.len().div_ceil(2); if not_found_meta_errs > data_blocks { return Ok(valid_meta); } @@ -5313,7 +5313,7 @@ async fn disks_with_all_parts( if let Some(data) = &meta.data { let checksum_info = meta.erasure.get_checksum_info(meta.parts[0].number); let data_len = data.len(); - let verify_err = match bitrot_verify( + let verify_err = (bitrot_verify( Box::new(Cursor::new(data.clone())), data_len, meta.erasure.shard_file_size(meta.size), @@ -5321,11 +5321,8 @@ async fn disks_with_all_parts( checksum_info.hash, meta.erasure.shard_size(meta.erasure.block_size), ) - .await - { - Ok(_) => None, - Err(err) => Some(err), - }; + .await) + .err(); if let Some(vec) = data_errs_by_part.get_mut(&0) { if index < vec.len() { diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index 08c671e7..8b9118de 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -698,7 +698,7 @@ impl ECStore { futures.push(async move { let mut ask_disks = get_list_quorum(&opts.ask_disks, set.set_drive_count as i32); if ask_disks == -1 { - let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2); + let new_disks = get_quorum_disks(&disks, &infos, disks.len().div_ceil(2)); if !new_disks.is_empty() { disks = new_disks; } else { @@ -1156,13 +1156,7 @@ async fn merge_entry_channels( if let Some(entry) = &best { let mut versions = Vec::with_capacity(to_merge.len() + 1); - let mut has_xl = { - if let Ok(meta) = entry.clone().xl_meta() { - Some(meta) - } else { - None - } - }; + let mut has_xl = { entry.clone().xl_meta().ok() }; if let Some(x) = &has_xl { versions.push(x.versions.clone()); @@ -1229,7 +1223,7 @@ impl SetDisks { let mut ask_disks = get_list_quorum(&opts.ask_disks, self.set_drive_count as i32); if ask_disks == -1 { - let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2); + let new_disks = get_quorum_disks(&disks, &infos, disks.len().div_ceil(2)); if !new_disks.is_empty() { disks = new_disks; ask_disks = 1; diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs index cf365f56..94c56fb0 100644 --- a/iam/src/store/object.rs +++ b/iam/src/store/object.rs @@ -675,11 +675,11 @@ impl Store for ObjectStore { async fn load_all(&self, cache: &Cache) -> Result<()> { let listed_config_items = self.list_all_iamconfig_items().await?; + let mut policy_docs_cache = CacheEntity::new(get_default_policyes()); + if let Some(policies_list) = listed_config_items.get(POLICIES_LIST_KEY) { let mut policies_list = policies_list.clone(); - let mut policy_docs_cache = CacheEntity::new(get_default_policyes()); - loop { if policies_list.len() < 32 { let policy_docs = self.load_policy_doc_concurrent(&policies_list).await?; @@ -712,10 +712,10 @@ impl Store for ObjectStore { policies_list = policies_list.split_off(32); } - - cache.policy_docs.store(Arc::new(policy_docs_cache.update_load_time())); } + cache.policy_docs.store(Arc::new(policy_docs_cache.update_load_time())); + let mut user_items_cache = CacheEntity::default(); // users diff --git a/iam/src/sys.rs b/iam/src/sys.rs index ee2a26b6..175dae6b 100644 --- a/iam/src/sys.rs +++ b/iam/src/sys.rs @@ -121,6 +121,18 @@ impl IamSys { // TODO: notification } + pub async fn get_role_policy(&self, arn_str: &str) -> Result<(ARN, String)> { + let Some(arn) = ARN::parse(arn_str).ok() else { + return Err(Error::msg("Invalid ARN")); + }; + + let Some(policy) = self.roles_map.get(&arn) else { + return Err(Error::msg("No such role")); + }; + + Ok((arn, policy.clone())) + } + pub async fn delete_user(&self, name: &str, _notify: bool) -> Result<()> { self.store.delete_user(name, UserType::Reg).await // TODO: notification diff --git a/madmin/Cargo.toml b/madmin/Cargo.toml index 8977a66a..18bea2ba 100644 --- a/madmin/Cargo.toml +++ b/madmin/Cargo.toml @@ -18,3 +18,4 @@ serde.workspace = true serde_json.workspace = true time.workspace = true tracing.workspace = true +s3s.workspace = true diff --git a/madmin/src/user.rs b/madmin/src/user.rs index 3a398c51..a13fa171 100644 --- a/madmin/src/user.rs +++ b/madmin/src/user.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use time::OffsetDateTime; +use crate::BackendInfo; + #[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq)] pub enum AccountStatus { #[serde(rename = "enabled")] @@ -221,3 +224,40 @@ impl UpdateServiceAccountReq { Ok(()) } } + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct AccountInfo { + pub account_name: String, + pub server: BackendInfo, + pub policy: serde_json::Value, // Use iam/policy::parse to parse the result, to be done by the caller. + pub buckets: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct BucketAccessInfo { + pub name: String, + pub size: u64, + pub objects: u64, + pub object_sizes_histogram: HashMap, + pub object_versions_histogram: HashMap, + pub details: Option, + pub prefix_usage: HashMap, + #[serde(rename = "expiration", with = "time::serde::rfc3339::option")] + pub created: Option, + pub access: AccountAccess, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct BucketDetails { + pub versioning: bool, + pub versioning_suspended: bool, + pub locking: bool, + pub replication: bool, + // pub tagging: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct AccountAccess { + pub read: bool, + pub write: bool, +} diff --git a/policy/src/policy/action.rs b/policy/src/policy/action.rs index e152e89b..b9df63b6 100644 --- a/policy/src/policy/action.rs +++ b/policy/src/policy/action.rs @@ -54,6 +54,7 @@ pub enum Action { AdminAction(AdminAction), StsAction(StsAction), KmsAction(KmsAction), + None, } impl Action { @@ -69,6 +70,7 @@ impl From<&Action> for &str { Action::AdminAction(s) => s.into(), Action::StsAction(s) => s.into(), Action::KmsAction(s) => s.into(), + Action::None => "", } } } @@ -232,35 +234,254 @@ pub enum S3Action { PutObjectFanOutAction, } +// #[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)] +// #[serde(try_from = "&str", into = "&str")] +// pub enum AdminAction { +// #[strum(serialize = "admin:*")] +// AllActions, +// #[strum(serialize = "admin:Profiling")] +// ProfilingAdminAction, +// #[strum(serialize = "admin:ServerTrace")] +// TraceAdminAction, +// #[strum(serialize = "admin:ConsoleLog")] +// ConsoleLogAdminAction, +// #[strum(serialize = "admin:ServerInfo")] +// ServerInfoAdminAction, +// #[strum(serialize = "admin:OBDInfo")] +// HealthInfoAdminAction, +// #[strum(serialize = "admin:TopLocksInfo")] +// TopLocksAdminAction, +// #[strum(serialize = "admin:LicenseInfo")] +// LicenseInfoAdminAction, +// #[strum(serialize = "admin:BandwidthMonitor")] +// BandwidthMonitorAction, +// #[strum(serialize = "admin:InspectData")] +// InspectDataAction, +// #[strum(serialize = "admin:Prometheus")] +// PrometheusAdminAction, +// #[strum(serialize = "admin:ListServiceAccounts")] +// ListServiceAccountsAdminAction, +// #[strum(serialize = "admin:CreateServiceAccount")] +// CreateServiceAccountAdminAction, +// } + +// AdminAction - admin policy action. #[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)] #[serde(try_from = "&str", into = "&str")] pub enum AdminAction { - #[strum(serialize = "admin:*")] - AllActions, + #[strum(serialize = "admin:Heal")] + HealAdminAction, + #[strum(serialize = "admin:Decommission")] + DecommissionAdminAction, + #[strum(serialize = "admin:Rebalance")] + RebalanceAdminAction, + #[strum(serialize = "admin:StorageInfo")] + StorageInfoAdminAction, + #[strum(serialize = "admin:Prometheus")] + PrometheusAdminAction, + #[strum(serialize = "admin:DataUsageInfo")] + DataUsageInfoAdminAction, + #[strum(serialize = "admin:ForceUnlock")] + ForceUnlockAdminAction, + #[strum(serialize = "admin:TopLocksInfo")] + TopLocksAdminAction, #[strum(serialize = "admin:Profiling")] ProfilingAdminAction, #[strum(serialize = "admin:ServerTrace")] TraceAdminAction, #[strum(serialize = "admin:ConsoleLog")] ConsoleLogAdminAction, + #[strum(serialize = "admin:KMSCreateKey")] + KMSCreateKeyAdminAction, + #[strum(serialize = "admin:KMSKeyStatus")] + KMSKeyStatusAdminAction, #[strum(serialize = "admin:ServerInfo")] ServerInfoAdminAction, #[strum(serialize = "admin:OBDInfo")] HealthInfoAdminAction, - #[strum(serialize = "admin:TopLocksInfo")] - TopLocksAdminAction, #[strum(serialize = "admin:LicenseInfo")] LicenseInfoAdminAction, #[strum(serialize = "admin:BandwidthMonitor")] BandwidthMonitorAction, #[strum(serialize = "admin:InspectData")] InspectDataAction, - #[strum(serialize = "admin:Prometheus")] - PrometheusAdminAction, - #[strum(serialize = "admin:ListServiceAccounts")] - ListServiceAccountsAdminAction, + #[strum(serialize = "admin:ServerUpdate")] + ServerUpdateAdminAction, + #[strum(serialize = "admin:ServiceRestart")] + ServiceRestartAdminAction, + #[strum(serialize = "admin:ServiceStop")] + ServiceStopAdminAction, + #[strum(serialize = "admin:ServiceFreeze")] + ServiceFreezeAdminAction, + #[strum(serialize = "admin:ConfigUpdate")] + ConfigUpdateAdminAction, + #[strum(serialize = "admin:CreateUser")] + CreateUserAdminAction, + #[strum(serialize = "admin:DeleteUser")] + DeleteUserAdminAction, + #[strum(serialize = "admin:ListUsers")] + ListUsersAdminAction, + #[strum(serialize = "admin:EnableUser")] + EnableUserAdminAction, + #[strum(serialize = "admin:DisableUser")] + DisableUserAdminAction, + #[strum(serialize = "admin:GetUser")] + GetUserAdminAction, + #[strum(serialize = "admin:SiteReplicationAdd")] + SiteReplicationAddAction, + #[strum(serialize = "admin:SiteReplicationDisable")] + SiteReplicationDisableAction, + #[strum(serialize = "admin:SiteReplicationRemove")] + SiteReplicationRemoveAction, + #[strum(serialize = "admin:SiteReplicationResync")] + SiteReplicationResyncAction, + #[strum(serialize = "admin:SiteReplicationInfo")] + SiteReplicationInfoAction, + #[strum(serialize = "admin:SiteReplicationOperation")] + SiteReplicationOperationAction, #[strum(serialize = "admin:CreateServiceAccount")] CreateServiceAccountAdminAction, + #[strum(serialize = "admin:UpdateServiceAccount")] + UpdateServiceAccountAdminAction, + #[strum(serialize = "admin:RemoveServiceAccount")] + RemoveServiceAccountAdminAction, + #[strum(serialize = "admin:ListServiceAccounts")] + ListServiceAccountsAdminAction, + #[strum(serialize = "admin:ListTemporaryAccounts")] + ListTemporaryAccountsAdminAction, + #[strum(serialize = "admin:AddUserToGroup")] + AddUserToGroupAdminAction, + #[strum(serialize = "admin:RemoveUserFromGroup")] + RemoveUserFromGroupAdminAction, + #[strum(serialize = "admin:GetGroup")] + GetGroupAdminAction, + #[strum(serialize = "admin:ListGroups")] + ListGroupsAdminAction, + #[strum(serialize = "admin:EnableGroup")] + EnableGroupAdminAction, + #[strum(serialize = "admin:DisableGroup")] + DisableGroupAdminAction, + #[strum(serialize = "admin:CreatePolicy")] + CreatePolicyAdminAction, + #[strum(serialize = "admin:DeletePolicy")] + DeletePolicyAdminAction, + #[strum(serialize = "admin:GetPolicy")] + GetPolicyAdminAction, + #[strum(serialize = "admin:AttachUserOrGroupPolicy")] + AttachPolicyAdminAction, + #[strum(serialize = "admin:UpdatePolicyAssociation")] + UpdatePolicyAssociationAction, + #[strum(serialize = "admin:ListUserPolicies")] + ListUserPoliciesAdminAction, + #[strum(serialize = "admin:SetBucketQuota")] + SetBucketQuotaAdminAction, + #[strum(serialize = "admin:GetBucketQuota")] + GetBucketQuotaAdminAction, + #[strum(serialize = "admin:SetBucketTarget")] + SetBucketTargetAction, + #[strum(serialize = "admin:GetBucketTarget")] + GetBucketTargetAction, + #[strum(serialize = "admin:ReplicationDiff")] + ReplicationDiff, + #[strum(serialize = "admin:ImportBucketMetadata")] + ImportBucketMetadataAction, + #[strum(serialize = "admin:ExportBucketMetadata")] + ExportBucketMetadataAction, + #[strum(serialize = "admin:SetTier")] + SetTierAction, + #[strum(serialize = "admin:ListTier")] + ListTierAction, + #[strum(serialize = "admin:ExportIAM")] + ExportIAMAction, + #[strum(serialize = "admin:ImportIAM")] + ImportIAMAction, + #[strum(serialize = "admin:ListBatchJobs")] + ListBatchJobsAction, + #[strum(serialize = "admin:DescribeBatchJob")] + DescribeBatchJobAction, + #[strum(serialize = "admin:StartBatchJob")] + StartBatchJobAction, + #[strum(serialize = "admin:CancelBatchJob")] + CancelBatchJobAction, + #[strum(serialize = "admin:*")] + AllAdminActions, +} + +impl AdminAction { + // IsValid - checks if action is valid or not. + pub fn is_valid(&self) -> bool { + matches!( + self, + AdminAction::HealAdminAction + | AdminAction::DecommissionAdminAction + | AdminAction::RebalanceAdminAction + | AdminAction::StorageInfoAdminAction + | AdminAction::PrometheusAdminAction + | AdminAction::DataUsageInfoAdminAction + | AdminAction::ForceUnlockAdminAction + | AdminAction::TopLocksAdminAction + | AdminAction::ProfilingAdminAction + | AdminAction::TraceAdminAction + | AdminAction::ConsoleLogAdminAction + | AdminAction::KMSCreateKeyAdminAction + | AdminAction::KMSKeyStatusAdminAction + | AdminAction::ServerInfoAdminAction + | AdminAction::HealthInfoAdminAction + | AdminAction::LicenseInfoAdminAction + | AdminAction::BandwidthMonitorAction + | AdminAction::InspectDataAction + | AdminAction::ServerUpdateAdminAction + | AdminAction::ServiceRestartAdminAction + | AdminAction::ServiceStopAdminAction + | AdminAction::ServiceFreezeAdminAction + | AdminAction::ConfigUpdateAdminAction + | AdminAction::CreateUserAdminAction + | AdminAction::DeleteUserAdminAction + | AdminAction::ListUsersAdminAction + | AdminAction::EnableUserAdminAction + | AdminAction::DisableUserAdminAction + | AdminAction::GetUserAdminAction + | AdminAction::SiteReplicationAddAction + | AdminAction::SiteReplicationDisableAction + | AdminAction::SiteReplicationRemoveAction + | AdminAction::SiteReplicationResyncAction + | AdminAction::SiteReplicationInfoAction + | AdminAction::SiteReplicationOperationAction + | AdminAction::CreateServiceAccountAdminAction + | AdminAction::UpdateServiceAccountAdminAction + | AdminAction::RemoveServiceAccountAdminAction + | AdminAction::ListServiceAccountsAdminAction + | AdminAction::ListTemporaryAccountsAdminAction + | AdminAction::AddUserToGroupAdminAction + | AdminAction::RemoveUserFromGroupAdminAction + | AdminAction::GetGroupAdminAction + | AdminAction::ListGroupsAdminAction + | AdminAction::EnableGroupAdminAction + | AdminAction::DisableGroupAdminAction + | AdminAction::CreatePolicyAdminAction + | AdminAction::DeletePolicyAdminAction + | AdminAction::GetPolicyAdminAction + | AdminAction::AttachPolicyAdminAction + | AdminAction::UpdatePolicyAssociationAction + | AdminAction::ListUserPoliciesAdminAction + | AdminAction::SetBucketQuotaAdminAction + | AdminAction::GetBucketQuotaAdminAction + | AdminAction::SetBucketTargetAction + | AdminAction::GetBucketTargetAction + | AdminAction::ReplicationDiff + | AdminAction::ImportBucketMetadataAction + | AdminAction::ExportBucketMetadataAction + | AdminAction::SetTierAction + | AdminAction::ListTierAction + | AdminAction::ExportIAMAction + | AdminAction::ImportIAMAction + | AdminAction::ListBatchJobsAction + | AdminAction::DescribeBatchJobAction + | AdminAction::StartBatchJobAction + | AdminAction::CancelBatchJobAction + | AdminAction::AllAdminActions + ) + } } #[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)] diff --git a/policy/src/policy/policy.rs b/policy/src/policy/policy.rs index f1de3790..b96f66e4 100644 --- a/policy/src/policy/policy.rs +++ b/policy/src/policy/policy.rs @@ -240,7 +240,7 @@ fn get_values_from_claims(claims: &HashMap, claim_name: &str) -> (s, false) } -fn get_policies_from_claims(claims: &HashMap, policy_claim_name: &str) -> (HashSet, bool) { +pub fn get_policies_from_claims(claims: &HashMap, policy_claim_name: &str) -> (HashSet, bool) { get_values_from_claims(claims, policy_claim_name) } @@ -401,7 +401,7 @@ pub mod default { effect: Effect::Allow, actions: ActionSet({ let mut hash_set = HashSet::new(); - hash_set.insert(Action::AdminAction(AdminAction::AllActions)); + hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions)); hash_set }), not_actions: ActionSet(Default::default()), diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 00c10beb..7d9e743f 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -43,7 +43,7 @@ http.workspace = true http-body.workspace = true iam = { path = "../iam" } jsonwebtoken = "9.3.0" -libsystemd = { workspace = true, optional = true } + lock.workspace = true local-ip-address = { workspace = true } matchit = { workspace = true } @@ -71,7 +71,12 @@ shadow-rs.workspace = true tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting", "serde"] } tokio-util.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "macros", + "net", + "signal", +] } tokio-rustls.workspace = true lazy_static.workspace = true tokio-stream.workspace = true @@ -101,7 +106,11 @@ ecstore = { path = "../ecstore" } s3s.workspace = true clap = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "time"] } -hyper-util = { workspace = true, features = ["tokio", "server-auto", "server-graceful"] } +hyper-util = { workspace = true, features = [ + "tokio", + "server-auto", + "server-graceful", +] } transform-stream = { workspace = true } netif = "0.1.6" shadow-rs.workspace = true diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 3a53de3c..4476bbc1 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -1,12 +1,12 @@ use super::router::Operation; +use crate::auth::check_key_valid; +use crate::auth::get_condition_values; +use crate::auth::get_session_token; use crate::storage::error::to_s3_error; -use ::policy::policy::action::{Action, S3Action}; -use ::policy::policy::resource::Resource; -use ::policy::policy::statement::BPStatement; -use ::policy::policy::{ActionSet, BucketPolicy, Effect, ResourceSet}; use bytes::Bytes; use common::error::Error as ec_Error; use ecstore::admin_server_info::get_server_info; +use ecstore::bucket::versioning_sys::BucketVersioningSys; use ecstore::global::GLOBAL_ALlHealState; use ecstore::heal::data_usage::load_data_usage_from_backend; use ecstore::heal::heal_commands::HealOpts; @@ -15,15 +15,23 @@ use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, Metri use ecstore::new_object_layer_fn; use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::store::is_valid_object_prefix; +use ecstore::store_api::BucketOptions; use ecstore::store_api::StorageAPI; use ecstore::utils::path::path_join; use ecstore::GLOBAL_Endpoints; use futures::{Stream, StreamExt}; use http::{HeaderMap, Uri}; use hyper::StatusCode; +use iam::get_global_action_cred; +use iam::store::MappedPolicy; use madmin::metrics::RealtimeMetrics; use madmin::utils::parse_duration; use matchit::Params; +use policy::policy::action::Action; +use policy::policy::action::S3Action; +use policy::policy::default::DEFAULT_POLICIES; +use policy::policy::Args; +use policy::policy::BucketPolicy; use s3s::header::CONTENT_TYPE; use s3s::stream::{ByteStream, DynByteStream}; use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result}; @@ -43,7 +51,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; pub mod group; -pub mod policy; +pub mod policys; pub mod service_account; pub mod sts; pub mod trace; @@ -61,49 +69,187 @@ pub struct AccountInfoHandler {} #[async_trait::async_trait] impl Operation for AccountInfoHandler { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle AccountInfoHandler"); - - let Some(cred) = req.credentials else { return Err(s3_error!(InvalidRequest, "get cred failed")) }; - - warn!("AccountInfoHandler cread {:?}", &cred); - let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; - // test policy + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; - let mut s3_all_act = HashSet::with_capacity(1); - s3_all_act.insert(Action::S3Action(S3Action::AllActions)); + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; - let mut all_res = HashSet::with_capacity(1); - all_res.insert(Resource::S3("*".to_string())); + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; - let bucket_policy = BucketPolicy { - id: "".into(), - version: "2012-10-17".to_owned(), - statements: vec![BPStatement { - sid: "".into(), - effect: Effect::Allow, - actions: ActionSet(s3_all_act.clone()), - resources: ResourceSet(all_res), + let default_claims = HashMap::new(); + let claims = cred.claims.as_ref().unwrap_or(&default_claims); + + let cred_clone = cred.clone(); + let conditions = get_condition_values(&req.headers, &cred_clone); + let cred_clone = Arc::new(cred_clone); + let conditions = Arc::new(conditions); + + let is_allow = Box::new({ + let iam_clone = Arc::clone(&iam_store); + let cred_clone = Arc::clone(&cred_clone); + let conditions = Arc::clone(&conditions); + move |name: String| { + let iam_clone = Arc::clone(&iam_clone); + let cred_clone = Arc::clone(&cred_clone); + let conditions = Arc::clone(&conditions); + async move { + let (mut rd, mut wr) = (false, false); + if !iam_clone + .is_allowed(&Args { + account: &cred_clone.access_key, + groups: &cred_clone.groups, + action: Action::S3Action(S3Action::ListBucketAction), + bucket: &name, + conditions: &conditions, + is_owner: owner, + object: "", + claims, + deny_only: false, + }) + .await + { + rd = true + } + + if !iam_clone + .is_allowed(&Args { + account: &cred_clone.access_key, + groups: &cred_clone.groups, + action: Action::S3Action(S3Action::GetBucketLocationAction), + bucket: &name, + conditions: &conditions, + is_owner: owner, + object: "", + claims, + deny_only: false, + }) + .await + { + rd = true + } + + if !iam_clone + .is_allowed(&Args { + account: &cred_clone.access_key, + groups: &cred_clone.groups, + action: Action::S3Action(S3Action::PutObjectAction), + bucket: &name, + conditions: &conditions, + is_owner: owner, + object: "", + claims, + deny_only: false, + }) + .await + { + wr = true + } + + (rd, wr) + } + } + }); + + let account_name = if cred.is_temp() || cred.is_service_account() { + cred.parent_user.clone() + } else { + cred.access_key.clone() + }; + + let claims_args = Args { + account: "", + groups: &None, + action: Action::None, + bucket: "", + conditions: &HashMap::new(), + is_owner: false, + object: "", + claims, + deny_only: false, + }; + + let role_arn = claims_args.get_role_arn(); + + // TODO: get_policies_from_claims(claims); + + let Some(admin_cred) = get_global_action_cred() else { + return Err(S3Error::with_message( + S3ErrorCode::InternalError, + "get_global_action_cred failed".to_string(), + )); + }; + + let mut effective_policy: policy::policy::Policy = Default::default(); + + if account_name == admin_cred.access_key { + for (name, p) in DEFAULT_POLICIES.iter() { + if *name == "consoleAdmin" { + effective_policy = p.clone(); + break; + } + } + } else if let Some(arn) = role_arn { + let (_, policy_name) = iam_store + .get_role_policy(arn) + .await + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?; + + let policies = MappedPolicy::new(&policy_name).to_slice(); + effective_policy = iam_store.get_combined_policy(&policies).await; + } else { + let policies = iam_store + .policy_db_get(&account_name, &cred.groups) + .await + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("get policy failed: {}", e)))?; + + effective_policy = iam_store.get_combined_policy(&policies).await; + }; + + let policy_str = serde_json::to_string(&effective_policy) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse policy failed"))?; + + let mut account_info = madmin::AccountInfo { + account_name, + server: store.backend_info().await, + policy: serde_json::Value::String(policy_str), + ..Default::default() + }; + + // TODO: bucket policy + let buckets = store + .list_bucket(&BucketOptions { + cached: true, ..Default::default() - }], - }; + }) + .await + .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?; - // let policy = bucket_policy - // .marshal_msg() - // .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse policy failed"))?; + for bucket in buckets.iter() { + let (rd, wr) = is_allow(bucket.name.clone()).await; + if rd || wr { + // TODO: BucketQuotaSys + // TODO: other attributes + account_info.buckets.push(madmin::BucketAccessInfo { + name: bucket.name.clone(), + details: Some(madmin::BucketDetails { + versioning: BucketVersioningSys::enabled(bucket.name.as_str()).await, + versioning_suspended: BucketVersioningSys::suspended(bucket.name.as_str()).await, + ..Default::default() + }), + created: bucket.created, + access: madmin::AccountAccess { read: rd, write: wr }, + ..Default::default() + }); + } + } - let backend_info = store.backend_info().await; - - let info = AccountInfo { - account_name: cred.access_key, - server: backend_info, - policy: bucket_policy, - }; - - let data = serde_json::to_vec(&info) + let data = serde_json::to_vec(&account_info) .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?; let mut header = HeaderMap::new(); @@ -128,8 +274,6 @@ pub struct ServerInfoHandler {} #[async_trait::async_trait] impl Operation for ServerInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle ServerInfoHandler"); - let info = get_server_info(true).await; let data = serde_json::to_vec(&info) diff --git a/rustfs/src/admin/handlers/policy.rs b/rustfs/src/admin/handlers/policys.rs similarity index 99% rename from rustfs/src/admin/handlers/policy.rs rename to rustfs/src/admin/handlers/policys.rs index 4d650a59..2ef7f42f 100644 --- a/rustfs/src/admin/handlers/policy.rs +++ b/rustfs/src/admin/handlers/policys.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use crate::admin::{router::Operation, utils::has_space_be}; use http::{HeaderMap, StatusCode}; use iam::{error::is_err_no_such_user, get_global_action_cred, store::MappedPolicy}; @@ -8,6 +6,7 @@ use policy::policy::Policy; use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result}; use serde::Deserialize; use serde_urlencoded::from_bytes; +use std::collections::HashMap; use tracing::warn; #[derive(Debug, Deserialize, Default)] diff --git a/rustfs/src/admin/handlers/service_account.rs b/rustfs/src/admin/handlers/service_account.rs index 942bf4e5..f7d561ed 100644 --- a/rustfs/src/admin/handlers/service_account.rs +++ b/rustfs/src/admin/handlers/service_account.rs @@ -1,5 +1,5 @@ use crate::admin::utils::has_space_be; -use crate::auth::get_session_token; +use crate::auth::{get_condition_values, get_session_token}; use crate::{admin::router::Operation, auth::check_key_valid}; use http::HeaderMap; use hyper::StatusCode; @@ -13,7 +13,8 @@ use madmin::{ ServiceAccountInfo, UpdateServiceAccountReq, }; use matchit::Params; -use policy::policy::Policy; +use policy::policy::action::{Action, AdminAction}; +use policy::policy::{Args, Policy}; use s3s::S3ErrorCode::InvalidRequest; use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result}; use serde::Deserialize; @@ -30,7 +31,7 @@ impl Operation for AddServiceAccount { return Err(s3_error!(InvalidRequest, "get cred failed")); }; - let (cred, _owner) = + let (cred, owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &req_cred.access_key).await?; let mut input = req.input; @@ -91,6 +92,25 @@ impl Operation for AddServiceAccount { let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; + let deny_only = cred.access_key == target_user || cred.parent_user == target_user; + + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::CreateServiceAccountAdminAction), + bucket: "", + conditions: &get_condition_values(&req.headers, &cred), + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only, + }) + .await + { + return Err(s3_error!(AccessDenied, "access denied")); + } + if target_user != cred.access_key { let has_user = iam_store.get_user(&target_user).await; if has_user.is_none() && target_user != sys_cred.access_key { @@ -212,7 +232,31 @@ impl Operation for UpdateServiceAccount { update_req .validate() .map_err(|e| S3Error::with_message(InvalidRequest, e.to_string()))?; - // TODO: is_allowed + + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::UpdateServiceAccountAdminAction), + bucket: "", + conditions: &get_condition_values(&req.headers, &cred), + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only: false, + }) + .await + { + return Err(s3_error!(AccessDenied, "access denied")); + } + let sp = { if let Some(policy) = update_req.new_policy { let sp = Policy::parse_config(policy.as_bytes()).map_err(|e| { @@ -280,7 +324,36 @@ impl Operation for InfoServiceAccount { s3_error!(InternalError, "get service account failed") })?; - // TODO: is_allowed + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::ListServiceAccountsAdminAction), + bucket: "", + conditions: &get_condition_values(&req.headers, &cred), + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only: false, + }) + .await + { + let user = if cred.parent_user.is_empty() { + &cred.access_key + } else { + &cred.parent_user + }; + if user != &svc_account.parent_user { + return Err(s3_error!(AccessDenied, "access denied")); + } + } let implied_policy = if let Some(policy) = session_policy.as_ref() { policy.version.is_empty() && policy.statements.is_empty() @@ -359,7 +432,7 @@ impl Operation for ListServiceAccount { return Err(s3_error!(InvalidRequest, "get cred failed")); }; - let (cred, _owner) = + let (cred, owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key) .await .map_err(|e| { @@ -367,22 +440,47 @@ impl Operation for ListServiceAccount { s3_error!(InternalError, "check key failed") })?; - let target_account = if let Some(user) = query.user { - if user != input_cred.access_key { - user - } else if cred.parent_user.is_empty() { - input_cred.access_key - } else { - cred.parent_user + // let target_account = if let Some(user) = query.user { + // if user != input_cred.access_key { + // user + // } else if cred.parent_user.is_empty() { + // input_cred.access_key + // } else { + // cred.parent_user + // } + // } else if cred.parent_user.is_empty() { + // input_cred.access_key + // } else { + // cred.parent_user + // }; + + let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; + + let target_account = if query.user.as_ref().is_some_and(|v| v != &cred.access_key) { + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::UpdateServiceAccountAdminAction), + bucket: "", + conditions: &get_condition_values(&req.headers, &cred), + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only: false, + }) + .await + { + return Err(s3_error!(AccessDenied, "access denied")); } + + query.user.unwrap_or_default() } else if cred.parent_user.is_empty() { - input_cred.access_key + cred.access_key } else { cred.parent_user }; - let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; - let service_accounts = iam_store.list_service_accounts(&target_account).await.map_err(|e| { debug!("list service account failed: {e:?}"); s3_error!(InternalError, "list service account failed") @@ -420,7 +518,7 @@ impl Operation for DeleteServiceAccount { return Err(s3_error!(InvalidRequest, "get cred failed")); }; - let (_cred, _owner) = + let (cred, owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key) .await .map_err(|e| { @@ -444,7 +542,7 @@ impl Operation for DeleteServiceAccount { let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; - let _svc_account = match iam_store.get_service_account(&query.access_key).await { + let svc_account = match iam_store.get_service_account(&query.access_key).await { Ok((res, _)) => Some(res), Err(err) => { if is_err_no_such_service_account(&err) { @@ -455,7 +553,30 @@ impl Operation for DeleteServiceAccount { } }; - // TODO: is_allowed + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::RemoveServiceAccountAdminAction), + bucket: "", + conditions: &get_condition_values(&req.headers, &cred), + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only: false, + }) + .await + { + let user = if cred.parent_user.is_empty() { + &cred.access_key + } else { + &cred.parent_user + }; + + if svc_account.is_some_and(|v| &v.parent_user != user) { + return Err(s3_error!(InvalidRequest, "service account not exist")); + } + } iam_store.delete_service_account(&query.access_key).await.map_err(|e| { debug!("delete service account failed, e: {:?}", e); diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index 06e2b987..36d7c593 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -1,9 +1,13 @@ -use std::str::from_utf8; +use std::{collections::HashMap, str::from_utf8}; use http::{HeaderMap, StatusCode}; use iam::get_global_action_cred; use madmin::{AccountStatus, AddOrUpdateUserReq}; use matchit::Params; +use policy::policy::{ + action::{Action, AdminAction}, + Args, +}; use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result}; use serde::Deserialize; use serde_urlencoded::from_bytes; @@ -11,7 +15,7 @@ use tracing::warn; use crate::{ admin::{router::Operation, utils::has_space_be}, - auth::{check_key_valid, get_session_token}, + auth::{check_key_valid, get_condition_values, get_session_token}, }; #[derive(Debug, Deserialize, Default)] @@ -39,7 +43,7 @@ impl Operation for AddUser { return Err(s3_error!(InvalidRequest, "get cred failed")); }; - let (cred, _owner) = + let (cred, owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; let ak = query.access_key.as_deref().unwrap_or_default(); @@ -63,8 +67,6 @@ impl Operation for AddUser { let args: AddOrUpdateUserReq = serde_json::from_slice(&body) .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("unmarshal body err {}", e)))?; - warn!("add user args {:?}", args); - if args.secret_key.is_empty() { return Err(s3_error!(InvalidArgument, "access key is empty")); } @@ -89,13 +91,24 @@ impl Operation for AddUser { return Err(s3_error!(InvalidArgument, "access key is not utf8")); } - // let check_deny_only = if ak == cred.access_key { - // true - // } else { - // false - // }; - - // TODO: is_allowed + let deny_only = ak == cred.access_key; + let conditions = get_condition_values(&req.headers, &cred); + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::CreateUserAdminAction), + bucket: "", + conditions: &conditions, + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only, + }) + .await + { + return Err(s3_error!(AccessDenied, "access denied")); + } iam_store .create_user(ak, &args) @@ -113,8 +126,6 @@ pub struct SetUserStatus {} #[async_trait::async_trait] impl Operation for SetUserStatus { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle SetUserStatus"); - let query = { if let Some(query) = req.uri.query() { let input: AddUserQuery = @@ -165,8 +176,6 @@ pub struct ListUsers {} #[async_trait::async_trait] impl Operation for ListUsers { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle ListUsers"); - let query = { if let Some(query) = req.uri.query() { let input: BucketQuery = @@ -214,8 +223,6 @@ pub struct RemoveUser {} #[async_trait::async_trait] impl Operation for RemoveUser { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle RemoveUser"); - let query = { if let Some(query) = req.uri.query() { let input: AddUserQuery = @@ -277,8 +284,6 @@ pub struct GetUserInfo {} #[async_trait::async_trait] impl Operation for GetUserInfo { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle GetUserInfo"); - let query = { if let Some(query) = req.uri.query() { let input: AddUserQuery = @@ -297,6 +302,32 @@ impl Operation for GetUserInfo { let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) }; + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + let deny_only = ak == cred.access_key; + let conditions = get_condition_values(&req.headers, &cred); + if !iam_store + .is_allowed(&Args { + account: &cred.access_key, + groups: &cred.groups, + action: Action::AdminAction(AdminAction::GetUserAdminAction), + bucket: "", + conditions: &conditions, + is_owner: owner, + object: "", + claims: cred.claims.as_ref().unwrap_or(&HashMap::new()), + deny_only, + }) + .await + { + return Err(s3_error!(AccessDenied, "access denied")); + } + let info = iam_store .get_user_info(ak) .await diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 4b398132..094a8c8e 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -6,10 +6,11 @@ pub mod utils; use common::error::Result; // use ecstore::global::{is_dist_erasure, is_erasure}; use handlers::{ - group, policy, + group, policys, service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount}, sts, user, }; + use hyper::Method; use router::{AdminOperation, S3Router}; use rpc::regist_rpc_route; @@ -231,35 +232,35 @@ fn regist_user_route(r: &mut S3Router) -> Result<()> { r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/list-canned-policies").as_str(), - AdminOperation(&policy::ListCannedPolicies {}), + AdminOperation(&policys::ListCannedPolicies {}), )?; // info-canned-policy?name=xxx r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/info-canned-policy").as_str(), - AdminOperation(&policy::InfoCannedPolicy {}), + AdminOperation(&policys::InfoCannedPolicy {}), )?; // add-canned-policy?name=xxx r.insert( Method::PUT, format!("{}{}", ADMIN_PREFIX, "/v3/add-canned-policy").as_str(), - AdminOperation(&policy::AddCannedPolicy {}), + AdminOperation(&policys::AddCannedPolicy {}), )?; // remove-canned-policy?name=xxx r.insert( Method::DELETE, format!("{}{}", ADMIN_PREFIX, "/v3/remove-canned-policy").as_str(), - AdminOperation(&policy::RemoveCannedPolicy {}), + AdminOperation(&policys::RemoveCannedPolicy {}), )?; // set-user-or-group-policy?policyName=xxx&userOrGroup=xxx&isGroup=xxx r.insert( Method::PUT, format!("{}{}", ADMIN_PREFIX, "/v3/set-user-or-group-policy").as_str(), - AdminOperation(&policy::SetPolicyForUserOrGroup {}), + AdminOperation(&policys::SetPolicyForUserOrGroup {}), )?; Ok(()) From f97c262a1b425ba3d28a10bcb850b8a07ee622e1 Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 11 Apr 2025 11:33:44 +0800 Subject: [PATCH 2/4] fix:#309 add head_object options --- rustfs/src/storage/ecfs.rs | 65 +++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index f517220a..a235542b 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -261,14 +261,7 @@ impl S3 for FS { .await .map_err(to_s3_error)?; - let version_id = opts - .version_id - .as_ref() - .map(|v| match Uuid::parse_str(v) { - Ok(id) => Some(id), - Err(_) => None, - }) - .unwrap_or_default(); + let version_id = opts.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default(); let dobj = ObjectToDelete { object_name: key, version_id, @@ -325,14 +318,7 @@ impl S3 for FS { .objects .iter() .map(|v| { - let version_id = v - .version_id - .as_ref() - .map(|v| match Uuid::parse_str(v) { - Ok(id) => Some(id), - Err(_) => None, - }) - .unwrap_or_default(); + let version_id = v.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default(); ObjectToDelete { object_name: v.key.clone(), version_id, @@ -407,8 +393,6 @@ impl S3 for FS { async fn get_object(&self, req: S3Request) -> S3Result> { // mc get 3 - // warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id); - let GetObjectInput { bucket, key, @@ -447,8 +431,6 @@ impl S3 for FS { return Err(s3_error!(InvalidArgument, "range and part_number invalid")); } - // let metadata = extract_metadata(&req.headers); - let opts: ObjectOptions = get_opts(&bucket, &key, version_id, part_number, &req.headers) .await .map_err(to_s3_error)?; @@ -516,16 +498,49 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self, req))] async fn head_object(&self, req: S3Request) -> S3Result> { // mc get 2 - let HeadObjectInput { bucket, key, .. } = req.input; + let HeadObjectInput { + bucket, + key, + version_id, + part_number, + range, + .. + } = req.input; + + let part_number = part_number.map(|v| v as usize); + + if let Some(part_num) = part_number { + if part_num == 0 { + return Err(s3_error!(InvalidArgument, "part_numer invalid")); + } + } + + let rs = range.map(|v| match v { + Range::Int { first, last } => HTTPRangeSpec { + is_suffix_length: false, + start: first as usize, + end: last.map(|v| v as usize), + }, + Range::Suffix { length } => HTTPRangeSpec { + is_suffix_length: true, + start: length as usize, + end: None, + }, + }); + + if rs.is_some() && part_number.is_some() { + return Err(s3_error!(InvalidArgument, "range and part_number invalid")); + } + + let opts: ObjectOptions = get_opts(&bucket, &key, version_id, part_number, &req.headers) + .await + .map_err(to_s3_error)?; let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; - let info = store - .get_object_info(&bucket, &key, &ObjectOptions::default()) - .await - .map_err(to_s3_error)?; + let info = store.get_object_info(&bucket, &key, &opts).await.map_err(to_s3_error)?; // warn!("head_object info {:?}", &info); From 9baede4bc40d6be2b4c2c2689aa194c5e1b2b9db Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 11 Apr 2025 14:27:55 +0800 Subject: [PATCH 3/4] Update obs.example.toml --- config/obs.example.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/obs.example.toml b/config/obs.example.toml index 0ba434ad..b69ba338 100644 --- a/config/obs.example.toml +++ b/config/obs.example.toml @@ -6,7 +6,7 @@ meter_interval = 30 service_name = "rustfs" service_version = "0.1.0" environment = "develop" -looger_level = "info" +logger_level = "info" [sinks] [sinks.kafka] # Kafka sink is disabled by default @@ -30,4 +30,4 @@ batch_size = 100 batch_timeout_ms = 1000 # Default is 8192 bytes if not specified [logger] -queue_capacity = 10000 \ No newline at end of file +queue_capacity = 10000 From 1b24fbdb009b1a3c7c793f7dc3fa48a874f58a5a Mon Sep 17 00:00:00 2001 From: Nugine Date: Fri, 11 Apr 2025 16:01:46 +0800 Subject: [PATCH 4/4] fix: upgrade s3s --- Cargo.lock | 75 ++++++++++++++++++++++++++------------ Cargo.toml | 6 +-- rustfs/src/admin/router.rs | 6 ++- rustfs/src/main.rs | 5 +-- rustfs/src/storage/ecfs.rs | 6 +-- 5 files changed, 62 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60bd0acb..ed330cd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,7 +388,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.8.0", + "indexmap 2.9.0", "lexical-core", "memchr", "num", @@ -1626,6 +1626,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32c" version = "0.6.8" @@ -1644,6 +1659,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crc64fast-nvme" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4955638f00a809894c947f85a024020a20815b65a5eea633798ea7924edab2b3" +dependencies = [ + "crc", +] + [[package]] name = "crossbeam-channel" version = "0.5.14" @@ -1933,7 +1957,7 @@ dependencies = [ "base64 0.22.1", "half", "hashbrown 0.14.5", - "indexmap 2.8.0", + "indexmap 2.9.0", "libc", "log", "object_store", @@ -2028,7 +2052,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", - "indexmap 2.8.0", + "indexmap 2.9.0", "paste", "recursive", "serde_json", @@ -2043,7 +2067,7 @@ checksum = "18f0a851a436c5a2139189eb4617a54e6a9ccb9edc96c4b3c83b3bb7c58b950e" dependencies = [ "arrow", "datafusion-common", - "indexmap 2.8.0", + "indexmap 2.9.0", "itertools 0.14.0", "paste", ] @@ -2197,7 +2221,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "indexmap 2.8.0", + "indexmap 2.9.0", "itertools 0.14.0", "log", "recursive", @@ -2220,7 +2244,7 @@ dependencies = [ "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "indexmap 2.8.0", + "indexmap 2.9.0", "itertools 0.14.0", "log", "paste", @@ -2282,7 +2306,7 @@ dependencies = [ "futures", "half", "hashbrown 0.14.5", - "indexmap 2.8.0", + "indexmap 2.9.0", "itertools 0.14.0", "log", "parking_lot 0.12.3", @@ -2300,7 +2324,7 @@ dependencies = [ "bigdecimal", "datafusion-common", "datafusion-expr", - "indexmap 2.8.0", + "indexmap 2.9.0", "log", "recursive", "regex", @@ -3816,7 +3840,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.8.0", + "indexmap 2.9.0", "slab", "tokio", "tokio-util", @@ -4301,9 +4325,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" +checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -6000,7 +6024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset", - "indexmap 2.8.0", + "indexmap 2.9.0", ] [[package]] @@ -7338,8 +7362,8 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] name = "s3s" -version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=ab139f72fe768fb9d8cecfe36269451da1ca9779#ab139f72fe768fb9d8cecfe36269451da1ca9779" +version = "0.12.0-dev" +source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3" dependencies = [ "arrayvec", "async-trait", @@ -7348,17 +7372,20 @@ dependencies = [ "bytes", "bytestring", "chrono", + "const-str", "crc32c", "crc32fast", - "digest 0.11.0-pre.10", + "crc64fast-nvme", "futures", "hex-simd", "hmac 0.13.0-pre.5", + "http", "http-body", "http-body-util", "httparse", "hyper", "itoa 1.0.15", + "md-5", "memchr", "mime", "nom", @@ -7385,10 +7412,10 @@ dependencies = [ [[package]] name = "s3s-policy" -version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=ab139f72fe768fb9d8cecfe36269451da1ca9779#ab139f72fe768fb9d8cecfe36269451da1ca9779" +version = "0.12.0-dev" +source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.9.0", "serde", "serde_json", "thiserror 2.0.12", @@ -7839,9 +7866,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "snafu" @@ -8455,7 +8482,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.9.0", "toml_datetime", "winnow 0.5.40", ] @@ -8466,7 +8493,7 @@ version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.9.0", "toml_datetime", "winnow 0.5.40", ] @@ -8477,7 +8504,7 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.9.0", "serde", "serde_spanned", "toml_datetime", @@ -8596,7 +8623,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.8.0", + "indexmap 2.9.0", "pin-project-lite", "slab", "sync_wrapper", diff --git a/Cargo.toml b/Cargo.toml index 9f1b721c..780477d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,10 +101,8 @@ rust-embed = "8.6.0" rustls = { version = "0.23" } rustls-pki-types = "1.11.0" rustls-pemfile = "2.2.0" -s3s = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779", default-features = true, features = [ - "tower", -] } -s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779" } +s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" } +s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" } shadow-rs = { version = "0.38.0", default-features = false } serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.138" diff --git a/rustfs/src/admin/router.rs b/rustfs/src/admin/router.rs index 4fb605d6..d19d4da2 100644 --- a/rustfs/src/admin/router.rs +++ b/rustfs/src/admin/router.rs @@ -67,14 +67,16 @@ where uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) } - async fn call(&self, req: S3Request) -> S3Result> { + async fn call(&self, req: S3Request) -> S3Result> { let uri = format!("{}|{}", &req.method, req.uri.path()); // warn!("get uri {}", &uri); if let Ok(mat) = self.router.at(&uri) { let op: &T = mat.value; - return op.call(req, mat.params).await; + let mut resp = op.call(req, mat.params).await?; + resp.status = Some(resp.output.0); + return Ok(resp.map_output(|x| x.1)); } return Err(s3_error!(NotImplemented)); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index cd1c72d1..f2b69999 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -226,7 +226,7 @@ async fn run(opt: config::Opt) -> Result<()> { // Setup S3 service // 本项目使用 s3s 库来实现 s3 服务 - let service = { + let s3_service = { let store = storage::ecfs::FS::new(); // let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?); let mut b = S3ServiceBuilder::new(store.clone()); @@ -308,11 +308,10 @@ async fn run(opt: config::Opt) -> Result<()> { let mut sigterm_inner = sigterm_inner; let mut sigint_inner = sigint_inner; - let hyper_service = service.into_shared(); let hybrid_service = TowerToHyperService::new( tower::ServiceBuilder::new() .layer(CorsLayer::permissive()) - .service(hybrid(hyper_service, rpc_service)), + .service(hybrid(s3_service, rpc_service)), ); let http_server = ConnBuilder::new(TokioExecutor::new()); diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index a235542b..f1f53b9f 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1069,11 +1069,11 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self))] async fn get_bucket_tagging(&self, req: S3Request) -> S3Result> { - let GetBucketTaggingInput { bucket, .. } = req.input; + let bucket = req.input.bucket.clone(); // check bucket exists. let _bucket = self - .head_bucket(S3Request::new(HeadBucketInput { - bucket: bucket.clone(), + .head_bucket(req.map_input(|input| HeadBucketInput { + bucket: input.bucket, expected_bucket_owner: None, })) .await?;