From aae768f4462b4687f34b6e061b26363bfc9306be Mon Sep 17 00:00:00 2001 From: gatewayJ <835269233@qq.com> Date: Thu, 16 Oct 2025 10:35:26 +0800 Subject: [PATCH] feat: Simple OPA support (#644) * opa-feature * Update crates/policy/src/policy/opa.rs * add the content related to 'Copyright' --------- Co-authored-by: root Co-authored-by: houseme Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .vscode/launch.json | 7 +- Cargo.lock | 6 + Cargo.toml | 1 + crates/config/Cargo.toml | 2 +- crates/config/src/lib.rs | 2 + crates/config/src/opa/mod.rs | 21 +++ crates/iam/Cargo.toml | 1 + crates/iam/src/sys.rs | 42 ++++- crates/policy/Cargo.toml | 5 + crates/policy/src/policy.rs | 1 + crates/policy/src/policy/opa.rs | 288 +++++++++++++++++++++++++++++ crates/policy/src/policy/policy.rs | 1 - crates/utils/src/certs.rs | 2 +- rustfs/src/storage/ecfs.rs | 22 ++- scripts/run.sh | 3 + 15 files changed, 391 insertions(+), 13 deletions(-) create mode 100644 crates/config/src/opa/mod.rs create mode 100644 crates/policy/src/policy/opa.rs diff --git a/.vscode/launch.json b/.vscode/launch.json index 3e78b81d..0fa11923 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -20,7 +20,10 @@ } }, "env": { - "RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=info" + "RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug", + "RUSTFS_SKIP_BACKGROUND_TASK": "on", + // "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow", + // "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token" }, "args": [ "--access-key", @@ -29,6 +32,8 @@ "rustfsadmin", "--address", "0.0.0.0:9010", + "--server-domains", + "127.0.0.1:9010", "./target/volume/test{1...4}" ], "cwd": "${workspaceFolder}" diff --git a/Cargo.lock b/Cargo.lock index 5e603b4f..142c9063 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6662,6 +6662,7 @@ dependencies = [ "futures", "jsonwebtoken", "rand 0.9.2", + "rustfs-config", "rustfs-crypto", "rustfs-ecstore", "rustfs-madmin", @@ -6823,18 +6824,23 @@ name = "rustfs-policy" version = "0.0.5" dependencies = [ "base64-simd", + "chrono", "ipnetwork", "jsonwebtoken", "rand 0.9.2", "regex", + "reqwest", + "rustfs-config", "rustfs-crypto", "serde", "serde_json", "strum 0.27.2", + "temp-env", "test-case", "thiserror 2.0.17", "time", "tokio", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d13d3b31..38367992 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "crates/madmin", # Management dashboard and admin API interface "crates/notify", # Notification system for events "crates/obs", # Observability utilities + "crates/policy",# Policy management "crates/protos", # Protocol buffer definitions "crates/rio", # Rust I/O utilities and abstractions "crates/targets", # Target-specific configurations and utilities diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index 7cf5ef86..3b2dcf9c 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -36,4 +36,4 @@ audit = ["dep:const-str", "constants"] constants = ["dep:const-str"] notify = ["dep:const-str", "constants"] observability = ["constants"] - +opa = ["constants"] diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index b4291879..c5b14581 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -32,3 +32,5 @@ pub mod audit; pub mod notify; #[cfg(feature = "observability")] pub mod observability; +#[cfg(feature = "opa")] +pub mod opa; diff --git a/crates/config/src/opa/mod.rs b/crates/config/src/opa/mod.rs new file mode 100644 index 00000000..26fa1472 --- /dev/null +++ b/crates/config/src/opa/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//opa env vars +pub const ENV_POLICY_PLUGIN_OPA_URL: &str = "RUSTFS_POLICY_PLUGIN_URL"; +pub const ENV_POLICY_PLUGIN_AUTH_TOKEN: &str = "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN"; + +pub const ENV_POLICY_PLUGIN_KEYS: &[&str] = &[ENV_POLICY_PLUGIN_OPA_URL, ENV_POLICY_PLUGIN_AUTH_TOKEN]; + +pub const POLICY_PLUGIN_SUB_SYS: &str = "policy_plugin"; diff --git a/crates/iam/Cargo.toml b/crates/iam/Cargo.toml index e119fe34..15e3c0e2 100644 --- a/crates/iam/Cargo.toml +++ b/crates/iam/Cargo.toml @@ -34,6 +34,7 @@ time = { workspace = true, features = ["serde-human-readable"] } serde = { workspace = true, features = ["derive", "rc"] } rustfs-ecstore = { workspace = true } rustfs-policy.workspace = true +rustfs-config.workspace = true serde_json.workspace = true async-trait.workspace = true thiserror.workspace = true diff --git a/crates/iam/src/sys.rs b/crates/iam/src/sys.rs index 8aa0bc30..4de85881 100644 --- a/crates/iam/src/sys.rs +++ b/crates/iam/src/sys.rs @@ -35,14 +35,17 @@ use rustfs_policy::auth::{ is_access_key_valid, is_secret_key_valid, }; use rustfs_policy::policy::Args; +use rustfs_policy::policy::opa; use rustfs_policy::policy::{EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, iam_policy_claim_name_sa}; use rustfs_utils::crypto::{base64_decode, base64_encode}; use serde_json::Value; use serde_json::json; use std::collections::HashMap; use std::sync::Arc; +use std::sync::OnceLock; use time::OffsetDateTime; -use tracing::warn; +use tokio::sync::RwLock; +use tracing::{error, info, warn}; pub const MAX_SVCSESSION_POLICY_SIZE: usize = 4096; @@ -53,6 +56,12 @@ pub const POLICYNAME: &str = "policy"; pub const SESSION_POLICY_NAME: &str = "sessionPolicy"; pub const SESSION_POLICY_NAME_EXTRACTED: &str = "sessionPolicy-extracted"; +static POLICY_PLUGIN_CLIENT: OnceLock>>> = OnceLock::new(); + +fn get_policy_plugin_client() -> Arc>> { + POLICY_PLUGIN_CLIENT.get_or_init(|| Arc::new(RwLock::new(None))).clone() +} + pub struct IamSys { store: Arc>, roles_map: HashMap, @@ -60,6 +69,20 @@ pub struct IamSys { impl IamSys { pub fn new(store: Arc>) -> Self { + tokio::spawn(async move { + match opa::lookup_config().await { + Ok(conf) => { + if conf.enable() { + Self::set_policy_plugin_client(opa::AuthZPlugin::new(conf)).await; + info!("OPA plugin enabled"); + } + } + Err(e) => { + error!("Error loading OPA configuration err:{}", e); + } + }; + }); + Self { store, roles_map: HashMap::new(), @@ -69,6 +92,18 @@ impl IamSys { self.store.api.has_watcher() } + pub async fn set_policy_plugin_client(client: rustfs_policy::policy::opa::AuthZPlugin) { + let policy_plugin_client = get_policy_plugin_client(); + let mut guard = policy_plugin_client.write().await; + *guard = Some(client); + } + + pub async fn get_policy_plugin_client() -> Option { + let policy_plugin_client = get_policy_plugin_client(); + let guard = policy_plugin_client.read().await; + guard.clone() + } + pub async fn load_group(&self, name: &str) -> Result<()> { self.store.group_notification_handler(name).await } @@ -766,6 +801,11 @@ impl IamSys { return true; } + let opa_enable = Self::get_policy_plugin_client().await; + if let Some(opa_enable) = opa_enable { + return opa_enable.is_allowed(args).await; + } + let Ok((is_temp, parent_user)) = self.is_temp_user(args.account).await else { return false }; if is_temp { diff --git a/crates/policy/Cargo.toml b/crates/policy/Cargo.toml index 2601465a..ae74bd8d 100644 --- a/crates/policy/Cargo.toml +++ b/crates/policy/Cargo.toml @@ -29,6 +29,7 @@ documentation = "https://docs.rs/rustfs-policy/latest/rustfs_policy/" workspace = true [dependencies] +rustfs-config = { workspace = true, features = ["constants","opa"] } tokio.workspace = true time = { workspace = true, features = ["serde-human-readable"] } serde = { workspace = true, features = ["derive", "rc"] } @@ -41,6 +42,10 @@ rand.workspace = true base64-simd = { workspace = true } jsonwebtoken = { workspace = true } regex = { workspace = true } +reqwest.workspace = true +chrono.workspace = true +tracing.workspace = true [dev-dependencies] test-case.workspace = true +temp-env = { workspace = true } diff --git a/crates/policy/src/policy.rs b/crates/policy/src/policy.rs index 3dce8d50..c6b35332 100644 --- a/crates/policy/src/policy.rs +++ b/crates/policy/src/policy.rs @@ -17,6 +17,7 @@ mod doc; mod effect; mod function; mod id; +pub mod opa; #[allow(clippy::module_inception)] mod policy; mod principal; diff --git a/crates/policy/src/policy/opa.rs b/crates/policy/src/policy/opa.rs new file mode 100644 index 00000000..e222270f --- /dev/null +++ b/crates/policy/src/policy/opa.rs @@ -0,0 +1,288 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::policy::Args as PArgs; +use rustfs_config::{ENV_PREFIX, opa::*}; +use serde::Deserialize; +use serde_json::json; +use std::{collections::HashMap, env, time::Duration}; +use tracing::{error, info}; + +#[derive(Debug, Clone, Default)] +pub struct Args { + pub url: String, + pub auth_token: String, +} +impl Args { + pub fn enable(&self) -> bool { + !self.url.is_empty() + } +} + +#[derive(Debug, Clone)] +pub struct AuthZPlugin { + client: reqwest::Client, + args: Args, +} + +fn check() -> Result<(), String> { + let env_list = env::vars(); + let mut candidate = HashMap::new(); + let prefix = format!("{}{}", ENV_PREFIX, POLICY_PLUGIN_SUB_SYS).to_uppercase(); + for (key, value) in env_list { + if key.starts_with(&prefix) { + candidate.insert(key.to_string(), value); + } + } + + //check required env vars + if candidate.remove(ENV_POLICY_PLUGIN_OPA_URL).is_none() { + return Err(format!("Missing required env var: {}", ENV_POLICY_PLUGIN_OPA_URL)); + } + + // check optional env vars + candidate.remove(ENV_POLICY_PLUGIN_AUTH_TOKEN); + if !candidate.is_empty() { + return Err(format!("Invalid env vars: {:?}", candidate)); + } + Ok(()) +} +async fn validate(config: &Args) -> Result<(), String> { + let client = reqwest::Client::new(); + + match client.post(&config.url).send().await { + Ok(resp) => { + match resp.status() { + reqwest::StatusCode::OK => { + info!("OPA is ready to accept requests."); + } + _ => { + return Err(format!("OPA returned an error: {}", resp.status())); + } + }; + } + Err(err) => { + return Err(format!("Error connecting to OPA: {}", err)); + } + }; + Ok(()) +} + +pub async fn lookup_config() -> Result { + let args = Args::default(); + + let get_cfg = + |cfg: &str| -> Result { env::var(cfg).map_err(|e| format!("Error getting env var {}: {:?}", cfg, e)) }; + + let url = get_cfg(ENV_POLICY_PLUGIN_OPA_URL); + if url.is_err() { + info!("OPA is not enabled."); + return Ok(args); + } + check()?; + let args = Args { + url: url.ok().unwrap(), + auth_token: get_cfg(ENV_POLICY_PLUGIN_AUTH_TOKEN).unwrap_or_default(), + }; + validate(&args).await?; + Ok(args) +} + +impl AuthZPlugin { + pub fn new(config: Args) -> Self { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .connect_timeout(Duration::from_secs(1)) + .pool_max_idle_per_host(10) + .pool_idle_timeout(Some(Duration::from_secs(60))) + .tcp_keepalive(Some(Duration::from_secs(30))) + .tcp_nodelay(true) + .http2_keep_alive_interval(Some(Duration::from_secs(30))) + .http2_keep_alive_timeout(Duration::from_secs(15)) + .build() + .unwrap(); + + Self { client, args: config } + } + + pub async fn is_allowed(&self, args: &PArgs<'_>) -> bool { + let payload = self.build_opa_input(args); + + let mut request = self.client.post(self.args.url.clone()).json(&payload); + if !self.args.auth_token.is_empty() { + request = request.header("Authorization", format!("Bearer {}", self.args.auth_token)); + } + + match request.send().await { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + error!("OPA returned non-success status: {}", status); + return false; + } + + match resp.json::().await { + Ok(response_enum) => match response_enum { + OpaResponseEnum::SimpleResult(result) => result.result, + OpaResponseEnum::AllowResult(response) => response.result.allow, + }, + Err(err) => { + error!("Error parsing OPA response: {:?}", err); + false + } + } + } + Err(err) => { + error!("Error sending request to OPA: {:?}", err); + false + } + } + } + + fn build_opa_input(&self, args: &PArgs<'_>) -> serde_json::Value { + let groups = match args.groups { + Some(g) => g.clone(), + None => vec![], + }; + let action_str: &str = (&args.action).into(); + json!({ + // Core authorization parameters for OPA policy evaluation + "input":{ + "identity": { + "account": args.account, + "groups": groups, + "is_owner": args.is_owner, + "claims": args.claims + }, + + "resource": { + "bucket": args.bucket, + "object": args.object, + "arn": if args.object.is_empty() { + format!("arn:aws:s3:::{}", args.bucket) + } else { + format!("arn:aws:s3:::{}/{}", args.bucket, args.object) + } + }, + + "action": action_str, + + "context": { + "conditions": args.conditions, + "deny_only": args.deny_only, + "timestamp": chrono::Utc::now().to_rfc3339() + } + } + }) + } +} + +#[derive(Deserialize, Default)] +struct OpaResultAllow { + allow: bool, +} + +#[derive(Deserialize, Default)] +struct OpaResult { + result: bool, +} + +#[derive(Deserialize, Default)] +struct OpaResponse { + result: OpaResultAllow, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum OpaResponseEnum { + SimpleResult(OpaResult), + AllowResult(OpaResponse), +} + +#[cfg(test)] +mod tests { + use super::*; + use temp_env; + + #[test] + fn test_check_valid_config() { + // Use temp_env to temporarily set environment variables + temp_env::with_vars( + [ + ("RUSTFS_POLICY_PLUGIN_URL", Some("http://localhost:8181/v1/data/rustfs/authz/allow")), + ("RUSTFS_POLICY_PLUGIN_AUTH_TOKEN", Some("test-token")), + ], + || { + assert!(check().is_ok()); + }, + ); + } + + #[test] + fn test_check_missing_required_env() { + temp_env::with_var_unset("RUSTFS_POLICY_PLUGIN_URL", || { + temp_env::with_var("RUSTFS_POLICY_PLUGIN_AUTH_TOKEN", Some("test-token"), || { + let result = check(); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Missing required env var")); + }); + }); + } + + #[test] + fn test_check_invalid_env_vars() { + temp_env::with_vars( + [ + ("RUSTFS_POLICY_PLUGIN_URL", Some("http://localhost:8181/v1/data/rustfs/authz/allow")), + ("RUSTFS_POLICY_PLUGIN_INVALID", Some("invalid-value")), + ], + || { + let result = check(); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Invalid env vars")); + }, + ); + } + + #[test] + fn test_lookup_config_not_enabled() { + temp_env::with_var_unset("RUSTFS_POLICY_PLUGIN_URL", || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { lookup_config().await }); + + // Should return the default empty Args + assert!(result.is_ok()); + let args = result.unwrap(); + assert!(!args.enable()); + assert_eq!(args.url, ""); + assert_eq!(args.auth_token, ""); + }); + } + + #[test] + fn test_args_enable() { + // Test Args enable method + let args_enabled = Args { + url: "http://localhost:8181".to_string(), + auth_token: "token".to_string(), + }; + assert!(args_enabled.enable()); + + let args_disabled = Args { + url: "".to_string(), + auth_token: "".to_string(), + }; + assert!(!args_disabled.enable()); + } +} diff --git a/crates/policy/src/policy/policy.rs b/crates/policy/src/policy/policy.rs index 4af04922..c4596fae 100644 --- a/crates/policy/src/policy/policy.rs +++ b/crates/policy/src/policy/policy.rs @@ -17,7 +17,6 @@ use crate::error::{Error, Result}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{HashMap, HashSet}; - /// DEFAULT_VERSION is the default version. /// https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html pub const DEFAULT_VERSION: &str = "2012-10-17"; diff --git a/crates/utils/src/certs.rs b/crates/utils/src/certs.rs index 86aa4a01..9fc40bf3 100644 --- a/crates/utils/src/certs.rs +++ b/crates/utils/src/certs.rs @@ -102,7 +102,7 @@ pub fn load_all_certs_from_directory( let path = entry.path(); if path.is_dir() { - let domain_name = path + let domain_name: &str = path .file_name() .and_then(|name| name.to_str()) .ok_or_else(|| certs_error(format!("invalid domain name directory:{path:?}")))?; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 6124795a..34eb95fc 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1854,20 +1854,26 @@ impl S3 for FS { .await .is_err() { - bucket_infos.retain(|info| { - let req_info = req.extensions.get_mut::().expect("ReqInfo not found"); + bucket_infos = futures::stream::iter(bucket_infos) + .filter_map(|info| async { + let mut req_clone = req.clone(); + let req_info = req_clone.extensions.get_mut::().expect("ReqInfo not found"); + req_info.bucket = Some(info.name.clone()); - req_info.bucket = Some(info.name.clone()); - - futures::executor::block_on(async { - authorize_request(&mut req, Action::S3Action(S3Action::ListBucketAction)) + if authorize_request(&mut req_clone, Action::S3Action(S3Action::ListBucketAction)) .await .is_ok() - || authorize_request(&mut req, Action::S3Action(S3Action::GetBucketLocationAction)) + || authorize_request(&mut req_clone, Action::S3Action(S3Action::GetBucketLocationAction)) .await .is_ok() + { + Some(info) + } else { + None + } }) - }); + .collect() + .await; } let buckets: Vec = bucket_infos diff --git a/scripts/run.sh b/scripts/run.sh index 04e1ca67..f2a98634 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -115,6 +115,9 @@ export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook noti export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook notification address export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify" +# export RUSTFS_POLICY_PLUGIN_URL="http://localhost:8181/v1/data/rustfs/authz/allow" # The URL of the OPA system +# export RUSTFS_POLICY_PLUGIN_AUTH_TOKEN="your-opa-token" # The authentication token for the OPA system is optional + export RUSTFS_NS_SCANNER_INTERVAL=60 # Object scanning interval in seconds # exportRUSTFS_SKIP_BACKGROUND_TASK=true