feat: Simple OPA support (#644)

* opa-feature

* Update crates/policy/src/policy/opa.rs

* add the content related to 'Copyright'

---------

Co-authored-by: root <root@debian.localdomain>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
gatewayJ
2025-10-16 10:35:26 +08:00
committed by GitHub
parent d447b3e426
commit aae768f446
15 changed files with 391 additions and 13 deletions

7
.vscode/launch.json vendored
View File

@@ -20,7 +20,10 @@
}
},
"env": {
"RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=info"
"RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug",
"RUSTFS_SKIP_BACKGROUND_TASK": "on",
// "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow",
// "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token"
},
"args": [
"--access-key",
@@ -29,6 +32,8 @@
"rustfsadmin",
"--address",
"0.0.0.0:9010",
"--server-domains",
"127.0.0.1:9010",
"./target/volume/test{1...4}"
],
"cwd": "${workspaceFolder}"

6
Cargo.lock generated
View File

@@ -6662,6 +6662,7 @@ dependencies = [
"futures",
"jsonwebtoken",
"rand 0.9.2",
"rustfs-config",
"rustfs-crypto",
"rustfs-ecstore",
"rustfs-madmin",
@@ -6823,18 +6824,23 @@ name = "rustfs-policy"
version = "0.0.5"
dependencies = [
"base64-simd",
"chrono",
"ipnetwork",
"jsonwebtoken",
"rand 0.9.2",
"regex",
"reqwest",
"rustfs-config",
"rustfs-crypto",
"serde",
"serde_json",
"strum 0.27.2",
"temp-env",
"test-case",
"thiserror 2.0.17",
"time",
"tokio",
"tracing",
]
[[package]]

View File

@@ -28,6 +28,7 @@ members = [
"crates/madmin", # Management dashboard and admin API interface
"crates/notify", # Notification system for events
"crates/obs", # Observability utilities
"crates/policy",# Policy management
"crates/protos", # Protocol buffer definitions
"crates/rio", # Rust I/O utilities and abstractions
"crates/targets", # Target-specific configurations and utilities

View File

@@ -36,4 +36,4 @@ audit = ["dep:const-str", "constants"]
constants = ["dep:const-str"]
notify = ["dep:const-str", "constants"]
observability = ["constants"]
opa = ["constants"]

View File

@@ -32,3 +32,5 @@ pub mod audit;
pub mod notify;
#[cfg(feature = "observability")]
pub mod observability;
#[cfg(feature = "opa")]
pub mod opa;

View File

@@ -0,0 +1,21 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//opa env vars
pub const ENV_POLICY_PLUGIN_OPA_URL: &str = "RUSTFS_POLICY_PLUGIN_URL";
pub const ENV_POLICY_PLUGIN_AUTH_TOKEN: &str = "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN";
pub const ENV_POLICY_PLUGIN_KEYS: &[&str] = &[ENV_POLICY_PLUGIN_OPA_URL, ENV_POLICY_PLUGIN_AUTH_TOKEN];
pub const POLICY_PLUGIN_SUB_SYS: &str = "policy_plugin";

View File

@@ -34,6 +34,7 @@ time = { workspace = true, features = ["serde-human-readable"] }
serde = { workspace = true, features = ["derive", "rc"] }
rustfs-ecstore = { workspace = true }
rustfs-policy.workspace = true
rustfs-config.workspace = true
serde_json.workspace = true
async-trait.workspace = true
thiserror.workspace = true

View File

@@ -35,14 +35,17 @@ use rustfs_policy::auth::{
is_access_key_valid, is_secret_key_valid,
};
use rustfs_policy::policy::Args;
use rustfs_policy::policy::opa;
use rustfs_policy::policy::{EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, iam_policy_claim_name_sa};
use rustfs_utils::crypto::{base64_decode, base64_encode};
use serde_json::Value;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use time::OffsetDateTime;
use tracing::warn;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
pub const MAX_SVCSESSION_POLICY_SIZE: usize = 4096;
@@ -53,6 +56,12 @@ pub const POLICYNAME: &str = "policy";
pub const SESSION_POLICY_NAME: &str = "sessionPolicy";
pub const SESSION_POLICY_NAME_EXTRACTED: &str = "sessionPolicy-extracted";
static POLICY_PLUGIN_CLIENT: OnceLock<Arc<RwLock<Option<rustfs_policy::policy::opa::AuthZPlugin>>>> = OnceLock::new();
fn get_policy_plugin_client() -> Arc<RwLock<Option<rustfs_policy::policy::opa::AuthZPlugin>>> {
POLICY_PLUGIN_CLIENT.get_or_init(|| Arc::new(RwLock::new(None))).clone()
}
pub struct IamSys<T> {
store: Arc<IamCache<T>>,
roles_map: HashMap<ARN, String>,
@@ -60,6 +69,20 @@ pub struct IamSys<T> {
impl<T: Store> IamSys<T> {
pub fn new(store: Arc<IamCache<T>>) -> Self {
tokio::spawn(async move {
match opa::lookup_config().await {
Ok(conf) => {
if conf.enable() {
Self::set_policy_plugin_client(opa::AuthZPlugin::new(conf)).await;
info!("OPA plugin enabled");
}
}
Err(e) => {
error!("Error loading OPA configuration err:{}", e);
}
};
});
Self {
store,
roles_map: HashMap::new(),
@@ -69,6 +92,18 @@ impl<T: Store> IamSys<T> {
self.store.api.has_watcher()
}
pub async fn set_policy_plugin_client(client: rustfs_policy::policy::opa::AuthZPlugin) {
let policy_plugin_client = get_policy_plugin_client();
let mut guard = policy_plugin_client.write().await;
*guard = Some(client);
}
pub async fn get_policy_plugin_client() -> Option<rustfs_policy::policy::opa::AuthZPlugin> {
let policy_plugin_client = get_policy_plugin_client();
let guard = policy_plugin_client.read().await;
guard.clone()
}
pub async fn load_group(&self, name: &str) -> Result<()> {
self.store.group_notification_handler(name).await
}
@@ -766,6 +801,11 @@ impl<T: Store> IamSys<T> {
return true;
}
let opa_enable = Self::get_policy_plugin_client().await;
if let Some(opa_enable) = opa_enable {
return opa_enable.is_allowed(args).await;
}
let Ok((is_temp, parent_user)) = self.is_temp_user(args.account).await else { return false };
if is_temp {

View File

@@ -29,6 +29,7 @@ documentation = "https://docs.rs/rustfs-policy/latest/rustfs_policy/"
workspace = true
[dependencies]
rustfs-config = { workspace = true, features = ["constants","opa"] }
tokio.workspace = true
time = { workspace = true, features = ["serde-human-readable"] }
serde = { workspace = true, features = ["derive", "rc"] }
@@ -41,6 +42,10 @@ rand.workspace = true
base64-simd = { workspace = true }
jsonwebtoken = { workspace = true }
regex = { workspace = true }
reqwest.workspace = true
chrono.workspace = true
tracing.workspace = true
[dev-dependencies]
test-case.workspace = true
temp-env = { workspace = true }

View File

@@ -17,6 +17,7 @@ mod doc;
mod effect;
mod function;
mod id;
pub mod opa;
#[allow(clippy::module_inception)]
mod policy;
mod principal;

View File

@@ -0,0 +1,288 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::policy::Args as PArgs;
use rustfs_config::{ENV_PREFIX, opa::*};
use serde::Deserialize;
use serde_json::json;
use std::{collections::HashMap, env, time::Duration};
use tracing::{error, info};
#[derive(Debug, Clone, Default)]
pub struct Args {
pub url: String,
pub auth_token: String,
}
impl Args {
pub fn enable(&self) -> bool {
!self.url.is_empty()
}
}
#[derive(Debug, Clone)]
pub struct AuthZPlugin {
client: reqwest::Client,
args: Args,
}
fn check() -> Result<(), String> {
let env_list = env::vars();
let mut candidate = HashMap::new();
let prefix = format!("{}{}", ENV_PREFIX, POLICY_PLUGIN_SUB_SYS).to_uppercase();
for (key, value) in env_list {
if key.starts_with(&prefix) {
candidate.insert(key.to_string(), value);
}
}
//check required env vars
if candidate.remove(ENV_POLICY_PLUGIN_OPA_URL).is_none() {
return Err(format!("Missing required env var: {}", ENV_POLICY_PLUGIN_OPA_URL));
}
// check optional env vars
candidate.remove(ENV_POLICY_PLUGIN_AUTH_TOKEN);
if !candidate.is_empty() {
return Err(format!("Invalid env vars: {:?}", candidate));
}
Ok(())
}
async fn validate(config: &Args) -> Result<(), String> {
let client = reqwest::Client::new();
match client.post(&config.url).send().await {
Ok(resp) => {
match resp.status() {
reqwest::StatusCode::OK => {
info!("OPA is ready to accept requests.");
}
_ => {
return Err(format!("OPA returned an error: {}", resp.status()));
}
};
}
Err(err) => {
return Err(format!("Error connecting to OPA: {}", err));
}
};
Ok(())
}
pub async fn lookup_config() -> Result<Args, String> {
let args = Args::default();
let get_cfg =
|cfg: &str| -> Result<String, String> { env::var(cfg).map_err(|e| format!("Error getting env var {}: {:?}", cfg, e)) };
let url = get_cfg(ENV_POLICY_PLUGIN_OPA_URL);
if url.is_err() {
info!("OPA is not enabled.");
return Ok(args);
}
check()?;
let args = Args {
url: url.ok().unwrap(),
auth_token: get_cfg(ENV_POLICY_PLUGIN_AUTH_TOKEN).unwrap_or_default(),
};
validate(&args).await?;
Ok(args)
}
impl AuthZPlugin {
pub fn new(config: Args) -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(1))
.pool_max_idle_per_host(10)
.pool_idle_timeout(Some(Duration::from_secs(60)))
.tcp_keepalive(Some(Duration::from_secs(30)))
.tcp_nodelay(true)
.http2_keep_alive_interval(Some(Duration::from_secs(30)))
.http2_keep_alive_timeout(Duration::from_secs(15))
.build()
.unwrap();
Self { client, args: config }
}
pub async fn is_allowed(&self, args: &PArgs<'_>) -> bool {
let payload = self.build_opa_input(args);
let mut request = self.client.post(self.args.url.clone()).json(&payload);
if !self.args.auth_token.is_empty() {
request = request.header("Authorization", format!("Bearer {}", self.args.auth_token));
}
match request.send().await {
Ok(resp) => {
let status = resp.status();
if !status.is_success() {
error!("OPA returned non-success status: {}", status);
return false;
}
match resp.json::<OpaResponseEnum>().await {
Ok(response_enum) => match response_enum {
OpaResponseEnum::SimpleResult(result) => result.result,
OpaResponseEnum::AllowResult(response) => response.result.allow,
},
Err(err) => {
error!("Error parsing OPA response: {:?}", err);
false
}
}
}
Err(err) => {
error!("Error sending request to OPA: {:?}", err);
false
}
}
}
fn build_opa_input(&self, args: &PArgs<'_>) -> serde_json::Value {
let groups = match args.groups {
Some(g) => g.clone(),
None => vec![],
};
let action_str: &str = (&args.action).into();
json!({
// Core authorization parameters for OPA policy evaluation
"input":{
"identity": {
"account": args.account,
"groups": groups,
"is_owner": args.is_owner,
"claims": args.claims
},
"resource": {
"bucket": args.bucket,
"object": args.object,
"arn": if args.object.is_empty() {
format!("arn:aws:s3:::{}", args.bucket)
} else {
format!("arn:aws:s3:::{}/{}", args.bucket, args.object)
}
},
"action": action_str,
"context": {
"conditions": args.conditions,
"deny_only": args.deny_only,
"timestamp": chrono::Utc::now().to_rfc3339()
}
}
})
}
}
#[derive(Deserialize, Default)]
struct OpaResultAllow {
allow: bool,
}
#[derive(Deserialize, Default)]
struct OpaResult {
result: bool,
}
#[derive(Deserialize, Default)]
struct OpaResponse {
result: OpaResultAllow,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum OpaResponseEnum {
SimpleResult(OpaResult),
AllowResult(OpaResponse),
}
#[cfg(test)]
mod tests {
use super::*;
use temp_env;
#[test]
fn test_check_valid_config() {
// Use temp_env to temporarily set environment variables
temp_env::with_vars(
[
("RUSTFS_POLICY_PLUGIN_URL", Some("http://localhost:8181/v1/data/rustfs/authz/allow")),
("RUSTFS_POLICY_PLUGIN_AUTH_TOKEN", Some("test-token")),
],
|| {
assert!(check().is_ok());
},
);
}
#[test]
fn test_check_missing_required_env() {
temp_env::with_var_unset("RUSTFS_POLICY_PLUGIN_URL", || {
temp_env::with_var("RUSTFS_POLICY_PLUGIN_AUTH_TOKEN", Some("test-token"), || {
let result = check();
assert!(result.is_err());
assert!(result.unwrap_err().contains("Missing required env var"));
});
});
}
#[test]
fn test_check_invalid_env_vars() {
temp_env::with_vars(
[
("RUSTFS_POLICY_PLUGIN_URL", Some("http://localhost:8181/v1/data/rustfs/authz/allow")),
("RUSTFS_POLICY_PLUGIN_INVALID", Some("invalid-value")),
],
|| {
let result = check();
assert!(result.is_err());
assert!(result.unwrap_err().contains("Invalid env vars"));
},
);
}
#[test]
fn test_lookup_config_not_enabled() {
temp_env::with_var_unset("RUSTFS_POLICY_PLUGIN_URL", || {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async { lookup_config().await });
// Should return the default empty Args
assert!(result.is_ok());
let args = result.unwrap();
assert!(!args.enable());
assert_eq!(args.url, "");
assert_eq!(args.auth_token, "");
});
}
#[test]
fn test_args_enable() {
// Test Args enable method
let args_enabled = Args {
url: "http://localhost:8181".to_string(),
auth_token: "token".to_string(),
};
assert!(args_enabled.enable());
let args_disabled = Args {
url: "".to_string(),
auth_token: "".to_string(),
};
assert!(!args_disabled.enable());
}
}

View File

@@ -17,7 +17,6 @@ use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
/// DEFAULT_VERSION is the default version.
/// https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html
pub const DEFAULT_VERSION: &str = "2012-10-17";

View File

@@ -102,7 +102,7 @@ pub fn load_all_certs_from_directory(
let path = entry.path();
if path.is_dir() {
let domain_name = path
let domain_name: &str = path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| certs_error(format!("invalid domain name directory:{path:?}")))?;

View File

@@ -1854,20 +1854,26 @@ impl S3 for FS {
.await
.is_err()
{
bucket_infos.retain(|info| {
let req_info = req.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");
bucket_infos = futures::stream::iter(bucket_infos)
.filter_map(|info| async {
let mut req_clone = req.clone();
let req_info = req_clone.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");
req_info.bucket = Some(info.name.clone());
req_info.bucket = Some(info.name.clone());
futures::executor::block_on(async {
authorize_request(&mut req, Action::S3Action(S3Action::ListBucketAction))
if authorize_request(&mut req_clone, Action::S3Action(S3Action::ListBucketAction))
.await
.is_ok()
|| authorize_request(&mut req, Action::S3Action(S3Action::GetBucketLocationAction))
|| authorize_request(&mut req_clone, Action::S3Action(S3Action::GetBucketLocationAction))
.await
.is_ok()
{
Some(info)
} else {
None
}
})
});
.collect()
.await;
}
let buckets: Vec<Bucket> = bucket_infos

View File

@@ -115,6 +115,9 @@ export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook noti
export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook notification address
export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify"
# export RUSTFS_POLICY_PLUGIN_URL="http://localhost:8181/v1/data/rustfs/authz/allow" # The URL of the OPA system
# export RUSTFS_POLICY_PLUGIN_AUTH_TOKEN="your-opa-token" # The authentication token for the OPA system is optional
export RUSTFS_NS_SCANNER_INTERVAL=60 # Object scanning interval in seconds
# exportRUSTFS_SKIP_BACKGROUND_TASK=true