feat: Replace LRU cache with Moka async cache in policy variables (#1166)

Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
yxrxy
2025-12-17 00:19:31 +08:00
committed by GitHub
parent 17828ec2a8
commit 8821fcc1e7
15 changed files with 289 additions and 277 deletions

63
Cargo.lock generated
View File

@@ -63,17 +63,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [
"getrandom 0.2.16",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.12"
@@ -296,7 +285,7 @@ version = "57.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-buffer",
"arrow-data",
"arrow-schema",
@@ -453,7 +442,7 @@ version = "57.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-data",
@@ -726,7 +715,7 @@ dependencies = [
"http 0.2.12",
"http 1.4.0",
"http-body 0.4.6",
"lru 0.12.5",
"lru",
"percent-encoding",
"regex-lite",
"sha2 0.10.9",
@@ -2293,7 +2282,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c10f7659e96127d25e8366be7c8be4109595d6a2c3eac70421f380a7006a1b0"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ipc",
"chrono",
@@ -2554,7 +2543,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c25210520a9dcf9c2b2cbbce31ebd4131ef5af7fc60ee92b266dc7d159cb305"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-doc",
@@ -2575,7 +2564,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f4a66f3b87300bb70f4124b55434d2ae3fe80455f3574701d0348da040b55d"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2686,7 +2675,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c30cc8012e9eedcb48bbe112c6eff4ae5ed19cf3003cb0f505662e88b7014c5d"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr",
@@ -2723,7 +2712,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90da43e1ec550b172f34c87ec68161986ced70fd05c8d2a2add66eef9c276f03"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2756,7 +2745,7 @@ version = "51.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0acf0ad6b6924c6b1aa7d213b181e012e2d3ec0a64ff5b10ee6282ab0f8532ac"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ord",
"arrow-schema",
@@ -3955,9 +3944,6 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
@@ -3965,7 +3951,7 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
"ahash",
"allocator-api2",
]
@@ -4475,7 +4461,7 @@ version = "0.11.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash 0.8.12",
"ahash",
"indexmap 2.12.1",
"is-terminal",
"itoa",
@@ -4493,7 +4479,7 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d35223c50fdd26419a4ccea2c73be68bd2b29a3d7d6123ffe101c17f4c20a52a"
dependencies = [
"ahash 0.8.12",
"ahash",
"clap",
"crossbeam-channel",
"crossbeam-utils",
@@ -4907,15 +4893,6 @@ dependencies = [
"value-bag",
]
[[package]]
name = "lru"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.12.5"
@@ -5070,7 +5047,7 @@ version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8"
dependencies = [
"ahash 0.8.12",
"ahash",
"portable-atomic",
]
@@ -5732,7 +5709,7 @@ version = "57.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-cast",
@@ -6073,6 +6050,12 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "pollster"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f3a9f18d041e6d0e102a0a46750538147e5e8992d3b4873aaafee2520b00ce3"
[[package]]
name = "poly1305"
version = "0.9.0-rc.3"
@@ -7330,6 +7313,7 @@ dependencies = [
"base64-simd",
"futures",
"jsonwebtoken",
"pollster",
"rand 0.10.0-rc.5",
"rustfs-crypto",
"rustfs-ecstore",
@@ -7483,11 +7467,14 @@ dependencies = [
name = "rustfs-policy"
version = "0.0.5"
dependencies = [
"async-trait",
"base64-simd",
"chrono",
"futures",
"ipnetwork",
"jsonwebtoken",
"lru 0.7.8",
"moka",
"pollster",
"rand 0.10.0-rc.5",
"regex",
"reqwest",

View File

@@ -103,6 +103,7 @@ axum-server = { version = "0.8.0", features = ["tls-rustls-no-provider"], defaul
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
pollster = "0.4.0"
hyper = { version = "1.8.1", features = ["http2", "http1", "server"] }
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "ring", "webpki-roots"] }
hyper-util = { version = "0.1.19", features = ["tokio", "server-auto", "server-graceful"] }
@@ -251,7 +252,6 @@ walkdir = "2.5.0"
wildmatch = { version = "2.6.1", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
lru = "0.7.1"
zip = "6.0.0"
zstd = "0.13.3"

View File

@@ -22,7 +22,7 @@ pub struct PolicySys {}
impl PolicySys {
pub async fn is_allowed(args: &BucketPolicyArgs<'_>) -> bool {
match Self::get(args.bucket).await {
Ok(cfg) => return cfg.is_allowed(args),
Ok(cfg) => return cfg.is_allowed(args).await,
Err(err) => {
if err != StorageError::ConfigNotFound {
info!("config get err {:?}", err);

View File

@@ -47,5 +47,7 @@ tracing.workspace = true
rustfs-madmin.workspace = true
rustfs-utils = { workspace = true, features = ["path"] }
tokio-util.workspace = true
pollster.workspace = true
[dev-dependencies]
pollster.workspace = true

View File

@@ -23,6 +23,7 @@ use crate::{
UpdateServiceAccountOpts,
},
};
use futures::future::join_all;
use rustfs_ecstore::global::get_global_action_cred;
use rustfs_madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use rustfs_policy::{
@@ -402,13 +403,25 @@ where
self.cache.policy_docs.store(Arc::new(cache));
let ret = m
let items: Vec<_> = m.into_iter().map(|(k, v)| (k, v.policy.clone())).collect();
let futures: Vec<_> = items.iter().map(|(_, policy)| policy.match_resource(bucket_name)).collect();
let results = join_all(futures).await;
let filtered = items
.into_iter()
.filter(|(_, v)| bucket_name.is_empty() || v.policy.match_resource(bucket_name))
.map(|(k, v)| (k, v.policy))
.zip(results)
.filter_map(|((k, policy), matches)| {
if bucket_name.is_empty() || matches {
Some((k, policy))
} else {
None
}
})
.collect();
Ok(ret)
Ok(filtered)
}
pub async fn merge_policies(&self, name: &str) -> (String, Policy) {
@@ -456,22 +469,51 @@ where
self.cache.policy_docs.store(Arc::new(cache));
let ret = m
.into_iter()
.filter(|(_, v)| bucket_name.is_empty() || v.policy.match_resource(bucket_name))
let items: Vec<_> = m.into_iter().map(|(k, v)| (k, v.clone())).collect();
let futures: Vec<_> = items
.iter()
.map(|(_, policy_doc)| policy_doc.policy.match_resource(bucket_name))
.collect();
Ok(ret)
let results = join_all(futures).await;
let filtered = items
.into_iter()
.zip(results)
.filter_map(|((k, policy_doc), matches)| {
if bucket_name.is_empty() || matches {
Some((k, policy_doc))
} else {
None
}
})
.collect();
Ok(filtered)
}
pub async fn list_policy_docs_internal(&self, bucket_name: &str) -> Result<HashMap<String, PolicyDoc>> {
let ret = self
.cache
.policy_docs
.load()
let cache = self.cache.policy_docs.load();
let items: Vec<_> = cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
let futures: Vec<_> = items
.iter()
.filter(|(_, v)| bucket_name.is_empty() || v.policy.match_resource(bucket_name))
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(_, policy_doc)| policy_doc.policy.match_resource(bucket_name))
.collect();
let results = join_all(futures).await;
let ret = items
.into_iter()
.zip(results)
.filter_map(|((k, policy_doc), matches)| {
if bucket_name.is_empty() || matches {
Some((k, policy_doc))
} else {
None
}
})
.collect();
Ok(ret)
@@ -1753,7 +1795,7 @@ fn filter_policies(cache: &Cache, policy_name: &str, bucket_name: &str) -> (Stri
}
if let Some(p) = cache.policy_docs.load().get(&policy) {
if bucket_name.is_empty() || p.policy.match_resource(bucket_name) {
if bucket_name.is_empty() || pollster::block_on(p.policy.match_resource(bucket_name)) {
policies.push(policy);
to_merge.push(p.policy.clone());
}

View File

@@ -755,10 +755,10 @@ impl<T: Store> IamSys<T> {
let (has_session_policy, is_allowed_sp) = is_allowed_by_session_policy(args);
if has_session_policy {
return is_allowed_sp && (is_owner || combined_policy.is_allowed(args));
return is_allowed_sp && (is_owner || combined_policy.is_allowed(args).await);
}
is_owner || combined_policy.is_allowed(args)
is_owner || combined_policy.is_allowed(args).await
}
pub async fn is_allowed_service_account(&self, args: &Args<'_>, parent_user: &str) -> bool {
@@ -814,15 +814,15 @@ impl<T: Store> IamSys<T> {
};
if sa_str == INHERITED_POLICY_TYPE {
return is_owner || combined_policy.is_allowed(&parent_args);
return is_owner || combined_policy.is_allowed(&parent_args).await;
}
let (has_session_policy, is_allowed_sp) = is_allowed_by_session_policy_for_service_account(args);
if has_session_policy {
return is_allowed_sp && (is_owner || combined_policy.is_allowed(&parent_args));
return is_allowed_sp && (is_owner || combined_policy.is_allowed(&parent_args).await);
}
is_owner || combined_policy.is_allowed(&parent_args)
is_owner || combined_policy.is_allowed(&parent_args).await
}
pub async fn get_combined_policy(&self, policies: &[String]) -> Policy {
@@ -857,7 +857,7 @@ impl<T: Store> IamSys<T> {
return false;
}
self.get_combined_policy(&policies).await.is_allowed(args)
self.get_combined_policy(&policies).await.is_allowed(args).await
}
}
@@ -883,7 +883,7 @@ fn is_allowed_by_session_policy(args: &Args<'_>) -> (bool, bool) {
let mut session_policy_args = args.clone();
session_policy_args.is_owner = false;
(has_session_policy, sub_policy.is_allowed(&session_policy_args))
(has_session_policy, pollster::block_on(sub_policy.is_allowed(&session_policy_args)))
}
fn is_allowed_by_session_policy_for_service_account(args: &Args<'_>) -> (bool, bool) {
@@ -909,7 +909,7 @@ fn is_allowed_by_session_policy_for_service_account(args: &Args<'_>) -> (bool, b
let mut session_policy_args = args.clone();
session_policy_args.is_owner = false;
(has_session_policy, sub_policy.is_allowed(&session_policy_args))
(has_session_policy, pollster::block_on(sub_policy.is_allowed(&session_policy_args)))
}
#[derive(Debug, Clone, Default)]

View File

@@ -45,8 +45,12 @@ regex = { workspace = true }
reqwest.workspace = true
chrono.workspace = true
tracing.workspace = true
lru.workspace = true
moka.workspace = true
async-trait.workspace = true
futures.workspace = true
pollster.workspace = true
[dev-dependencies]
pollster.workspace = true
test-case.workspace = true
temp-env = { workspace = true }

View File

@@ -38,29 +38,29 @@ pub struct Functions {
}
impl Functions {
pub fn evaluate(&self, values: &HashMap<String, Vec<String>>) -> bool {
self.evaluate_with_resolver(values, None)
pub async fn evaluate(&self, values: &HashMap<String, Vec<String>>) -> bool {
self.evaluate_with_resolver(values, None).await
}
pub fn evaluate_with_resolver(
pub async fn evaluate_with_resolver(
&self,
values: &HashMap<String, Vec<String>>,
resolver: Option<&dyn PolicyVariableResolver>,
) -> bool {
for c in self.for_any_value.iter() {
if !c.evaluate_with_resolver(false, values, resolver) {
if !c.evaluate_with_resolver(false, values, resolver).await {
return false;
}
}
for c in self.for_all_values.iter() {
if !c.evaluate_with_resolver(true, values, resolver) {
if !c.evaluate_with_resolver(true, values, resolver).await {
return false;
}
}
for c in self.for_normal.iter() {
if !c.evaluate_with_resolver(false, values, resolver) {
if !c.evaluate_with_resolver(false, values, resolver).await {
return false;
}
}

View File

@@ -107,7 +107,7 @@ impl Condition {
}
}
pub fn evaluate_with_resolver(
pub async fn evaluate_with_resolver(
&self,
for_all: bool,
values: &HashMap<String, Vec<String>>,
@@ -116,12 +116,12 @@ impl Condition {
use Condition::*;
let r = match self {
StringEquals(s) => s.evaluate_with_resolver(for_all, false, false, false, values, resolver),
StringNotEquals(s) => s.evaluate_with_resolver(for_all, false, false, true, values, resolver),
StringEqualsIgnoreCase(s) => s.evaluate_with_resolver(for_all, true, false, false, values, resolver),
StringNotEqualsIgnoreCase(s) => s.evaluate_with_resolver(for_all, true, false, true, values, resolver),
StringLike(s) => s.evaluate_with_resolver(for_all, false, true, false, values, resolver),
StringNotLike(s) => s.evaluate_with_resolver(for_all, false, true, true, values, resolver),
StringEquals(s) => s.evaluate_with_resolver(for_all, false, false, false, values, resolver).await,
StringNotEquals(s) => s.evaluate_with_resolver(for_all, false, false, true, values, resolver).await,
StringEqualsIgnoreCase(s) => s.evaluate_with_resolver(for_all, true, false, false, values, resolver).await,
StringNotEqualsIgnoreCase(s) => s.evaluate_with_resolver(for_all, true, false, true, values, resolver).await,
StringLike(s) => s.evaluate_with_resolver(for_all, false, true, false, values, resolver).await,
StringNotLike(s) => s.evaluate_with_resolver(for_all, false, true, true, values, resolver).await,
BinaryEquals(s) => s.evaluate(values),
IpAddress(s) => s.evaluate(values),
NotIpAddress(s) => s.evaluate(values),

View File

@@ -21,16 +21,17 @@ use std::{borrow::Cow, collections::HashMap};
use crate::policy::function::func::FuncKeyValue;
use crate::policy::utils::wildcard;
use futures::future;
use serde::{Deserialize, Deserializer, Serialize, de, ser::SerializeSeq};
use super::{func::InnerFunc, key_name::KeyName};
use crate::policy::variables::{PolicyVariableResolver, resolve_aws_variables};
use crate::policy::variables::PolicyVariableResolver;
pub type StringFunc = InnerFunc<StringFuncValue>;
impl StringFunc {
#[allow(clippy::too_many_arguments)]
pub(crate) fn evaluate_with_resolver(
pub(crate) async fn evaluate_with_resolver(
&self,
for_all: bool,
ignore_case: bool,
@@ -41,9 +42,9 @@ impl StringFunc {
) -> bool {
for inner in self.0.iter() {
let result = if like {
inner.eval_like(for_all, values, resolver) ^ negate
inner.eval_like(for_all, values, resolver).await ^ negate
} else {
inner.eval(for_all, ignore_case, values, resolver) ^ negate
inner.eval(for_all, ignore_case, values, resolver).await ^ negate
};
if !result {
@@ -56,7 +57,7 @@ impl StringFunc {
}
impl FuncKeyValue<StringFuncValue> {
fn eval(
async fn eval(
&self,
for_all: bool,
ignore_case: bool,
@@ -79,17 +80,18 @@ impl FuncKeyValue<StringFuncValue> {
})
.unwrap_or_default();
let fvalues = self
.values
.0
.iter()
.flat_map(|c| {
if let Some(res) = resolver {
resolve_aws_variables(c, res)
} else {
vec![c.to_string()]
}
})
let resolved_values: Vec<Vec<String>> = futures::future::join_all(self.values.0.iter().map(|c| async {
if let Some(res) = resolver {
super::super::variables::resolve_aws_variables(c, res).await
} else {
vec![c.to_string()]
}
}))
.await;
let fvalues = resolved_values
.into_iter()
.flatten()
.map(|resolved_c| {
let mut c = Cow::from(resolved_c);
for key in KeyName::COMMON_KEYS {
@@ -113,7 +115,7 @@ impl FuncKeyValue<StringFuncValue> {
}
}
fn eval_like(
async fn eval_like(
&self,
for_all: bool,
values: &HashMap<String, Vec<String>>,
@@ -121,17 +123,22 @@ impl FuncKeyValue<StringFuncValue> {
) -> bool {
if let Some(rvalues) = values.get(self.key.name().as_str()) {
for v in rvalues.iter() {
let matched = self
let resolved_futures: Vec<_> = self
.values
.0
.iter()
.flat_map(|c| {
.map(|c| async {
if let Some(res) = resolver {
resolve_aws_variables(c, res)
super::super::variables::resolve_aws_variables(c, res).await
} else {
vec![c.to_string()]
}
})
.collect();
let resolved_values = future::join_all(resolved_futures).await;
let matched = resolved_values
.into_iter()
.flatten()
.map(|resolved_c| {
let mut c = Cow::from(resolved_c);
for key in KeyName::COMMON_KEYS {
@@ -242,6 +249,7 @@ mod tests {
key_name::AwsKeyName::*,
key_name::KeyName::{self, *},
};
use std::collections::HashMap;
use crate::policy::function::key_name::S3KeyName::S3LocationConstraint;
use test_case::test_case;
@@ -303,17 +311,13 @@ mod tests {
negate: bool,
values: Vec<(&str, Vec<&str>)>,
) -> bool {
let result = s.eval(
for_all,
ignore_case,
&values
.into_iter()
.map(|(k, v)| (k.to_owned(), v.into_iter().map(ToOwned::to_owned).collect::<Vec<String>>()))
.collect(),
None,
);
let map: HashMap<String, Vec<String>> = values
.into_iter()
.map(|(k, v)| (k.to_owned(), v.into_iter().map(ToOwned::to_owned).collect::<Vec<String>>()))
.collect();
let result = s.eval(for_all, ignore_case, &map, None);
result ^ negate
pollster::block_on(result) ^ negate
}
#[test_case(new_fkv("s3:x-amz-copy-source", vec!["mybucket/myobject"]), false, vec![("x-amz-copy-source", vec!["mybucket/myobject"])] => true ; "1")]
@@ -409,16 +413,13 @@ mod tests {
}
fn test_eval_like(s: FuncKeyValue<StringFuncValue>, for_all: bool, negate: bool, values: Vec<(&str, Vec<&str>)>) -> bool {
let result = s.eval_like(
for_all,
&values
.into_iter()
.map(|(k, v)| (k.to_owned(), v.into_iter().map(ToOwned::to_owned).collect::<Vec<String>>()))
.collect(),
None,
);
let map: HashMap<String, Vec<String>> = values
.into_iter()
.map(|(k, v)| (k.to_owned(), v.into_iter().map(ToOwned::to_owned).collect::<Vec<String>>()))
.collect();
let result = s.eval_like(for_all, &map, None);
result ^ negate
pollster::block_on(result) ^ negate
}
#[test_case(new_fkv("s3:x-amz-copy-source", vec!["mybucket/myobject"]), false, vec![("x-amz-copy-source", vec!["mybucket/myobject"])] => true ; "1")]

View File

@@ -62,9 +62,9 @@ pub struct Policy {
}
impl Policy {
pub fn is_allowed(&self, args: &Args) -> bool {
pub async fn is_allowed(&self, args: &Args<'_>) -> bool {
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Deny)) {
if !statement.is_allowed(args) {
if !statement.is_allowed(args).await {
return false;
}
}
@@ -74,7 +74,7 @@ impl Policy {
}
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Allow)) {
if statement.is_allowed(args) {
if statement.is_allowed(args).await {
return true;
}
}
@@ -82,9 +82,9 @@ impl Policy {
false
}
pub fn match_resource(&self, resource: &str) -> bool {
pub async fn match_resource(&self, resource: &str) -> bool {
for statement in self.statements.iter() {
if statement.resources.match_resource(resource) {
if statement.resources.match_resource(resource).await {
return true;
}
}
@@ -188,9 +188,9 @@ pub struct BucketPolicy {
}
impl BucketPolicy {
pub fn is_allowed(&self, args: &BucketPolicyArgs) -> bool {
pub async fn is_allowed(&self, args: &BucketPolicyArgs<'_>) -> bool {
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Deny)) {
if !statement.is_allowed(args) {
if !statement.is_allowed(args).await {
return false;
}
}
@@ -200,7 +200,7 @@ impl BucketPolicy {
}
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Allow)) {
if statement.is_allowed(args) {
if statement.is_allowed(args).await {
return true;
}
}
@@ -577,8 +577,8 @@ mod test {
deny_only: false,
};
assert!(policy.is_allowed(&args1));
assert!(!policy.is_allowed(&args2));
assert!(pollster::block_on(policy.is_allowed(&args1)));
assert!(!pollster::block_on(policy.is_allowed(&args2)));
Ok(())
}
@@ -631,8 +631,8 @@ mod test {
deny_only: false,
};
assert!(policy.is_allowed(&args1));
assert!(!policy.is_allowed(&args2));
assert!(pollster::block_on(policy.is_allowed(&args1)));
assert!(!pollster::block_on(policy.is_allowed(&args2)));
Ok(())
}
@@ -686,8 +686,8 @@ mod test {
deny_only: false,
};
assert!(policy.is_allowed(&args1));
assert!(!policy.is_allowed(&args2));
assert!(pollster::block_on(policy.is_allowed(&args1)));
assert!(!pollster::block_on(policy.is_allowed(&args2)));
Ok(())
}
@@ -741,8 +741,8 @@ mod test {
deny_only: false,
};
assert!(policy.is_allowed(&args1));
assert!(!policy.is_allowed(&args2));
assert!(pollster::block_on(policy.is_allowed(&args1)));
assert!(!pollster::block_on(policy.is_allowed(&args2)));
Ok(())
}
@@ -798,7 +798,7 @@ mod test {
};
// Either user1 or user2 should be allowed
assert!(policy.is_allowed(&args1) || policy.is_allowed(&args2));
assert!(pollster::block_on(policy.is_allowed(&args1)) || pollster::block_on(policy.is_allowed(&args2)));
Ok(())
}

View File

@@ -24,25 +24,25 @@ use super::{
Error as IamError, Validator,
function::key_name::KeyName,
utils::{path, wildcard},
variables::{PolicyVariableResolver, resolve_aws_variables},
variables::PolicyVariableResolver,
};
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct ResourceSet(pub HashSet<Resource>);
impl ResourceSet {
pub fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
self.is_match_with_resolver(resource, conditions, None)
pub async fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
self.is_match_with_resolver(resource, conditions, None).await
}
pub fn is_match_with_resolver(
pub async fn is_match_with_resolver(
&self,
resource: &str,
conditions: &HashMap<String, Vec<String>>,
resolver: Option<&dyn PolicyVariableResolver>,
) -> bool {
for re in self.0.iter() {
if re.is_match_with_resolver(resource, conditions, resolver) {
if re.is_match_with_resolver(resource, conditions, resolver).await {
return true;
}
}
@@ -50,9 +50,9 @@ impl ResourceSet {
false
}
pub fn match_resource(&self, resource: &str) -> bool {
pub async fn match_resource(&self, resource: &str) -> bool {
for re in self.0.iter() {
if re.match_resource(resource) {
if re.match_resource(resource).await {
return true;
}
}
@@ -95,11 +95,11 @@ pub enum Resource {
impl Resource {
pub const S3_PREFIX: &'static str = "arn:aws:s3:::";
pub fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
self.is_match_with_resolver(resource, conditions, None)
pub async fn is_match(&self, resource: &str, conditions: &HashMap<String, Vec<String>>) -> bool {
self.is_match_with_resolver(resource, conditions, None).await
}
pub fn is_match_with_resolver(
pub async fn is_match_with_resolver(
&self,
resource: &str,
conditions: &HashMap<String, Vec<String>>,
@@ -111,7 +111,7 @@ impl Resource {
};
let patterns = if let Some(res) = resolver {
resolve_aws_variables(&pattern, res)
super::variables::resolve_aws_variables(&pattern, res).await
} else {
vec![pattern.clone()]
};
@@ -143,8 +143,8 @@ impl Resource {
false
}
pub fn match_resource(&self, resource: &str) -> bool {
self.is_match(resource, &HashMap::new())
pub async fn match_resource(&self, resource: &str) -> bool {
self.is_match(resource, &HashMap::new()).await
}
}
@@ -232,6 +232,7 @@ mod tests {
#[test_case("arn:aws:s3:::mybucket","mybucket/myobject" => false; "15")]
fn test_resource_is_match(resource: &str, object: &str) -> bool {
let resource: Resource = resource.try_into().unwrap();
resource.is_match(object, &HashMap::new())
pollster::block_on(resource.is_match(object, &HashMap::new()))
}
}

View File

@@ -69,7 +69,7 @@ impl Statement {
false
}
pub fn is_allowed(&self, args: &Args) -> bool {
pub async fn is_allowed(&self, args: &Args<'_>) -> bool {
let mut context = VariableContext::new();
context.claims = Some(args.claims.clone());
context.conditions = args.conditions.clone();
@@ -104,19 +104,20 @@ impl Statement {
}
if self.is_kms() && (resource == "/" || self.resources.is_empty()) {
break 'c self.conditions.evaluate_with_resolver(args.conditions, Some(&resolver));
break 'c self.conditions.evaluate_with_resolver(args.conditions, Some(&resolver)).await;
}
if !self
.resources
.is_match_with_resolver(&resource, args.conditions, Some(&resolver))
.await
&& !self.is_admin()
&& !self.is_sts()
{
break 'c false;
}
self.conditions.evaluate_with_resolver(args.conditions, Some(&resolver))
self.conditions.evaluate_with_resolver(args.conditions, Some(&resolver)).await
};
self.effect.is_allowed(check)
@@ -178,7 +179,7 @@ pub struct BPStatement {
}
impl BPStatement {
pub fn is_allowed(&self, args: &BucketPolicyArgs) -> bool {
pub async fn is_allowed(&self, args: &BucketPolicyArgs<'_>) -> bool {
let check = 'c: {
if !self.principal.is_match(args.account) {
break 'c false;
@@ -199,15 +200,15 @@ impl BPStatement {
resource.push('/');
}
if !self.resources.is_empty() && !self.resources.is_match(&resource, args.conditions) {
if !self.resources.is_empty() && !self.resources.is_match(&resource, args.conditions).await {
break 'c false;
}
if !self.not_resources.is_empty() && self.not_resources.is_match(&resource, args.conditions) {
if !self.not_resources.is_empty() && self.not_resources.is_match(&resource, args.conditions).await {
break 'c false;
}
self.conditions.evaluate(args.conditions)
self.conditions.evaluate(args.conditions).await
};
self.effect.is_allowed(check)

View File

@@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use lru::LruCache;
use async_trait::async_trait;
use moka::future::Cache;
use serde_json::Value;
use std::cell::RefCell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use std::future::Future;
use std::time::Duration;
use time::OffsetDateTime;
/// Context information for variable resolution
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct VariableContext {
pub is_https: bool,
pub source_ip: Option<String>,
@@ -35,109 +35,75 @@ pub struct VariableContext {
impl VariableContext {
pub fn new() -> Self {
Self {
is_https: false,
source_ip: None,
account_id: None,
region: None,
username: None,
claims: None,
conditions: HashMap::new(),
custom_variables: HashMap::new(),
}
Self::default()
}
}
impl Default for VariableContext {
fn default() -> Self {
Self::new()
}
}
/// Variable resolution cache
struct CachedVariable {
value: String,
timestamp: Instant,
is_dynamic: bool,
}
pub struct VariableResolverCache {
/// LRU cache storing resolved results
cache: LruCache<String, CachedVariable>,
/// Cache expiration time
ttl: Duration,
/// Moka cache storing resolved results
cache: Cache<String, String>,
}
impl VariableResolverCache {
pub fn new(capacity: usize, ttl_seconds: u64) -> Self {
Self {
cache: LruCache::new(usize::from(NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::new(100).unwrap()))),
ttl: Duration::from_secs(ttl_seconds),
}
let cache = Cache::builder()
.max_capacity(capacity as u64)
.time_to_live(Duration::from_secs(ttl_seconds))
.build();
Self { cache }
}
pub fn get(&mut self, key: &str) -> Option<String> {
if let Some(cached) = self.cache.get(key) {
// Check if expired
if !cached.is_dynamic && cached.timestamp.elapsed() < self.ttl {
return Some(cached.value.clone());
}
}
None
pub async fn get(&self, key: &str) -> Option<String> {
self.cache.get(key).await
}
pub fn put(&mut self, key: String, value: String, is_dynamic: bool) {
let cached = CachedVariable {
value,
timestamp: Instant::now(),
is_dynamic,
};
self.cache.put(key, cached);
pub async fn put(&self, key: String, value: String) {
self.cache.insert(key, value).await;
}
pub fn clear(&mut self) {
self.cache.clear();
pub async fn clear(&self) {
self.cache.invalidate_all();
}
}
/// Cached dynamic AWS variable resolver
pub struct CachedAwsVariableResolver {
inner: VariableResolver,
cache: RefCell<VariableResolverCache>,
cache: VariableResolverCache,
}
impl CachedAwsVariableResolver {
pub fn new(context: VariableContext) -> Self {
Self {
inner: VariableResolver::new(context),
cache: RefCell::new(VariableResolverCache::new(100, 300)), // 100 entries, 5 minutes expiration
cache: VariableResolverCache::new(100, 300), // 100 entries, 5 minutes expiration
}
}
pub fn is_dynamic(&self, variable_name: &str) -> bool {
self.inner.is_dynamic(variable_name)
}
}
#[async_trait]
impl PolicyVariableResolver for CachedAwsVariableResolver {
fn resolve(&self, variable_name: &str) -> Option<String> {
async fn resolve(&self, variable_name: &str) -> Option<String> {
if self.is_dynamic(variable_name) {
return self.inner.resolve(variable_name);
return self.inner.resolve(variable_name).await;
}
if let Some(cached) = self.cache.borrow_mut().get(variable_name) {
if let Some(cached) = self.cache.get(variable_name).await {
return Some(cached);
}
let value = self.inner.resolve(variable_name)?;
self.cache.borrow_mut().put(variable_name.to_string(), value.clone(), false);
let value = self.inner.resolve(variable_name).await?;
self.cache.put(variable_name.to_string(), value.clone()).await;
Some(value)
}
fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
if self.is_dynamic(variable_name) {
return self.inner.resolve_multiple(variable_name);
}
self.inner.resolve_multiple(variable_name)
async fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
self.inner.resolve_multiple(variable_name).await
}
fn is_dynamic(&self, variable_name: &str) -> bool {
@@ -146,10 +112,11 @@ impl PolicyVariableResolver for CachedAwsVariableResolver {
}
/// Policy variable resolver trait
pub trait PolicyVariableResolver {
fn resolve(&self, variable_name: &str) -> Option<String>;
fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
self.resolve(variable_name).map(|s| vec![s])
#[async_trait]
pub trait PolicyVariableResolver: Sync {
async fn resolve(&self, variable_name: &str) -> Option<String>;
async fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
self.resolve(variable_name).await.map(|s| vec![s])
}
fn is_dynamic(&self, variable_name: &str) -> bool;
}
@@ -192,18 +159,9 @@ impl VariableResolver {
}
fn resolve_userid(&self) -> Option<String> {
// Check claims for sub or parent
if let Some(claims) = &self.context.claims {
if let Some(sub) = claims.get("sub").and_then(|v| v.as_str()) {
return Some(sub.to_string());
}
if let Some(parent) = claims.get("parent").and_then(|v| v.as_str()) {
return Some(parent.to_string());
}
}
None
self.get_claim_as_strings("sub")
.or_else(|| self.get_claim_as_strings("parent"))
.and_then(|mut vec| vec.pop()) // 取第一个值,保持原有逻辑
}
fn resolve_principal_type(&self) -> String {
@@ -252,8 +210,9 @@ impl VariableResolver {
}
}
#[async_trait]
impl PolicyVariableResolver for VariableResolver {
fn resolve(&self, variable_name: &str) -> Option<String> {
async fn resolve(&self, variable_name: &str) -> Option<String> {
match variable_name {
"aws:username" => self.resolve_username(),
"aws:userid" => self.resolve_userid(),
@@ -275,22 +234,15 @@ impl PolicyVariableResolver for VariableResolver {
}
}
fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
async fn resolve_multiple(&self, variable_name: &str) -> Option<Vec<String>> {
match variable_name {
"aws:username" => {
// Check context.username
if let Some(ref username) = self.context.username {
Some(vec![username.clone()])
} else {
None
}
}
"aws:userid" => {
// Check claims for sub or parent
self.get_claim_as_strings("sub")
.or_else(|| self.get_claim_as_strings("parent"))
}
_ => self.resolve(variable_name).map(|s| vec![s]),
"aws:username" => self.resolve_username().map(|s| vec![s]),
"aws:userid" => self
.get_claim_as_strings("sub")
.or_else(|| self.get_claim_as_strings("parent")),
_ => self.resolve(variable_name).await.map(|s| vec![s]),
}
}
@@ -299,8 +251,7 @@ impl PolicyVariableResolver for VariableResolver {
}
}
/// Dynamically resolve AWS variables
pub fn resolve_aws_variables(pattern: &str, resolver: &dyn PolicyVariableResolver) -> Vec<String> {
pub async fn resolve_aws_variables(pattern: &str, resolver: &dyn PolicyVariableResolver) -> Vec<String> {
let mut results = vec![pattern.to_string()];
let mut changed = true;
@@ -313,7 +264,7 @@ pub fn resolve_aws_variables(pattern: &str, resolver: &dyn PolicyVariableResolve
let mut new_results = Vec::new();
for result in &results {
let resolved = resolve_single_pass(result, resolver);
let resolved = resolve_single_pass(result, resolver).await;
if resolved.len() > 1 || (resolved.len() == 1 && &resolved[0] != result) {
changed = true;
}
@@ -333,8 +284,16 @@ pub fn resolve_aws_variables(pattern: &str, resolver: &dyn PolicyVariableResolve
results
}
// Need to box the future to avoid infinite size due to recursion
fn resolve_aws_variables_boxed<'a>(
pattern: &'a str,
resolver: &'a dyn PolicyVariableResolver,
) -> std::pin::Pin<Box<dyn Future<Output = Vec<String>> + Send + 'a>> {
Box::pin(resolve_aws_variables(pattern, resolver))
}
/// Single pass resolution of variables in a string
fn resolve_single_pass(pattern: &str, resolver: &dyn PolicyVariableResolver) -> Vec<String> {
async fn resolve_single_pass(pattern: &str, resolver: &dyn PolicyVariableResolver) -> Vec<String> {
// Find all ${...} format variables
let mut results = vec![pattern.to_string()];
@@ -370,7 +329,7 @@ fn resolve_single_pass(pattern: &str, resolver: &dyn PolicyVariableResolver) ->
if var_name.contains("${") {
// For nested variables like ${${a}-${b}}, we need to resolve the inner variables first
// Then use the resolved result as a new variable to resolve
let resolved_inner = resolve_aws_variables(var_name, resolver);
let resolved_inner = resolve_aws_variables_boxed(var_name, resolver).await;
let mut new_results = Vec::new();
for resolved_var_name in resolved_inner {
@@ -390,7 +349,7 @@ fn resolve_single_pass(pattern: &str, resolver: &dyn PolicyVariableResolver) ->
}
} else {
// Regular variable resolution
if let Some(values) = resolver.resolve_multiple(var_name) {
if let Some(values) = resolver.resolve_multiple(var_name).await {
if !values.is_empty() {
// If there are multiple values, create a new result for each value
let mut new_results = Vec::new();
@@ -440,19 +399,18 @@ mod tests {
use serde_json::Value;
use std::collections::HashMap;
#[test]
fn test_resolve_aws_variables_with_username() {
#[tokio::test]
async fn test_resolve_aws_variables_with_username() {
let mut context = VariableContext::new();
context.username = Some("testuser".to_string());
let resolver = VariableResolver::new(context);
let result = resolve_aws_variables("${aws:username}-bucket", &resolver);
let result = resolve_aws_variables("${aws:username}-bucket", &resolver).await;
assert_eq!(result, vec!["testuser-bucket".to_string()]);
}
#[test]
fn test_resolve_aws_variables_with_userid() {
#[tokio::test]
async fn test_resolve_aws_variables_with_userid() {
let mut claims = HashMap::new();
claims.insert("sub".to_string(), Value::String("AIDACKCEVSQ6C2EXAMPLE".to_string()));
@@ -460,13 +418,12 @@ mod tests {
context.claims = Some(claims);
let resolver = VariableResolver::new(context);
let result = resolve_aws_variables("${aws:userid}-bucket", &resolver);
let result = resolve_aws_variables("${aws:userid}-bucket", &resolver).await;
assert_eq!(result, vec!["AIDACKCEVSQ6C2EXAMPLE-bucket".to_string()]);
}
#[test]
fn test_resolve_aws_variables_with_multiple_variables() {
#[tokio::test]
async fn test_resolve_aws_variables_with_multiple_variables() {
let mut claims = HashMap::new();
claims.insert("sub".to_string(), Value::String("AIDACKCEVSQ6C2EXAMPLE".to_string()));
@@ -475,17 +432,34 @@ mod tests {
context.username = Some("testuser".to_string());
let resolver = VariableResolver::new(context);
let result = resolve_aws_variables("${aws:username}-${aws:userid}-bucket", &resolver);
let result = resolve_aws_variables("${aws:username}-${aws:userid}-bucket", &resolver).await;
assert_eq!(result, vec!["testuser-AIDACKCEVSQ6C2EXAMPLE-bucket".to_string()]);
}
#[test]
fn test_resolve_aws_variables_no_variables() {
#[tokio::test]
async fn test_resolve_aws_variables_no_variables() {
let context = VariableContext::new();
let resolver = VariableResolver::new(context);
let result = resolve_aws_variables("test-bucket", &resolver);
let result = resolve_aws_variables("test-bucket", &resolver).await;
assert_eq!(result, vec!["test-bucket".to_string()]);
}
#[tokio::test]
async fn test_cached_aws_variable_resolver_dynamic_variables() {
let context = VariableContext::new();
let cached_resolver = CachedAwsVariableResolver::new(context);
// Dynamic variables should not be cached
let result1 = resolve_aws_variables("${aws:EpochTime}-bucket", &cached_resolver).await;
// Add a delay of 1 second to ensure different timestamps
tokio::time::sleep(Duration::from_secs(1)).await;
let result2 = resolve_aws_variables("${aws:EpochTime}-bucket", &cached_resolver).await;
// Both results should be different (different timestamps)
assert_ne!(result1, result2);
}
}

View File

@@ -612,7 +612,7 @@ struct ArgsBuilder {
"24"
)]
fn policy_is_allowed(policy: Policy, args: ArgsBuilder) -> bool {
policy.is_allowed(&Args {
pollster::block_on(policy.is_allowed(&Args {
account: &args.account,
groups: &{
if args.groups.is_empty() {
@@ -628,5 +628,5 @@ fn policy_is_allowed(policy: Policy, args: ArgsBuilder) -> bool {
object: &args.object,
claims: &args.claims,
deny_only: args.deny_only,
})
}))
}