Merge branch 'main' of github.com:rustfs/s3-rustfs into feature/Systemd.service

This commit is contained in:
houseme
2025-04-11 16:48:46 +08:00
24 changed files with 802 additions and 207 deletions

76
Cargo.lock generated
View File

@@ -388,7 +388,7 @@ dependencies = [
"arrow-schema",
"chrono",
"half",
"indexmap 2.8.0",
"indexmap 2.9.0",
"lexical-core",
"memchr",
"num",
@@ -1637,6 +1637,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc32c"
version = "0.6.8"
@@ -1655,6 +1670,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crc64fast-nvme"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4955638f00a809894c947f85a024020a20815b65a5eea633798ea7924edab2b3"
dependencies = [
"crc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
@@ -1944,7 +1968,7 @@ dependencies = [
"base64 0.22.1",
"half",
"hashbrown 0.14.5",
"indexmap 2.8.0",
"indexmap 2.9.0",
"libc",
"log",
"object_store",
@@ -2039,7 +2063,7 @@ dependencies = [
"datafusion-functions-aggregate-common",
"datafusion-functions-window-common",
"datafusion-physical-expr-common",
"indexmap 2.8.0",
"indexmap 2.9.0",
"paste",
"recursive",
"serde_json",
@@ -2054,7 +2078,7 @@ checksum = "18f0a851a436c5a2139189eb4617a54e6a9ccb9edc96c4b3c83b3bb7c58b950e"
dependencies = [
"arrow",
"datafusion-common",
"indexmap 2.8.0",
"indexmap 2.9.0",
"itertools 0.14.0",
"paste",
]
@@ -2208,7 +2232,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"indexmap 2.8.0",
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"recursive",
@@ -2231,7 +2255,7 @@ dependencies = [
"datafusion-physical-expr-common",
"half",
"hashbrown 0.14.5",
"indexmap 2.8.0",
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"paste",
@@ -2293,7 +2317,7 @@ dependencies = [
"futures",
"half",
"hashbrown 0.14.5",
"indexmap 2.8.0",
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"parking_lot 0.12.3",
@@ -2311,7 +2335,7 @@ dependencies = [
"bigdecimal",
"datafusion-common",
"datafusion-expr",
"indexmap 2.8.0",
"indexmap 2.9.0",
"log",
"recursive",
"regex",
@@ -3827,7 +3851,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http",
"indexmap 2.8.0",
"indexmap 2.9.0",
"slab",
"tokio",
"tokio-util",
@@ -4312,9 +4336,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.8.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
@@ -4873,6 +4897,7 @@ dependencies = [
"common",
"humantime",
"hyper",
"s3s",
"serde",
"serde_json",
"time",
@@ -6010,7 +6035,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
dependencies = [
"fixedbitset",
"indexmap 2.8.0",
"indexmap 2.9.0",
]
[[package]]
@@ -7349,8 +7374,8 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "s3s"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=ab139f72fe768fb9d8cecfe36269451da1ca9779#ab139f72fe768fb9d8cecfe36269451da1ca9779"
version = "0.12.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3"
dependencies = [
"arrayvec",
"async-trait",
@@ -7359,17 +7384,20 @@ dependencies = [
"bytes",
"bytestring",
"chrono",
"const-str",
"crc32c",
"crc32fast",
"digest 0.11.0-pre.10",
"crc64fast-nvme",
"futures",
"hex-simd",
"hmac 0.13.0-pre.5",
"http",
"http-body",
"http-body-util",
"httparse",
"hyper",
"itoa 1.0.15",
"md-5",
"memchr",
"mime",
"nom",
@@ -7396,10 +7424,10 @@ dependencies = [
[[package]]
name = "s3s-policy"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=ab139f72fe768fb9d8cecfe36269451da1ca9779#ab139f72fe768fb9d8cecfe36269451da1ca9779"
version = "0.12.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.9.0",
"serde",
"serde_json",
"thiserror 2.0.12",
@@ -7850,9 +7878,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
[[package]]
name = "snafu"
@@ -8466,7 +8494,7 @@ version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.9.0",
"toml_datetime",
"winnow 0.5.40",
]
@@ -8477,7 +8505,7 @@ version = "0.20.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.9.0",
"toml_datetime",
"winnow 0.5.40",
]
@@ -8488,7 +8516,7 @@ version = "0.22.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.9.0",
"serde",
"serde_spanned",
"toml_datetime",
@@ -8607,7 +8635,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"indexmap 2.8.0",
"indexmap 2.9.0",
"pin-project-lite",
"slab",
"sync_wrapper",

View File

@@ -102,10 +102,8 @@ rust-embed = "8.6.0"
rustls = { version = "0.23.26" }
rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779", default-features = true, features = [
"tower",
] }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779" }
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" }
shadow-rs = { version = "0.38.0", default-features = false }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"

View File

@@ -30,4 +30,4 @@ batch_size = 100
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[logger]
queue_capacity = 10000
queue_capacity = 10000

View File

@@ -189,13 +189,7 @@ pub fn lookup_config(kvs: &KVS, set_drive_count: usize) -> Result<Config> {
validate_parity_inner(standard.parity, rrs.parity, set_drive_count)?;
let optimize = {
if let Ok(ev) = env::var(OPTIMIZE_ENV) {
Some(ev)
} else {
None
}
};
let optimize = { env::var(OPTIMIZE_ENV).ok() };
let inline_block = {
if let Ok(ev) = env::var(INLINE_BLOCK_ENV) {

View File

@@ -1232,7 +1232,7 @@ impl DiskAPI for LocalDisk {
.join(path)
.join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string()))
.join(format!("part.{}", part.number));
let err = match self
let err = (self
.bitrot_verify(
&part_path,
erasure.shard_file_size(part.size),
@@ -1240,11 +1240,8 @@ impl DiskAPI for LocalDisk {
&checksum_info.hash,
erasure.shard_size(erasure.block_size),
)
.await
{
Ok(_) => None,
Err(err) => Some(err),
};
.await)
.err();
resp.results[i] = conv_part_err_to_int(&err);
if resp.results[i] == CHECK_PART_UNKNOWN {
if let Some(err) = err {

View File

@@ -406,10 +406,7 @@ impl HealSequence {
async fn traverse_and_heal(h: Arc<HealSequence>) {
let buckets_only = false;
let result = match Self::heal_items(h.clone(), buckets_only).await {
Ok(_) => None,
Err(err) => Some(err),
};
let result = (Self::heal_items(h.clone(), buckets_only).await).err();
let _ = h.traverse_and_heal_done_tx.read().await.send(result).await;
}

View File

@@ -80,12 +80,7 @@ impl S3PeerSys {
let mut futures = Vec::with_capacity(self.clients.len());
for client in self.clients.iter() {
// client_clon
futures.push(async move {
match client.get_bucket_info(bucket, &BucketOptions::default()).await {
Ok(_) => None,
Err(err) => Some(err),
}
});
futures.push(async move { (client.get_bucket_info(bucket, &BucketOptions::default()).await).err() });
}
let errs = join_all(futures).await;

View File

@@ -1230,7 +1230,7 @@ impl SetDisks {
let shallow_versions: Vec<Vec<FileMetaShallowVersion>> = metadata_shallow_versions.iter().flatten().cloned().collect();
let read_quorum = (fileinfos.len() + 1) / 2;
let read_quorum = fileinfos.len().div_ceil(2);
let versions = merge_file_meta_versions(read_quorum, false, 1, &shallow_versions);
let meta = FileMeta {
versions,
@@ -5085,7 +5085,7 @@ fn is_object_dang_ling(
});
if !valid_meta.is_valid() {
let data_blocks = (meta_arr.len() + 1) / 2;
let data_blocks = meta_arr.len().div_ceil(2);
if not_found_parts_errs > data_blocks {
return Ok(valid_meta);
}
@@ -5098,7 +5098,7 @@ fn is_object_dang_ling(
}
if valid_meta.deleted {
let data_blocks = (errs.len() + 1) / 2;
let data_blocks = errs.len().div_ceil(2);
if not_found_meta_errs > data_blocks {
return Ok(valid_meta);
}
@@ -5313,7 +5313,7 @@ async fn disks_with_all_parts(
if let Some(data) = &meta.data {
let checksum_info = meta.erasure.get_checksum_info(meta.parts[0].number);
let data_len = data.len();
let verify_err = match bitrot_verify(
let verify_err = (bitrot_verify(
Box::new(Cursor::new(data.clone())),
data_len,
meta.erasure.shard_file_size(meta.size),
@@ -5321,11 +5321,8 @@ async fn disks_with_all_parts(
checksum_info.hash,
meta.erasure.shard_size(meta.erasure.block_size),
)
.await
{
Ok(_) => None,
Err(err) => Some(err),
};
.await)
.err();
if let Some(vec) = data_errs_by_part.get_mut(&0) {
if index < vec.len() {

View File

@@ -698,7 +698,7 @@ impl ECStore {
futures.push(async move {
let mut ask_disks = get_list_quorum(&opts.ask_disks, set.set_drive_count as i32);
if ask_disks == -1 {
let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2);
let new_disks = get_quorum_disks(&disks, &infos, disks.len().div_ceil(2));
if !new_disks.is_empty() {
disks = new_disks;
} else {
@@ -1156,13 +1156,7 @@ async fn merge_entry_channels(
if let Some(entry) = &best {
let mut versions = Vec::with_capacity(to_merge.len() + 1);
let mut has_xl = {
if let Ok(meta) = entry.clone().xl_meta() {
Some(meta)
} else {
None
}
};
let mut has_xl = { entry.clone().xl_meta().ok() };
if let Some(x) = &has_xl {
versions.push(x.versions.clone());
@@ -1229,7 +1223,7 @@ impl SetDisks {
let mut ask_disks = get_list_quorum(&opts.ask_disks, self.set_drive_count as i32);
if ask_disks == -1 {
let new_disks = get_quorum_disks(&disks, &infos, (disks.len() + 1) / 2);
let new_disks = get_quorum_disks(&disks, &infos, disks.len().div_ceil(2));
if !new_disks.is_empty() {
disks = new_disks;
ask_disks = 1;

View File

@@ -675,11 +675,11 @@ impl Store for ObjectStore {
async fn load_all(&self, cache: &Cache) -> Result<()> {
let listed_config_items = self.list_all_iamconfig_items().await?;
let mut policy_docs_cache = CacheEntity::new(get_default_policyes());
if let Some(policies_list) = listed_config_items.get(POLICIES_LIST_KEY) {
let mut policies_list = policies_list.clone();
let mut policy_docs_cache = CacheEntity::new(get_default_policyes());
loop {
if policies_list.len() < 32 {
let policy_docs = self.load_policy_doc_concurrent(&policies_list).await?;
@@ -712,10 +712,10 @@ impl Store for ObjectStore {
policies_list = policies_list.split_off(32);
}
cache.policy_docs.store(Arc::new(policy_docs_cache.update_load_time()));
}
cache.policy_docs.store(Arc::new(policy_docs_cache.update_load_time()));
let mut user_items_cache = CacheEntity::default();
// users

View File

@@ -121,6 +121,18 @@ impl<T: Store> IamSys<T> {
// TODO: notification
}
/// Resolves a role ARN string to its parsed [`ARN`] and the policy name
/// mapped to that role.
///
/// # Errors
/// - "Invalid ARN" if `arn_str` does not parse as an ARN.
/// - "No such role" if no policy is mapped to the parsed ARN.
pub async fn get_role_policy(&self, arn_str: &str) -> Result<(ARN, String)> {
    // Parse directly with let-else on the Result instead of `.ok()` +
    // `let Some(..)`, which needlessly converted through Option.
    let Ok(arn) = ARN::parse(arn_str) else {
        return Err(Error::msg("Invalid ARN"));
    };
    let Some(policy) = self.roles_map.get(&arn) else {
        return Err(Error::msg("No such role"));
    };
    Ok((arn, policy.clone()))
}
pub async fn delete_user(&self, name: &str, _notify: bool) -> Result<()> {
self.store.delete_user(name, UserType::Reg).await
// TODO: notification

View File

@@ -18,3 +18,4 @@ serde.workspace = true
serde_json.workspace = true
time.workspace = true
tracing.workspace = true
s3s.workspace = true

View File

@@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use time::OffsetDateTime;
use crate::BackendInfo;
#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq)]
pub enum AccountStatus {
#[serde(rename = "enabled")]
@@ -221,3 +224,40 @@ impl UpdateServiceAccountReq {
Ok(())
}
}
/// Summary of a single account as returned by the admin "account info" API:
/// the account's name, backend/server information, its effective policy
/// (as raw JSON), and the buckets it can access.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AccountInfo {
    // Account (access key or parent user) this summary describes.
    pub account_name: String,
    // Backend/server information for the serving deployment.
    pub server: BackendInfo,
    pub policy: serde_json::Value, // Use iam/policy::parse to parse the result, to be done by the caller.
    // Buckets visible to this account, with per-bucket access flags.
    pub buckets: Vec<BucketAccessInfo>,
}
/// Per-bucket usage and access information reported inside [`AccountInfo`].
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct BucketAccessInfo {
    // Bucket name.
    pub name: String,
    // Total bucket size in bytes.
    pub size: u64,
    // Number of objects in the bucket.
    pub objects: u64,
    pub object_sizes_histogram: HashMap<String, u64>,
    pub object_versions_histogram: HashMap<String, u64>,
    // Optional versioning/locking/replication details.
    pub details: Option<BucketDetails>,
    pub prefix_usage: HashMap<String, u64>,
    // BUG FIX: this field holds the bucket's creation time but was serialized
    // under the unrelated name "expiration" (copy-paste error). madmin's wire
    // format names this field "creation".
    #[serde(rename = "creation", with = "time::serde::rfc3339::option")]
    pub created: Option<OffsetDateTime>,
    // Read/write capability of the account on this bucket.
    pub access: AccountAccess,
}
/// Versioning, locking and replication status flags for a bucket.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct BucketDetails {
    // True when versioning is enabled on the bucket.
    pub versioning: bool,
    // True when versioning was enabled and later suspended.
    pub versioning_suspended: bool,
    // True when object locking is configured.
    pub locking: bool,
    // True when bucket replication is configured.
    pub replication: bool,
    // pub tagging: Option<Tagging>,
}
/// Read/write permission pair for an account on a single bucket.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct AccountAccess {
    // Account may list/read the bucket.
    pub read: bool,
    // Account may write objects to the bucket.
    pub write: bool,
}

View File

@@ -54,6 +54,7 @@ pub enum Action {
AdminAction(AdminAction),
StsAction(StsAction),
KmsAction(KmsAction),
None,
}
impl Action {
@@ -69,6 +70,7 @@ impl From<&Action> for &str {
Action::AdminAction(s) => s.into(),
Action::StsAction(s) => s.into(),
Action::KmsAction(s) => s.into(),
Action::None => "",
}
}
}
@@ -232,35 +234,254 @@ pub enum S3Action {
PutObjectFanOutAction,
}
// #[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)]
// #[serde(try_from = "&str", into = "&str")]
// pub enum AdminAction {
// #[strum(serialize = "admin:*")]
// AllActions,
// #[strum(serialize = "admin:Profiling")]
// ProfilingAdminAction,
// #[strum(serialize = "admin:ServerTrace")]
// TraceAdminAction,
// #[strum(serialize = "admin:ConsoleLog")]
// ConsoleLogAdminAction,
// #[strum(serialize = "admin:ServerInfo")]
// ServerInfoAdminAction,
// #[strum(serialize = "admin:OBDInfo")]
// HealthInfoAdminAction,
// #[strum(serialize = "admin:TopLocksInfo")]
// TopLocksAdminAction,
// #[strum(serialize = "admin:LicenseInfo")]
// LicenseInfoAdminAction,
// #[strum(serialize = "admin:BandwidthMonitor")]
// BandwidthMonitorAction,
// #[strum(serialize = "admin:InspectData")]
// InspectDataAction,
// #[strum(serialize = "admin:Prometheus")]
// PrometheusAdminAction,
// #[strum(serialize = "admin:ListServiceAccounts")]
// ListServiceAccountsAdminAction,
// #[strum(serialize = "admin:CreateServiceAccount")]
// CreateServiceAccountAdminAction,
// }
// AdminAction - admin policy action.
#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)]
#[serde(try_from = "&str", into = "&str")]
pub enum AdminAction {
#[strum(serialize = "admin:*")]
AllActions,
#[strum(serialize = "admin:Heal")]
HealAdminAction,
#[strum(serialize = "admin:Decommission")]
DecommissionAdminAction,
#[strum(serialize = "admin:Rebalance")]
RebalanceAdminAction,
#[strum(serialize = "admin:StorageInfo")]
StorageInfoAdminAction,
#[strum(serialize = "admin:Prometheus")]
PrometheusAdminAction,
#[strum(serialize = "admin:DataUsageInfo")]
DataUsageInfoAdminAction,
#[strum(serialize = "admin:ForceUnlock")]
ForceUnlockAdminAction,
#[strum(serialize = "admin:TopLocksInfo")]
TopLocksAdminAction,
#[strum(serialize = "admin:Profiling")]
ProfilingAdminAction,
#[strum(serialize = "admin:ServerTrace")]
TraceAdminAction,
#[strum(serialize = "admin:ConsoleLog")]
ConsoleLogAdminAction,
#[strum(serialize = "admin:KMSCreateKey")]
KMSCreateKeyAdminAction,
#[strum(serialize = "admin:KMSKeyStatus")]
KMSKeyStatusAdminAction,
#[strum(serialize = "admin:ServerInfo")]
ServerInfoAdminAction,
#[strum(serialize = "admin:OBDInfo")]
HealthInfoAdminAction,
#[strum(serialize = "admin:TopLocksInfo")]
TopLocksAdminAction,
#[strum(serialize = "admin:LicenseInfo")]
LicenseInfoAdminAction,
#[strum(serialize = "admin:BandwidthMonitor")]
BandwidthMonitorAction,
#[strum(serialize = "admin:InspectData")]
InspectDataAction,
#[strum(serialize = "admin:Prometheus")]
PrometheusAdminAction,
#[strum(serialize = "admin:ListServiceAccounts")]
ListServiceAccountsAdminAction,
#[strum(serialize = "admin:ServerUpdate")]
ServerUpdateAdminAction,
#[strum(serialize = "admin:ServiceRestart")]
ServiceRestartAdminAction,
#[strum(serialize = "admin:ServiceStop")]
ServiceStopAdminAction,
#[strum(serialize = "admin:ServiceFreeze")]
ServiceFreezeAdminAction,
#[strum(serialize = "admin:ConfigUpdate")]
ConfigUpdateAdminAction,
#[strum(serialize = "admin:CreateUser")]
CreateUserAdminAction,
#[strum(serialize = "admin:DeleteUser")]
DeleteUserAdminAction,
#[strum(serialize = "admin:ListUsers")]
ListUsersAdminAction,
#[strum(serialize = "admin:EnableUser")]
EnableUserAdminAction,
#[strum(serialize = "admin:DisableUser")]
DisableUserAdminAction,
#[strum(serialize = "admin:GetUser")]
GetUserAdminAction,
#[strum(serialize = "admin:SiteReplicationAdd")]
SiteReplicationAddAction,
#[strum(serialize = "admin:SiteReplicationDisable")]
SiteReplicationDisableAction,
#[strum(serialize = "admin:SiteReplicationRemove")]
SiteReplicationRemoveAction,
#[strum(serialize = "admin:SiteReplicationResync")]
SiteReplicationResyncAction,
#[strum(serialize = "admin:SiteReplicationInfo")]
SiteReplicationInfoAction,
#[strum(serialize = "admin:SiteReplicationOperation")]
SiteReplicationOperationAction,
#[strum(serialize = "admin:CreateServiceAccount")]
CreateServiceAccountAdminAction,
#[strum(serialize = "admin:UpdateServiceAccount")]
UpdateServiceAccountAdminAction,
#[strum(serialize = "admin:RemoveServiceAccount")]
RemoveServiceAccountAdminAction,
#[strum(serialize = "admin:ListServiceAccounts")]
ListServiceAccountsAdminAction,
#[strum(serialize = "admin:ListTemporaryAccounts")]
ListTemporaryAccountsAdminAction,
#[strum(serialize = "admin:AddUserToGroup")]
AddUserToGroupAdminAction,
#[strum(serialize = "admin:RemoveUserFromGroup")]
RemoveUserFromGroupAdminAction,
#[strum(serialize = "admin:GetGroup")]
GetGroupAdminAction,
#[strum(serialize = "admin:ListGroups")]
ListGroupsAdminAction,
#[strum(serialize = "admin:EnableGroup")]
EnableGroupAdminAction,
#[strum(serialize = "admin:DisableGroup")]
DisableGroupAdminAction,
#[strum(serialize = "admin:CreatePolicy")]
CreatePolicyAdminAction,
#[strum(serialize = "admin:DeletePolicy")]
DeletePolicyAdminAction,
#[strum(serialize = "admin:GetPolicy")]
GetPolicyAdminAction,
#[strum(serialize = "admin:AttachUserOrGroupPolicy")]
AttachPolicyAdminAction,
#[strum(serialize = "admin:UpdatePolicyAssociation")]
UpdatePolicyAssociationAction,
#[strum(serialize = "admin:ListUserPolicies")]
ListUserPoliciesAdminAction,
#[strum(serialize = "admin:SetBucketQuota")]
SetBucketQuotaAdminAction,
#[strum(serialize = "admin:GetBucketQuota")]
GetBucketQuotaAdminAction,
#[strum(serialize = "admin:SetBucketTarget")]
SetBucketTargetAction,
#[strum(serialize = "admin:GetBucketTarget")]
GetBucketTargetAction,
#[strum(serialize = "admin:ReplicationDiff")]
ReplicationDiff,
#[strum(serialize = "admin:ImportBucketMetadata")]
ImportBucketMetadataAction,
#[strum(serialize = "admin:ExportBucketMetadata")]
ExportBucketMetadataAction,
#[strum(serialize = "admin:SetTier")]
SetTierAction,
#[strum(serialize = "admin:ListTier")]
ListTierAction,
#[strum(serialize = "admin:ExportIAM")]
ExportIAMAction,
#[strum(serialize = "admin:ImportIAM")]
ImportIAMAction,
#[strum(serialize = "admin:ListBatchJobs")]
ListBatchJobsAction,
#[strum(serialize = "admin:DescribeBatchJob")]
DescribeBatchJobAction,
#[strum(serialize = "admin:StartBatchJob")]
StartBatchJobAction,
#[strum(serialize = "admin:CancelBatchJob")]
CancelBatchJobAction,
#[strum(serialize = "admin:*")]
AllAdminActions,
}
impl AdminAction {
    // IsValid - checks if action is valid or not.
    //
    // NOTE(review): this list currently enumerates every declared variant
    // except `AllActions`, so it effectively returns false only for
    // `AllActions` — confirm whether that exclusion is intentional or
    // whether `AllActions` should also be accepted (it shares the
    // "admin:*" serialization with `AllAdminActions`).
    pub fn is_valid(&self) -> bool {
        matches!(
            self,
            AdminAction::HealAdminAction
                | AdminAction::DecommissionAdminAction
                | AdminAction::RebalanceAdminAction
                | AdminAction::StorageInfoAdminAction
                | AdminAction::PrometheusAdminAction
                | AdminAction::DataUsageInfoAdminAction
                | AdminAction::ForceUnlockAdminAction
                | AdminAction::TopLocksAdminAction
                | AdminAction::ProfilingAdminAction
                | AdminAction::TraceAdminAction
                | AdminAction::ConsoleLogAdminAction
                | AdminAction::KMSCreateKeyAdminAction
                | AdminAction::KMSKeyStatusAdminAction
                | AdminAction::ServerInfoAdminAction
                | AdminAction::HealthInfoAdminAction
                | AdminAction::LicenseInfoAdminAction
                | AdminAction::BandwidthMonitorAction
                | AdminAction::InspectDataAction
                | AdminAction::ServerUpdateAdminAction
                | AdminAction::ServiceRestartAdminAction
                | AdminAction::ServiceStopAdminAction
                | AdminAction::ServiceFreezeAdminAction
                | AdminAction::ConfigUpdateAdminAction
                | AdminAction::CreateUserAdminAction
                | AdminAction::DeleteUserAdminAction
                | AdminAction::ListUsersAdminAction
                | AdminAction::EnableUserAdminAction
                | AdminAction::DisableUserAdminAction
                | AdminAction::GetUserAdminAction
                | AdminAction::SiteReplicationAddAction
                | AdminAction::SiteReplicationDisableAction
                | AdminAction::SiteReplicationRemoveAction
                | AdminAction::SiteReplicationResyncAction
                | AdminAction::SiteReplicationInfoAction
                | AdminAction::SiteReplicationOperationAction
                | AdminAction::CreateServiceAccountAdminAction
                | AdminAction::UpdateServiceAccountAdminAction
                | AdminAction::RemoveServiceAccountAdminAction
                | AdminAction::ListServiceAccountsAdminAction
                | AdminAction::ListTemporaryAccountsAdminAction
                | AdminAction::AddUserToGroupAdminAction
                | AdminAction::RemoveUserFromGroupAdminAction
                | AdminAction::GetGroupAdminAction
                | AdminAction::ListGroupsAdminAction
                | AdminAction::EnableGroupAdminAction
                | AdminAction::DisableGroupAdminAction
                | AdminAction::CreatePolicyAdminAction
                | AdminAction::DeletePolicyAdminAction
                | AdminAction::GetPolicyAdminAction
                | AdminAction::AttachPolicyAdminAction
                | AdminAction::UpdatePolicyAssociationAction
                | AdminAction::ListUserPoliciesAdminAction
                | AdminAction::SetBucketQuotaAdminAction
                | AdminAction::GetBucketQuotaAdminAction
                | AdminAction::SetBucketTargetAction
                | AdminAction::GetBucketTargetAction
                | AdminAction::ReplicationDiff
                | AdminAction::ImportBucketMetadataAction
                | AdminAction::ExportBucketMetadataAction
                | AdminAction::SetTierAction
                | AdminAction::ListTierAction
                | AdminAction::ExportIAMAction
                | AdminAction::ImportIAMAction
                | AdminAction::ListBatchJobsAction
                | AdminAction::DescribeBatchJobAction
                | AdminAction::StartBatchJobAction
                | AdminAction::CancelBatchJobAction
                | AdminAction::AllAdminActions
        )
    }
}
#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone, EnumString, IntoStaticStr, Debug, Copy)]

View File

@@ -240,7 +240,7 @@ fn get_values_from_claims(claims: &HashMap<String, Value>, claim_name: &str) ->
(s, false)
}
fn get_policies_from_claims(claims: &HashMap<String, Value>, policy_claim_name: &str) -> (HashSet<String>, bool) {
pub fn get_policies_from_claims(claims: &HashMap<String, Value>, policy_claim_name: &str) -> (HashSet<String>, bool) {
get_values_from_claims(claims, policy_claim_name)
}
@@ -401,7 +401,7 @@ pub mod default {
effect: Effect::Allow,
actions: ActionSet({
let mut hash_set = HashSet::new();
hash_set.insert(Action::AdminAction(AdminAction::AllActions));
hash_set.insert(Action::AdminAction(AdminAction::AllAdminActions));
hash_set
}),
not_actions: ActionSet(Default::default()),

View File

@@ -44,7 +44,7 @@ http.workspace = true
http-body.workspace = true
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
libsystemd = { workspace = true, optional = true }
lock.workspace = true
local-ip-address = { workspace = true }
matchit = { workspace = true }
@@ -72,7 +72,12 @@ shadow-rs.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"net",
"signal",
] }
tokio-rustls.workspace = true
lazy_static.workspace = true
tokio-stream.workspace = true
@@ -102,7 +107,11 @@ ecstore = { path = "../ecstore" }
s3s.workspace = true
clap = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "time"] }
hyper-util = { workspace = true, features = ["tokio", "server-auto", "server-graceful"] }
hyper-util = { workspace = true, features = [
"tokio",
"server-auto",
"server-graceful",
] }
transform-stream = { workspace = true }
netif = "0.1.6"
shadow-rs.workspace = true

View File

@@ -1,12 +1,12 @@
use super::router::Operation;
use crate::auth::check_key_valid;
use crate::auth::get_condition_values;
use crate::auth::get_session_token;
use crate::storage::error::to_s3_error;
use ::policy::policy::action::{Action, S3Action};
use ::policy::policy::resource::Resource;
use ::policy::policy::statement::BPStatement;
use ::policy::policy::{ActionSet, BucketPolicy, Effect, ResourceSet};
use bytes::Bytes;
use common::error::Error as ec_Error;
use ecstore::admin_server_info::get_server_info;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::global::GLOBAL_ALlHealState;
use ecstore::heal::data_usage::load_data_usage_from_backend;
use ecstore::heal::heal_commands::HealOpts;
@@ -15,15 +15,23 @@ use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, Metri
use ecstore::new_object_layer_fn;
use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::StorageAPI;
use ecstore::utils::path::path_join;
use ecstore::GLOBAL_Endpoints;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::get_global_action_cred;
use iam::store::MappedPolicy;
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::default::DEFAULT_POLICIES;
use policy::policy::Args;
use policy::policy::BucketPolicy;
use s3s::header::CONTENT_TYPE;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result};
@@ -43,7 +51,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
pub mod group;
pub mod policy;
pub mod policys;
pub mod service_account;
pub mod sts;
pub mod trace;
@@ -61,49 +69,187 @@ pub struct AccountInfoHandler {}
#[async_trait::async_trait]
impl Operation for AccountInfoHandler {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle AccountInfoHandler");
let Some(cred) = req.credentials else { return Err(s3_error!(InvalidRequest, "get cred failed")) };
warn!("AccountInfoHandler cread {:?}", &cred);
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// test policy
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let mut s3_all_act = HashSet::with_capacity(1);
s3_all_act.insert(Action::S3Action(S3Action::AllActions));
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let mut all_res = HashSet::with_capacity(1);
all_res.insert(Resource::S3("*".to_string()));
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let bucket_policy = BucketPolicy {
id: "".into(),
version: "2012-10-17".to_owned(),
statements: vec![BPStatement {
sid: "".into(),
effect: Effect::Allow,
actions: ActionSet(s3_all_act.clone()),
resources: ResourceSet(all_res),
let default_claims = HashMap::new();
let claims = cred.claims.as_ref().unwrap_or(&default_claims);
let cred_clone = cred.clone();
let conditions = get_condition_values(&req.headers, &cred_clone);
let cred_clone = Arc::new(cred_clone);
let conditions = Arc::new(conditions);
let is_allow = Box::new({
let iam_clone = Arc::clone(&iam_store);
let cred_clone = Arc::clone(&cred_clone);
let conditions = Arc::clone(&conditions);
move |name: String| {
let iam_clone = Arc::clone(&iam_clone);
let cred_clone = Arc::clone(&cred_clone);
let conditions = Arc::clone(&conditions);
async move {
let (mut rd, mut wr) = (false, false);
if !iam_clone
.is_allowed(&Args {
account: &cred_clone.access_key,
groups: &cred_clone.groups,
action: Action::S3Action(S3Action::ListBucketAction),
bucket: &name,
conditions: &conditions,
is_owner: owner,
object: "",
claims,
deny_only: false,
})
.await
{
rd = true
}
if !iam_clone
.is_allowed(&Args {
account: &cred_clone.access_key,
groups: &cred_clone.groups,
action: Action::S3Action(S3Action::GetBucketLocationAction),
bucket: &name,
conditions: &conditions,
is_owner: owner,
object: "",
claims,
deny_only: false,
})
.await
{
rd = true
}
if !iam_clone
.is_allowed(&Args {
account: &cred_clone.access_key,
groups: &cred_clone.groups,
action: Action::S3Action(S3Action::PutObjectAction),
bucket: &name,
conditions: &conditions,
is_owner: owner,
object: "",
claims,
deny_only: false,
})
.await
{
wr = true
}
(rd, wr)
}
}
});
let account_name = if cred.is_temp() || cred.is_service_account() {
cred.parent_user.clone()
} else {
cred.access_key.clone()
};
let claims_args = Args {
account: "",
groups: &None,
action: Action::None,
bucket: "",
conditions: &HashMap::new(),
is_owner: false,
object: "",
claims,
deny_only: false,
};
let role_arn = claims_args.get_role_arn();
// TODO: get_policies_from_claims(claims);
let Some(admin_cred) = get_global_action_cred() else {
return Err(S3Error::with_message(
S3ErrorCode::InternalError,
"get_global_action_cred failed".to_string(),
));
};
let mut effective_policy: policy::policy::Policy = Default::default();
if account_name == admin_cred.access_key {
for (name, p) in DEFAULT_POLICIES.iter() {
if *name == "consoleAdmin" {
effective_policy = p.clone();
break;
}
}
} else if let Some(arn) = role_arn {
let (_, policy_name) = iam_store
.get_role_policy(arn)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
let policies = MappedPolicy::new(&policy_name).to_slice();
effective_policy = iam_store.get_combined_policy(&policies).await;
} else {
let policies = iam_store
.policy_db_get(&account_name, &cred.groups)
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("get policy failed: {}", e)))?;
effective_policy = iam_store.get_combined_policy(&policies).await;
};
let policy_str = serde_json::to_string(&effective_policy)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse policy failed"))?;
let mut account_info = madmin::AccountInfo {
account_name,
server: store.backend_info().await,
policy: serde_json::Value::String(policy_str),
..Default::default()
};
// TODO: bucket policy
let buckets = store
.list_bucket(&BucketOptions {
cached: true,
..Default::default()
}],
};
})
.await
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, e.to_string()))?;
// let policy = bucket_policy
// .marshal_msg()
// .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse policy failed"))?;
for bucket in buckets.iter() {
let (rd, wr) = is_allow(bucket.name.clone()).await;
if rd || wr {
// TODO: BucketQuotaSys
// TODO: other attributes
account_info.buckets.push(madmin::BucketAccessInfo {
name: bucket.name.clone(),
details: Some(madmin::BucketDetails {
versioning: BucketVersioningSys::enabled(bucket.name.as_str()).await,
versioning_suspended: BucketVersioningSys::suspended(bucket.name.as_str()).await,
..Default::default()
}),
created: bucket.created,
access: madmin::AccountAccess { read: rd, write: wr },
..Default::default()
});
}
}
let backend_info = store.backend_info().await;
let info = AccountInfo {
account_name: cred.access_key,
server: backend_info,
policy: bucket_policy,
};
let data = serde_json::to_vec(&info)
let data = serde_json::to_vec(&account_info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
let mut header = HeaderMap::new();
@@ -128,8 +274,6 @@ pub struct ServerInfoHandler {}
#[async_trait::async_trait]
impl Operation for ServerInfoHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ServerInfoHandler");
let info = get_server_info(true).await;
let data = serde_json::to_vec(&info)

View File

@@ -1,5 +1,3 @@
use std::collections::HashMap;
use crate::admin::{router::Operation, utils::has_space_be};
use http::{HeaderMap, StatusCode};
use iam::{error::is_err_no_such_user, get_global_action_cred, store::MappedPolicy};
@@ -8,6 +6,7 @@ use policy::policy::Policy;
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use std::collections::HashMap;
use tracing::warn;
#[derive(Debug, Deserialize, Default)]

View File

@@ -1,5 +1,5 @@
use crate::admin::utils::has_space_be;
use crate::auth::get_session_token;
use crate::auth::{get_condition_values, get_session_token};
use crate::{admin::router::Operation, auth::check_key_valid};
use http::HeaderMap;
use hyper::StatusCode;
@@ -13,7 +13,8 @@ use madmin::{
ServiceAccountInfo, UpdateServiceAccountReq,
};
use matchit::Params;
use policy::policy::Policy;
use policy::policy::action::{Action, AdminAction};
use policy::policy::{Args, Policy};
use s3s::S3ErrorCode::InvalidRequest;
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
@@ -30,7 +31,7 @@ impl Operation for AddServiceAccount {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, _owner) =
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &req_cred.access_key).await?;
let mut input = req.input;
@@ -91,6 +92,25 @@ impl Operation for AddServiceAccount {
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let deny_only = cred.access_key == target_user || cred.parent_user == target_user;
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::CreateServiceAccountAdminAction),
bucket: "",
conditions: &get_condition_values(&req.headers, &cred),
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only,
})
.await
{
return Err(s3_error!(AccessDenied, "access denied"));
}
if target_user != cred.access_key {
let has_user = iam_store.get_user(&target_user).await;
if has_user.is_none() && target_user != sys_cred.access_key {
@@ -212,7 +232,31 @@ impl Operation for UpdateServiceAccount {
update_req
.validate()
.map_err(|e| S3Error::with_message(InvalidRequest, e.to_string()))?;
// TODO: is_allowed
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::UpdateServiceAccountAdminAction),
bucket: "",
conditions: &get_condition_values(&req.headers, &cred),
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only: false,
})
.await
{
return Err(s3_error!(AccessDenied, "access denied"));
}
let sp = {
if let Some(policy) = update_req.new_policy {
let sp = Policy::parse_config(policy.as_bytes()).map_err(|e| {
@@ -280,7 +324,36 @@ impl Operation for InfoServiceAccount {
s3_error!(InternalError, "get service account failed")
})?;
// TODO: is_allowed
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::ListServiceAccountsAdminAction),
bucket: "",
conditions: &get_condition_values(&req.headers, &cred),
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only: false,
})
.await
{
let user = if cred.parent_user.is_empty() {
&cred.access_key
} else {
&cred.parent_user
};
if user != &svc_account.parent_user {
return Err(s3_error!(AccessDenied, "access denied"));
}
}
let implied_policy = if let Some(policy) = session_policy.as_ref() {
policy.version.is_empty() && policy.statements.is_empty()
@@ -359,7 +432,7 @@ impl Operation for ListServiceAccount {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, _owner) =
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key)
.await
.map_err(|e| {
@@ -367,22 +440,47 @@ impl Operation for ListServiceAccount {
s3_error!(InternalError, "check key failed")
})?;
let target_account = if let Some(user) = query.user {
if user != input_cred.access_key {
user
} else if cred.parent_user.is_empty() {
input_cred.access_key
} else {
cred.parent_user
// let target_account = if let Some(user) = query.user {
// if user != input_cred.access_key {
// user
// } else if cred.parent_user.is_empty() {
// input_cred.access_key
// } else {
// cred.parent_user
// }
// } else if cred.parent_user.is_empty() {
// input_cred.access_key
// } else {
// cred.parent_user
// };
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let target_account = if query.user.as_ref().is_some_and(|v| v != &cred.access_key) {
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::UpdateServiceAccountAdminAction),
bucket: "",
conditions: &get_condition_values(&req.headers, &cred),
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only: false,
})
.await
{
return Err(s3_error!(AccessDenied, "access denied"));
}
query.user.unwrap_or_default()
} else if cred.parent_user.is_empty() {
input_cred.access_key
cred.access_key
} else {
cred.parent_user
};
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let service_accounts = iam_store.list_service_accounts(&target_account).await.map_err(|e| {
debug!("list service account failed: {e:?}");
s3_error!(InternalError, "list service account failed")
@@ -420,7 +518,7 @@ impl Operation for DeleteServiceAccount {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (_cred, _owner) =
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key)
.await
.map_err(|e| {
@@ -444,7 +542,7 @@ impl Operation for DeleteServiceAccount {
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let _svc_account = match iam_store.get_service_account(&query.access_key).await {
let svc_account = match iam_store.get_service_account(&query.access_key).await {
Ok((res, _)) => Some(res),
Err(err) => {
if is_err_no_such_service_account(&err) {
@@ -455,7 +553,30 @@ impl Operation for DeleteServiceAccount {
}
};
// TODO: is_allowed
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::RemoveServiceAccountAdminAction),
bucket: "",
conditions: &get_condition_values(&req.headers, &cred),
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only: false,
})
.await
{
let user = if cred.parent_user.is_empty() {
&cred.access_key
} else {
&cred.parent_user
};
if svc_account.is_some_and(|v| &v.parent_user != user) {
return Err(s3_error!(InvalidRequest, "service account not exist"));
}
}
iam_store.delete_service_account(&query.access_key).await.map_err(|e| {
debug!("delete service account failed, e: {:?}", e);

View File

@@ -1,9 +1,13 @@
use std::str::from_utf8;
use std::{collections::HashMap, str::from_utf8};
use http::{HeaderMap, StatusCode};
use iam::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq};
use matchit::Params;
use policy::policy::{
action::{Action, AdminAction},
Args,
};
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
@@ -11,7 +15,7 @@ use tracing::warn;
use crate::{
admin::{router::Operation, utils::has_space_be},
auth::{check_key_valid, get_session_token},
auth::{check_key_valid, get_condition_values, get_session_token},
};
#[derive(Debug, Deserialize, Default)]
@@ -39,7 +43,7 @@ impl Operation for AddUser {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, _owner) =
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let ak = query.access_key.as_deref().unwrap_or_default();
@@ -63,8 +67,6 @@ impl Operation for AddUser {
let args: AddOrUpdateUserReq = serde_json::from_slice(&body)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("unmarshal body err {}", e)))?;
warn!("add user args {:?}", args);
if args.secret_key.is_empty() {
return Err(s3_error!(InvalidArgument, "access key is empty"));
}
@@ -89,13 +91,24 @@ impl Operation for AddUser {
return Err(s3_error!(InvalidArgument, "access key is not utf8"));
}
// let check_deny_only = if ak == cred.access_key {
// true
// } else {
// false
// };
// TODO: is_allowed
let deny_only = ak == cred.access_key;
let conditions = get_condition_values(&req.headers, &cred);
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::CreateUserAdminAction),
bucket: "",
conditions: &conditions,
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only,
})
.await
{
return Err(s3_error!(AccessDenied, "access denied"));
}
iam_store
.create_user(ak, &args)
@@ -113,8 +126,6 @@ pub struct SetUserStatus {}
#[async_trait::async_trait]
impl Operation for SetUserStatus {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle SetUserStatus");
let query = {
if let Some(query) = req.uri.query() {
let input: AddUserQuery =
@@ -165,8 +176,6 @@ pub struct ListUsers {}
#[async_trait::async_trait]
impl Operation for ListUsers {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ListUsers");
let query = {
if let Some(query) = req.uri.query() {
let input: BucketQuery =
@@ -214,8 +223,6 @@ pub struct RemoveUser {}
#[async_trait::async_trait]
impl Operation for RemoveUser {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RemoveUser");
let query = {
if let Some(query) = req.uri.query() {
let input: AddUserQuery =
@@ -277,8 +284,6 @@ pub struct GetUserInfo {}
#[async_trait::async_trait]
impl Operation for GetUserInfo {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle GetUserInfo");
let query = {
if let Some(query) = req.uri.query() {
let input: AddUserQuery =
@@ -297,6 +302,32 @@ impl Operation for GetUserInfo {
let Ok(iam_store) = iam::get() else { return Err(s3_error!(InvalidRequest, "iam not init")) };
let Some(input_cred) = req.credentials else {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let deny_only = ak == cred.access_key;
let conditions = get_condition_values(&req.headers, &cred);
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
groups: &cred.groups,
action: Action::AdminAction(AdminAction::GetUserAdminAction),
bucket: "",
conditions: &conditions,
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only,
})
.await
{
return Err(s3_error!(AccessDenied, "access denied"));
}
let info = iam_store
.get_user_info(ak)
.await

View File

@@ -6,10 +6,11 @@ pub mod utils;
use common::error::Result;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
group, policy,
group, policys,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, user,
};
use hyper::Method;
use router::{AdminOperation, S3Router};
use rpc::regist_rpc_route;
@@ -231,35 +232,35 @@ fn regist_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-canned-policies").as_str(),
AdminOperation(&policy::ListCannedPolicies {}),
AdminOperation(&policys::ListCannedPolicies {}),
)?;
// info-canned-policy?name=xxx
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/info-canned-policy").as_str(),
AdminOperation(&policy::InfoCannedPolicy {}),
AdminOperation(&policys::InfoCannedPolicy {}),
)?;
// add-canned-policy?name=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/add-canned-policy").as_str(),
AdminOperation(&policy::AddCannedPolicy {}),
AdminOperation(&policys::AddCannedPolicy {}),
)?;
// remove-canned-policy?name=xxx
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-canned-policy").as_str(),
AdminOperation(&policy::RemoveCannedPolicy {}),
AdminOperation(&policys::RemoveCannedPolicy {}),
)?;
// set-user-or-group-policy?policyName=xxx&userOrGroup=xxx&isGroup=xxx
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-user-or-group-policy").as_str(),
AdminOperation(&policy::SetPolicyForUserOrGroup {}),
AdminOperation(&policys::SetPolicyForUserOrGroup {}),
)?;
Ok(())

View File

@@ -67,14 +67,16 @@ where
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX)
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<(StatusCode, Body)>> {
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
let uri = format!("{}|{}", &req.method, req.uri.path());
// warn!("get uri {}", &uri);
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
return op.call(req, mat.params).await;
let mut resp = op.call(req, mat.params).await?;
resp.status = Some(resp.output.0);
return Ok(resp.map_output(|x| x.1));
}
return Err(s3_error!(NotImplemented));

View File

@@ -205,7 +205,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// Setup S3 service
// 本项目使用 s3s 库来实现 s3 服务
let service = {
let s3_service = {
let store = storage::ecfs::FS::new();
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(store.clone());
@@ -293,11 +293,10 @@ async fn run(opt: config::Opt) -> Result<()> {
let mut sigterm_inner = sigterm_inner;
let mut sigint_inner = sigint_inner;
let hyper_service = service.into_shared();
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(CorsLayer::permissive())
.service(hybrid(hyper_service, rpc_service)),
.service(hybrid(s3_service, rpc_service)),
);
let http_server = ConnBuilder::new(TokioExecutor::new());

View File

@@ -261,14 +261,7 @@ impl S3 for FS {
.await
.map_err(to_s3_error)?;
let version_id = opts
.version_id
.as_ref()
.map(|v| match Uuid::parse_str(v) {
Ok(id) => Some(id),
Err(_) => None,
})
.unwrap_or_default();
let version_id = opts.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default();
let dobj = ObjectToDelete {
object_name: key,
version_id,
@@ -325,14 +318,7 @@ impl S3 for FS {
.objects
.iter()
.map(|v| {
let version_id = v
.version_id
.as_ref()
.map(|v| match Uuid::parse_str(v) {
Ok(id) => Some(id),
Err(_) => None,
})
.unwrap_or_default();
let version_id = v.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default();
ObjectToDelete {
object_name: v.key.clone(),
version_id,
@@ -407,8 +393,6 @@ impl S3 for FS {
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
// mc get 3
// warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id);
let GetObjectInput {
bucket,
key,
@@ -447,8 +431,6 @@ impl S3 for FS {
return Err(s3_error!(InvalidArgument, "range and part_number invalid"));
}
// let metadata = extract_metadata(&req.headers);
let opts: ObjectOptions = get_opts(&bucket, &key, version_id, part_number, &req.headers)
.await
.map_err(to_s3_error)?;
@@ -516,16 +498,49 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
// mc get 2
let HeadObjectInput { bucket, key, .. } = req.input;
let HeadObjectInput {
bucket,
key,
version_id,
part_number,
range,
..
} = req.input;
let part_number = part_number.map(|v| v as usize);
if let Some(part_num) = part_number {
if part_num == 0 {
return Err(s3_error!(InvalidArgument, "part_numer invalid"));
}
}
let rs = range.map(|v| match v {
Range::Int { first, last } => HTTPRangeSpec {
is_suffix_length: false,
start: first as usize,
end: last.map(|v| v as usize),
},
Range::Suffix { length } => HTTPRangeSpec {
is_suffix_length: true,
start: length as usize,
end: None,
},
});
if rs.is_some() && part_number.is_some() {
return Err(s3_error!(InvalidArgument, "range and part_number invalid"));
}
let opts: ObjectOptions = get_opts(&bucket, &key, version_id, part_number, &req.headers)
.await
.map_err(to_s3_error)?;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let info = store
.get_object_info(&bucket, &key, &ObjectOptions::default())
.await
.map_err(to_s3_error)?;
let info = store.get_object_info(&bucket, &key, &opts).await.map_err(to_s3_error)?;
// warn!("head_object info {:?}", &info);
@@ -1054,11 +1069,11 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self))]
async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
let GetBucketTaggingInput { bucket, .. } = req.input;
let bucket = req.input.bucket.clone();
// check bucket exists.
let _bucket = self
.head_bucket(S3Request::new(HeadBucketInput {
bucket: bucket.clone(),
.head_bucket(req.map_input(|input| HeadBucketInput {
bucket: input.bucket,
expected_bucket_owner: None,
}))
.await?;