mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
add rustfs-trusted-proxies package
This commit is contained in:
4
.github/actions/setup/action.yml
vendored
4
.github/actions/setup/action.yml
vendored
@@ -52,6 +52,10 @@ runs:
|
||||
sudo apt-get install -y \
|
||||
musl-tools \
|
||||
build-essential \
|
||||
cmake \
|
||||
libclang-dev \
|
||||
golang \
|
||||
perl \
|
||||
pkg-config \
|
||||
libssl-dev
|
||||
|
||||
|
||||
380
Cargo.lock
generated
380
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
25
Cargo.toml
25
Cargo.toml
@@ -15,8 +15,10 @@
|
||||
[workspace]
|
||||
members = [
|
||||
"rustfs", # Core file system implementation
|
||||
"crates/ahm", # Asynchronous Hash Map for concurrent data structures
|
||||
"crates/appauth", # Application authentication and authorization
|
||||
"crates/audit", # Audit target management system with multi-target fan-out
|
||||
"crates/checksums", # client checksums
|
||||
"crates/common", # Shared utilities and data structures
|
||||
"crates/config", # Configuration management
|
||||
"crates/credentials", # Credential management system
|
||||
@@ -25,8 +27,10 @@ members = [
|
||||
"crates/e2e_test", # End-to-end test suite
|
||||
"crates/filemeta", # File metadata management
|
||||
"crates/iam", # Identity and Access Management
|
||||
"crates/kms", # Key Management Service
|
||||
"crates/lock", # Distributed locking implementation
|
||||
"crates/madmin", # Management dashboard and admin API interface
|
||||
"crates/mcp", # MCP server for S3 operations
|
||||
"crates/notify", # Notification system for events
|
||||
"crates/obs", # Observability utilities
|
||||
"crates/policy", # Policy management
|
||||
@@ -36,13 +40,10 @@ members = [
|
||||
"crates/s3select-api", # S3 Select API interface
|
||||
"crates/s3select-query", # S3 Select query engine
|
||||
"crates/signer", # client signer
|
||||
"crates/checksums", # client checksums
|
||||
"crates/trusted-proxies", # Trusted proxies management
|
||||
"crates/utils", # Utility functions and helpers
|
||||
"crates/workers", # Worker thread pools and task scheduling
|
||||
"crates/zip", # ZIP file handling and compression
|
||||
"crates/ahm", # Asynchronous Hash Map for concurrent data structures
|
||||
"crates/mcp", # MCP server for S3 operations
|
||||
"crates/kms", # Key Management Service
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
@@ -90,6 +91,7 @@ rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
|
||||
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
|
||||
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
|
||||
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
|
||||
rustfs-trusted-proxies = { path = "crates/trusted-proxies", version = "0.0.5" }
|
||||
rustfs-utils = { path = "crates/utils", version = "0.0.5" }
|
||||
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
|
||||
rustfs-zip = { path = "./crates/zip", version = "0.0.5" }
|
||||
@@ -100,21 +102,22 @@ async-compression = { version = "0.4.19" }
|
||||
async-recursion = "1.1.1"
|
||||
async-trait = "0.1.89"
|
||||
axum = "0.8.8"
|
||||
axum-server = { version = "0.8.0", features = ["tls-rustls-no-provider"], default-features = false }
|
||||
axum-server = { version = "0.8.0", features = ["tls-rustls"], default-features = false }
|
||||
futures = "0.3.31"
|
||||
futures-core = "0.3.31"
|
||||
futures-util = "0.3.31"
|
||||
pollster = "0.4.0"
|
||||
hyper = { version = "1.8.1", features = ["http2", "http1", "server"] }
|
||||
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "ring", "webpki-roots"] }
|
||||
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "aws-lc-rs", "webpki-roots"] }
|
||||
hyper-util = { version = "0.1.19", features = ["tokio", "server-auto", "server-graceful"] }
|
||||
http = "1.4.0"
|
||||
http-body = "1.0.1"
|
||||
http-body-util = "0.1.3"
|
||||
reqwest = { version = "0.12.28", default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
|
||||
#reqwest = { version = "0.13.1", default-features = false, features = ["rustls", "charset", "http2", "system-proxy", "stream", "json", "blocking", "query", "form"] }
|
||||
reqwest = { version = "0.12.28", default-features = false, features = ["rustls-tls-no-provider", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
|
||||
socket2 = "0.6.1"
|
||||
tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] }
|
||||
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "ring"] }
|
||||
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "aws-lc-rs"] }
|
||||
tokio-stream = { version = "0.1.17" }
|
||||
tokio-test = "0.4.4"
|
||||
tokio-util = { version = "0.7.17", features = ["io", "compat"] }
|
||||
@@ -150,7 +153,7 @@ hmac = { version = "0.13.0-rc.3" }
|
||||
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
|
||||
pbkdf2 = "0.13.0-rc.5"
|
||||
rsa = { version = "0.10.0-rc.10" }
|
||||
rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false }
|
||||
rustls = { version = "0.23.35" }
|
||||
rustls-pemfile = "2.2.0"
|
||||
rustls-pki-types = "1.13.2"
|
||||
sha1 = "0.11.0-rc.3"
|
||||
@@ -171,14 +174,14 @@ atoi = "2.0.0"
|
||||
atomic_enum = "0.3.0"
|
||||
aws-config = { version = "1.8.12" }
|
||||
aws-credential-types = { version = "1.2.11" }
|
||||
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
|
||||
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
|
||||
aws-smithy-types = { version = "1.3.5" }
|
||||
base64 = "0.22.1"
|
||||
base64-simd = "0.8.0"
|
||||
brotli = "8.0.2"
|
||||
cfg-if = "1.0.4"
|
||||
clap = { version = "4.5.53", features = ["derive", "env"] }
|
||||
const-str = { version = "0.7.1", features = ["std", "proc"] }
|
||||
const-str = { version = "1.0.0", features = ["std", "proc"] }
|
||||
convert_case = "0.10.0"
|
||||
criterion = { version = "0.8", features = ["html_reports"] }
|
||||
crossbeam-queue = "0.3.12"
|
||||
|
||||
@@ -35,6 +35,10 @@ RUN set -eux; \
|
||||
ca-certificates \
|
||||
curl \
|
||||
git \
|
||||
cmake \
|
||||
libclang-dev \
|
||||
golang \
|
||||
perl \
|
||||
pkg-config \
|
||||
libssl-dev \
|
||||
lld \
|
||||
|
||||
@@ -20,6 +20,7 @@ pub(crate) mod env;
|
||||
pub(crate) mod heal;
|
||||
pub(crate) mod object;
|
||||
pub(crate) mod profiler;
|
||||
pub(crate) mod proxy;
|
||||
pub(crate) mod runtime;
|
||||
pub(crate) mod targets;
|
||||
pub(crate) mod tls;
|
||||
|
||||
28
crates/config/src/constants/proxy.rs
Normal file
28
crates/config/src/constants/proxy.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// RUSTFS_HTTP_TRUSTED_PROXIES
|
||||
/// Environment variable name for trusted proxies configuration
|
||||
/// Example: RUSTFS_HTTP_TRUSTED_PROXIES="127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fc00::/7"
|
||||
/// If not set, defaults to local loopback and common private networks
|
||||
/// Used in proxy configuration loading
|
||||
/// Refer to `TrustedProxiesConfig` for details
|
||||
pub const ENV_TRUSTED_PROXIES: &str = "RUSTFS_HTTP_TRUSTED_PROXIES";
|
||||
|
||||
/// Default trusted proxies: Local loopback and common private networks
|
||||
/// Used when the environment variable is not set
|
||||
/// Format: Comma-separated list of IPs and CIDR blocks
|
||||
/// Example: RUSTFS_HTTP_TRUSTED_PROXIES="127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fc00::/7"
|
||||
/// Refer to `TrustedProxiesConfig` for details
|
||||
pub const DEFAULT_TRUSTED_PROXIES: &str = "127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fc00::/7";
|
||||
@@ -31,6 +31,8 @@ pub use constants::object::*;
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::profiler::*;
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::proxy::*;
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::runtime::*;
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::targets::*;
|
||||
|
||||
@@ -161,7 +161,7 @@ impl TransitionClient {
|
||||
async fn private_new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
|
||||
let endpoint_url = get_endpoint_url(endpoint, opts.secure)?;
|
||||
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
let scheme = endpoint_url.scheme();
|
||||
let client;
|
||||
let tls = if let Some(store) = load_root_store_from_tls_path() {
|
||||
|
||||
32
crates/trusted-proxies/Cargo.toml
Normal file
32
crates/trusted-proxies/Cargo.toml
Normal file
@@ -0,0 +1,32 @@
|
||||
[package]
|
||||
name = "rustfs-trusted-proxies"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
rust-version.workspace = true
|
||||
version.workspace = true
|
||||
homepage.workspace = true
|
||||
description = " RustFS Trusted Proxies module provides secure and efficient management of trusted proxy servers within the RustFS ecosystem, enhancing network security and performance."
|
||||
keywords = ["trusted-proxies", "network-security", "rustfs", "proxy-management"]
|
||||
categories = ["network-programming", "security", "web-programming"]
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
http = { workspace = true }
|
||||
ipnetwork = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
moka = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
rustfs-config = { workspace = true, features = ["constants"] }
|
||||
rustfs-utils = { workspace = true }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time", "test-util"] }
|
||||
tower = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
0
crates/trusted-proxies/README.md
Normal file
0
crates/trusted-proxies/README.md
Normal file
201
crates/trusted-proxies/src/advanced.rs
Normal file
201
crates/trusted-proxies/src/advanced.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::cloud::fetch_cloud_provider_ips;
|
||||
use crate::{ENV_PROXY_VALIDATION_MODE, MultiProxyConfig, TrustedProxiesConfig, TrustedProxy};
|
||||
use ipnetwork::IpNetwork;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Agent handling error types
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ProxyError {
|
||||
#[error("Invalid IP address format: {0}")]
|
||||
InvalidIpFormat(String),
|
||||
|
||||
#[error("Proxy Chain Verification Failed: {0}")]
|
||||
ChainValidationFailed(String),
|
||||
|
||||
#[error("Head parsing error: {0}")]
|
||||
HeaderParseError(String),
|
||||
}
|
||||
|
||||
/// Proxy chain validation mode
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
|
||||
pub enum ValidationMode {
|
||||
/// Loose mode: Accept the entire chain as long as the last agent is trusted
|
||||
Lenient,
|
||||
/// Strict mode: All agents in the chain are required to be trustworthy
|
||||
Strict,
|
||||
/// Hop Validation: Find the first untrusted agent from right to left
|
||||
HopByHop,
|
||||
}
|
||||
|
||||
/// RFC 7239 Forwarded head parser
|
||||
/// Format:Forwarded: for=192.0.2.60;proto=http;by=203.0.113.43
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ForwardedHeader {
|
||||
/// Client address
|
||||
pub for_client: Option<IpAddr>,
|
||||
/// Proxy identification
|
||||
pub by_proxy: Option<IpAddr>,
|
||||
/// protocol
|
||||
pub proto: Option<String>,
|
||||
/// Host
|
||||
pub host: Option<String>,
|
||||
}
|
||||
|
||||
/// Proxy chain analysis results
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProxyChainAnalysis {
|
||||
/// Trusted client IP (verified)
|
||||
pub trusted_client_ip: IpAddr,
|
||||
/// Complete proxy chain (from client to current agent)
|
||||
pub full_proxy_chain: Vec<IpAddr>,
|
||||
/// Trusted Proxy Chain section
|
||||
pub trusted_proxy_chain: Vec<IpAddr>,
|
||||
/// Verify the number of hops that pass
|
||||
pub validated_hops: usize,
|
||||
/// Whether it passes the integrity check
|
||||
pub chain_integrity: bool,
|
||||
/// The validation mode used
|
||||
pub validation_mode_used: ValidationMode,
|
||||
/// Possible safety warnings
|
||||
pub security_warnings: Vec<String>,
|
||||
}
|
||||
|
||||
/// Optimized CIDR matcher
|
||||
pub struct CidrMatcher {
|
||||
ipv4_networks: Vec<IpNetwork>,
|
||||
ipv6_networks: Vec<IpNetwork>,
|
||||
single_ips: HashSet<IpAddr>,
|
||||
}
|
||||
|
||||
impl CidrMatcher {
|
||||
pub fn new(config: &TrustedProxiesConfig) -> Self {
|
||||
let mut ipv4_networks = Vec::new();
|
||||
let mut ipv6_networks = Vec::new();
|
||||
let mut single_ips = HashSet::new();
|
||||
|
||||
for proxy in &config.proxies {
|
||||
match proxy {
|
||||
TrustedProxy::Single(ip) => {
|
||||
single_ips.insert(*ip);
|
||||
}
|
||||
TrustedProxy::Cidr(network) => match network {
|
||||
IpNetwork::V4(_) => ipv4_networks.push(*network),
|
||||
IpNetwork::V6(_) => ipv6_networks.push(*network),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by prefix length, with more specific networks taking precedence
|
||||
ipv4_networks.sort_by(|a, b| b.prefix().cmp(&a.prefix()));
|
||||
ipv6_networks.sort_by(|a, b| b.prefix().cmp(&a.prefix()));
|
||||
|
||||
Self {
|
||||
ipv4_networks,
|
||||
ipv6_networks,
|
||||
single_ips,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, ip: &IpAddr) -> bool {
|
||||
// Start by checking individual IPs
|
||||
if self.single_ips.contains(ip) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Then check the CIDR range
|
||||
match ip {
|
||||
IpAddr::V4(ipv4) => {
|
||||
for network in &self.ipv4_networks {
|
||||
if let IpNetwork::V4(v4_net) = network {
|
||||
if v4_net.contains(*ipv4) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
IpAddr::V6(ipv6) => {
|
||||
for network in &self.ipv6_networks {
|
||||
if let IpNetwork::V6(v6_net) = network {
|
||||
if v6_net.contains(*ipv6) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Environment-specific configuration loading
|
||||
pub fn load_production_config() -> MultiProxyConfig {
|
||||
let mut config = MultiProxyConfig::default();
|
||||
|
||||
// Read from environment variables
|
||||
if let Ok(mode_str) = std::env::var(ENV_PROXY_VALIDATION_MODE) {
|
||||
config.validation_mode = match mode_str.as_str() {
|
||||
"strict" => ValidationMode::Strict,
|
||||
"lenient" => ValidationMode::Lenient,
|
||||
_ => ValidationMode::HopByHop,
|
||||
};
|
||||
}
|
||||
|
||||
// Add trusted agents from cloud provider metadata
|
||||
if let Ok(cloud_ips) = fetch_cloud_provider_ips() {
|
||||
for ip_range in cloud_ips {
|
||||
if let Ok(network) = ip_range.parse() {
|
||||
config.allowed_private_nets.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
||||
/// Dynamic configuration updates
|
||||
pub struct DynamicConfigManager {
|
||||
current_config: Arc<std::sync::RwLock<MultiProxyConfig>>,
|
||||
config_watcher: tokio::sync::watch::Sender<MultiProxyConfig>,
|
||||
}
|
||||
|
||||
impl DynamicConfigManager {
|
||||
pub fn new(initial_config: MultiProxyConfig) -> Self {
|
||||
let (sender, _) = tokio::sync::watch::channel(initial_config.clone());
|
||||
|
||||
Self {
|
||||
current_config: Arc::new(std::sync::RwLock::new(initial_config)),
|
||||
config_watcher: sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_config(&self, new_config: MultiProxyConfig) {
|
||||
let mut config = self.current_config.write().unwrap();
|
||||
*config = new_config.clone();
|
||||
|
||||
// Notify all listeners
|
||||
let _ = self.config_watcher.send(new_config);
|
||||
}
|
||||
|
||||
pub fn get_config(&self) -> MultiProxyConfig {
|
||||
self.current_config.read().unwrap().clone()
|
||||
}
|
||||
}
|
||||
51
crates/trusted-proxies/src/cache.rs
Normal file
51
crates/trusted-proxies/src/cache.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use moka::future::Cache;
|
||||
use std::net::IpAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
/// High-performance IP verification cache
|
||||
pub struct IpValidationCache {
|
||||
/// Moka cache
|
||||
cache: Cache<IpAddr, bool>,
|
||||
/// Default cache expiration time
|
||||
default_ttl: Duration,
|
||||
}
|
||||
|
||||
impl IpValidationCache {
|
||||
pub fn new(capacity: usize, default_ttl: Duration) -> Self {
|
||||
Self {
|
||||
cache: Cache::builder()
|
||||
.max_capacity(capacity as u64)
|
||||
.time_to_live(default_ttl)
|
||||
.build(),
|
||||
default_ttl,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the IP is trusted (with cache)
|
||||
pub async fn is_trusted(&self, ip: &IpAddr, validator: impl FnOnce(&IpAddr) -> bool) -> bool {
|
||||
if let Some(is_trusted) = self.cache.get(ip).await {
|
||||
return is_trusted;
|
||||
}
|
||||
|
||||
// Call the validation function
|
||||
let is_trusted = validator(ip);
|
||||
|
||||
// Update the cache
|
||||
self.cache.insert(*ip, is_trusted).await;
|
||||
is_trusted
|
||||
}
|
||||
}
|
||||
860
crates/trusted-proxies/src/cloud/metadata.rs
Normal file
860
crates/trusted-proxies/src/cloud/metadata.rs
Normal file
@@ -0,0 +1,860 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// src/cloud/metadata.rs
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ipnetwork::IpNetwork;
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Error in obtaining cloud service provider metadata
|
||||
#[derive(Error, Debug)]
|
||||
pub enum CloudMetadataError {
|
||||
#[error("HTTP request failed: {0}")]
|
||||
HttpRequestFailed(#[from] reqwest::Error),
|
||||
|
||||
#[error("JSON parsing fails: {0}")]
|
||||
JsonParseError(#[from] serde_json::Error),
|
||||
|
||||
#[error("Metadata Service Unavailable: {0}")]
|
||||
MetadataUnavailable(String),
|
||||
|
||||
#[error("IP address resolution failed: {0}")]
|
||||
IpParseError(String),
|
||||
|
||||
#[error("Unsupported cloud service providers: {0}")]
|
||||
UnsupportedProvider(String),
|
||||
|
||||
#[error("Misconfiguration: {0}")]
|
||||
ConfigurationError(String),
|
||||
}
|
||||
|
||||
/// Cloud service provider type
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum CloudProvider {
|
||||
Aws,
|
||||
Azure,
|
||||
Gcp,
|
||||
DigitalOcean,
|
||||
Vultr,
|
||||
Linode,
|
||||
Oracle,
|
||||
Alibaba,
|
||||
Tencent,
|
||||
/// Unknown or customized
|
||||
Unknown(String),
|
||||
}
|
||||
|
||||
impl CloudProvider {
|
||||
/// Automatically detect cloud service providers from environment variables
|
||||
pub fn detect_from_env() -> Option<Self> {
|
||||
// Check various cloud service provider-specific environment variables
|
||||
|
||||
// AWS
|
||||
if std::env::var("AWS_EXECUTION_ENV").is_ok()
|
||||
|| std::env::var("AWS_REGION").is_ok()
|
||||
|| std::env::var("EC2_INSTANCE_ID").is_ok()
|
||||
{
|
||||
return Some(Self::Aws);
|
||||
}
|
||||
|
||||
// Azure
|
||||
if std::env::var("WEBSITE_SITE_NAME").is_ok()
|
||||
|| std::env::var("WEBSITE_INSTANCE_ID").is_ok()
|
||||
|| std::env::var("APPSETTING_WEBSITE_SITE_NAME").is_ok()
|
||||
{
|
||||
return Some(Self::Azure);
|
||||
}
|
||||
|
||||
// GCP
|
||||
if std::env::var("GCP_PROJECT").is_ok()
|
||||
|| std::env::var("GOOGLE_CLOUD_PROJECT").is_ok()
|
||||
|| std::env::var("GAE_INSTANCE").is_ok()
|
||||
{
|
||||
return Some(Self::Gcp);
|
||||
}
|
||||
|
||||
// DigitalOcean
|
||||
if std::env::var("DIGITALOCEAN_REGION").is_ok() {
|
||||
return Some(Self::DigitalOcean);
|
||||
}
|
||||
|
||||
// Vultr
|
||||
if std::env::var("VULTR_REGION").is_ok() {
|
||||
return Some(Self::Vultr);
|
||||
}
|
||||
|
||||
// Linode
|
||||
if std::env::var("LINODE_REGION").is_ok() {
|
||||
return Some(Self::Linode);
|
||||
}
|
||||
|
||||
// Oracle
|
||||
if std::env::var("OCI_REGION").is_ok() {
|
||||
return Some(Self::Oracle);
|
||||
}
|
||||
|
||||
// Alibaba Cloud
|
||||
if std::env::var("ALIBABA_CLOUD_REGION").is_ok() {
|
||||
return Some(Self::Alibaba);
|
||||
}
|
||||
|
||||
// Tencent Cloud
|
||||
if std::env::var("TENCENTCLOUD_REGION").is_ok() {
|
||||
return Some(Self::Tencent);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Get the cloud service provider name
|
||||
pub fn name(&self) -> &str {
|
||||
match self {
|
||||
Self::Aws => "aws",
|
||||
Self::Azure => "azure",
|
||||
Self::Gcp => "gcp",
|
||||
Self::DigitalOcean => "digitalocean",
|
||||
Self::Vultr => "vultr",
|
||||
Self::Linode => "linode",
|
||||
Self::Oracle => "oracle",
|
||||
Self::Alibaba => "alibaba",
|
||||
Self::Tencent => "tencent",
|
||||
Self::Unknown(name) => name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cloud metadata fetcher characteristics
|
||||
#[async_trait]
|
||||
pub trait CloudMetadataFetcher {
|
||||
/// Get the cloud service provider name
|
||||
fn provider_name(&self) -> &str;
|
||||
|
||||
/// Gets the CIDR range of the network where the instance is located
|
||||
async fn fetch_network_cidrs(&self) -> Result<Vec<IpNetwork>, CloudMetadataError>;
|
||||
|
||||
/// Get the cloud service provider's public IP range (e.g., load balancer, NAT gateway, etc.)
|
||||
async fn fetch_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError>;
|
||||
|
||||
/// Get the IP range of a trusted proxy
|
||||
async fn fetch_trusted_proxy_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
// Default implementation: Merge network CIDR and public IP ranges
|
||||
let mut ranges = Vec::new();
|
||||
|
||||
match self.fetch_network_cidrs().await {
|
||||
Ok(cidrs) => ranges.extend(cidrs),
|
||||
Err(e) => warn!("Get network CIDR failed: {}", e),
|
||||
}
|
||||
|
||||
match self.fetch_public_ip_ranges().await {
|
||||
Ok(public_ranges) => ranges.extend(public_ranges),
|
||||
Err(e) => warn!("Failed to get public IP range: {}", e),
|
||||
}
|
||||
|
||||
Ok(ranges)
|
||||
}
|
||||
}
|
||||
|
||||
/// AWS Metadata Fetcher
|
||||
pub struct AwsMetadataFetcher {
|
||||
client: Client,
|
||||
metadata_endpoint: String,
|
||||
}
|
||||
|
||||
impl AwsMetadataFetcher {
|
||||
pub fn new() -> Self {
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new());
|
||||
|
||||
Self {
|
||||
client,
|
||||
metadata_endpoint: "http://169.254.169.254".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an IMDSv2 token
|
||||
async fn get_metadata_token(&self) -> Result<String, CloudMetadataError> {
|
||||
let url = format!("{}/latest/api/token", self.metadata_endpoint);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.put(&url)
|
||||
.header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
debug!("AWS IMDSv2 token acquisition failed, try IMDSv1: {}", e);
|
||||
CloudMetadataError::MetadataUnavailable(format!("IMDSv2 failed: {}", e))
|
||||
})?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let token = response.text().await?;
|
||||
Ok(token)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable("Unable to obtain IMDSv2 tokens".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Use tokens to get metadata
|
||||
async fn get_metadata_with_token(&self, path: &str, token: Option<&str>) -> Result<String, CloudMetadataError> {
|
||||
let url = format!("{}/latest/{}", self.metadata_endpoint, path);
|
||||
|
||||
let mut request = self.client.get(&url);
|
||||
|
||||
if let Some(t) = token {
|
||||
request = request.header("X-aws-ec2-metadata-token", t);
|
||||
}
|
||||
|
||||
let response = request.send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let text = response.text().await?;
|
||||
Ok(text)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"Metadata path {} returns status:{}",
|
||||
path,
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a list of MAC addresses
|
||||
async fn get_mac_addresses(&self, token: Option<&str>) -> Result<Vec<String>, CloudMetadataError> {
|
||||
let text = self
|
||||
.get_metadata_with_token("meta-data/network/interfaces/macs/", token)
|
||||
.await?;
|
||||
|
||||
let macs: Vec<String> = text
|
||||
.lines()
|
||||
.map(|line| line.trim().trim_end_matches('/'))
|
||||
.filter(|mac| !mac.is_empty())
|
||||
.map(String::from)
|
||||
.collect();
|
||||
|
||||
Ok(macs)
|
||||
}
|
||||
|
||||
/// Get the VPC CIDR block
|
||||
async fn get_vpc_cidr_blocks(&self, token: Option<&str>) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let macs = self.get_mac_addresses(token).await?;
|
||||
let mut cidrs = Vec::new();
|
||||
|
||||
for mac in macs {
|
||||
let path = format!("meta-data/network/interfaces/macs/{}/vpc-ipv4-cidr-block", mac);
|
||||
|
||||
match self.get_metadata_with_token(&path, token).await {
|
||||
Ok(cidr_text) => {
|
||||
let cidr_text = cidr_text.trim();
|
||||
if let Ok(network) = cidr_text.parse::<IpNetwork>() {
|
||||
cidrs.push(network);
|
||||
debug!("To get a VPC CIDR: {}", network);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Unable to obtain VPC CIDR for MAC {}: {}", mac, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cidrs)
|
||||
}
|
||||
|
||||
/// Get the subnet CIDR block
|
||||
async fn get_subnet_cidr_blocks(&self, token: Option<&str>) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let macs = self.get_mac_addresses(token).await?;
|
||||
let mut cidrs = Vec::new();
|
||||
|
||||
for mac in macs {
|
||||
let path = format!("meta-data/network/interfaces/macs/{}/subnet-ipv4-cidr-block", mac);
|
||||
|
||||
match self.get_metadata_with_token(&path, token).await {
|
||||
Ok(cidr_text) => {
|
||||
let cidr_text = cidr_text.trim();
|
||||
if let Ok(network) = cidr_text.parse::<IpNetwork>() {
|
||||
cidrs.push(network);
|
||||
debug!("Get the subnet CIDR: {}", network);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("The subnet CIDR for MAC cannot be obtained {}: {}", mac, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cidrs)
|
||||
}
|
||||
|
||||
/// Get public IP ranges for AWS (from official sources)
|
||||
async fn get_aws_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let url = "https://ip-ranges.amazonaws.com/ip-ranges.json";
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AwsIpRanges {
|
||||
prefixes: Vec<AwsPrefix>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AwsPrefix {
|
||||
ip_prefix: String,
|
||||
region: String,
|
||||
service: String,
|
||||
}
|
||||
|
||||
let response = self.client.get(url).timeout(Duration::from_secs(5)).send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let ip_ranges: AwsIpRanges = response.json().await?;
|
||||
|
||||
let mut ranges = Vec::new();
|
||||
for prefix in ip_ranges.prefixes {
|
||||
// Include only service-specific IP ranges (e.g., EC2, CLOUDFRONT, etc.)
|
||||
if matches!(prefix.service.as_str(), "EC2" | "CLOUDFRONT" | "ROUTE53" | "ROUTE53_HEALTHCHECKS") {
|
||||
if let Ok(network) = prefix.ip_prefix.parse::<IpNetwork>() {
|
||||
ranges.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("{} public IP ranges are obtained from AWS officially", ranges.len());
|
||||
Ok(ranges)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"AWS IP Ranges API returns status:{}",
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CloudMetadataFetcher for AwsMetadataFetcher {
|
||||
fn provider_name(&self) -> &str {
|
||||
"aws"
|
||||
}
|
||||
|
||||
async fn fetch_network_cidrs(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let mut cidrs = Vec::new();
|
||||
|
||||
// 尝试获取 IMDSv2 令牌
|
||||
let token = match self.get_metadata_token().await {
|
||||
Ok(t) => Some(t),
|
||||
Err(_) => {
|
||||
debug!("Using IMDSv1 (no token)");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// 获取 VPC CIDR
|
||||
match self.get_vpc_cidr_blocks(token.as_deref()).await {
|
||||
Ok(vpc_cidrs) => cidrs.extend(vpc_cidrs),
|
||||
Err(e) => debug!("Failed to obtain VPC CIDR:{}", e),
|
||||
}
|
||||
|
||||
// 获取子网 CIDR
|
||||
match self.get_subnet_cidr_blocks(token.as_deref()).await {
|
||||
Ok(subnet_cidrs) => cidrs.extend(subnet_cidrs),
|
||||
Err(e) => debug!("Failed to get subnet CIDR: {}", e),
|
||||
}
|
||||
|
||||
if cidrs.is_empty() {
|
||||
Err(CloudMetadataError::MetadataUnavailable("No network CIDR can be obtained".to_string()))
|
||||
} else {
|
||||
info!("{} network CIDRs were obtained from AWS metadata", cidrs.len());
|
||||
Ok(cidrs)
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
self.get_aws_public_ip_ranges().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Azure Metadata Fetcher
|
||||
pub struct AzureMetadataFetcher {
|
||||
client: Client,
|
||||
metadata_endpoint: String,
|
||||
}
|
||||
|
||||
impl AzureMetadataFetcher {
|
||||
pub fn new() -> Self {
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new());
|
||||
|
||||
Self {
|
||||
client,
|
||||
metadata_endpoint: "http://169.254.169.254".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get Azure metadata
|
||||
async fn get_metadata(&self, path: &str) -> Result<String, CloudMetadataError> {
|
||||
let url = format!("{}/metadata/{}?api-version=2021-05-01", self.metadata_endpoint, path);
|
||||
|
||||
let response = self.client.get(&url).header("Metadata", "true").send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let text = response.text().await?;
|
||||
Ok(text)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"Azure metadata path {} Return status: {}",
|
||||
path,
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get Azure public IP ranges
|
||||
async fn get_azure_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
// Azure Public IP Range Download URL
|
||||
let urls = [
|
||||
"https://www.microsoft.com/en-us/download/confirmation.aspx?id=56519",
|
||||
"https://download.microsoft.com/download/7/1/D/71D86715-5596-4529-9B13-DA13A5DE5B63/ServiceTags_Public_20231211.json",
|
||||
];
|
||||
|
||||
for url in urls.iter() {
|
||||
match self.fetch_azure_ip_ranges_from_url(url).await {
|
||||
Ok(ranges) => {
|
||||
info!("{} public IP ranges are downloaded from Azure", ranges.len());
|
||||
return Ok(ranges);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to get Azure IP range from {}: {}", url, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(CloudMetadataError::MetadataUnavailable(
|
||||
"Azure public IP ranges cannot be obtained".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn fetch_azure_ip_ranges_from_url(&self, url: &str) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureServiceTags {
|
||||
values: Vec<AzureServiceTag>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureServiceTag {
|
||||
properties: AzureServiceTagProperties,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureServiceTagProperties {
|
||||
address_prefixes: Vec<String>,
|
||||
}
|
||||
|
||||
let response = self.client.get(url).timeout(Duration::from_secs(10)).send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let service_tags: AzureServiceTags = response.json().await?;
|
||||
|
||||
let mut ranges = Vec::new();
|
||||
for tag in service_tags.values {
|
||||
for prefix in tag.properties.address_prefixes {
|
||||
if let Ok(network) = prefix.parse::<IpNetwork>() {
|
||||
ranges.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ranges)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"Azure IP range URL returns status:{}",
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CloudMetadataFetcher for AzureMetadataFetcher {
|
||||
fn provider_name(&self) -> &str {
|
||||
"azure"
|
||||
}
|
||||
|
||||
async fn fetch_network_cidrs(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
// Azure metadata provides network interface information
|
||||
let metadata = self.get_metadata("instance/network/interface").await?;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureNetworkInterface {
|
||||
ipv4: AzureIpv4Info,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureIpv4Info {
|
||||
subnet: Vec<AzureSubnet>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AzureSubnet {
|
||||
address: String,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
let interfaces: Vec<AzureNetworkInterface> = serde_json::from_str(&metadata)?;
|
||||
|
||||
let mut cidrs = Vec::new();
|
||||
for interface in interfaces {
|
||||
for subnet in interface.ipv4.subnet {
|
||||
let cidr = format!("{}/{}", subnet.address, subnet.prefix);
|
||||
if let Ok(network) = cidr.parse::<IpNetwork>() {
|
||||
cidrs.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cidrs.is_empty() {
|
||||
Err(CloudMetadataError::MetadataUnavailable(
|
||||
"Azure network CIDR can't be obtained".to_string(),
|
||||
))
|
||||
} else {
|
||||
info!("{} network CIDRs were obtained from Azure metadata", cidrs.len());
|
||||
Ok(cidrs)
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
self.get_azure_public_ip_ranges().await
|
||||
}
|
||||
}
|
||||
|
||||
/// GCP metadata fetcher
|
||||
pub struct GcpMetadataFetcher {
|
||||
client: Client,
|
||||
metadata_endpoint: String,
|
||||
}
|
||||
|
||||
impl GcpMetadataFetcher {
|
||||
pub fn new() -> Self {
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new());
|
||||
|
||||
Self {
|
||||
client,
|
||||
metadata_endpoint: "http://metadata.google.internal".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get GCP metadata
|
||||
async fn get_metadata(&self, path: &str) -> Result<String, CloudMetadataError> {
|
||||
let url = format!("{}/computeMetadata/v1/{}", self.metadata_endpoint, path);
|
||||
|
||||
let response = self.client.get(&url).header("Metadata-Flavor", "Google").send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let text = response.text().await?;
|
||||
Ok(text)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"GCP metadata path {} returns status:{}",
|
||||
path,
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get GCP public IP ranges
|
||||
async fn get_gcp_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let url = "https://www.gstatic.com/ipranges/cloud.json";
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct GcpIpRanges {
|
||||
prefixes: Vec<GcpPrefix>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct GcpPrefix {
|
||||
ipv4_prefix: Option<String>,
|
||||
ipv6_prefix: Option<String>,
|
||||
}
|
||||
|
||||
let response = self.client.get(url).timeout(Duration::from_secs(5)).send().await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let ip_ranges: GcpIpRanges = response.json().await?;
|
||||
|
||||
let mut ranges = Vec::new();
|
||||
for prefix in ip_ranges.prefixes {
|
||||
if let Some(ipv4_prefix) = prefix.ipv4_prefix {
|
||||
if let Ok(network) = ipv4_prefix.parse::<IpNetwork>() {
|
||||
ranges.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("{} public IP ranges were obtained from GCP officials", ranges.len());
|
||||
Ok(ranges)
|
||||
} else {
|
||||
Err(CloudMetadataError::MetadataUnavailable(format!(
|
||||
"GCP IP Range API returns status:{}",
|
||||
response.status()
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CloudMetadataFetcher for GcpMetadataFetcher {
|
||||
fn provider_name(&self) -> &str {
|
||||
"gcp"
|
||||
}
|
||||
|
||||
async fn fetch_network_cidrs(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
// Get network interface information
|
||||
let metadata = self.get_metadata("instance/network-interfaces/").await?;
|
||||
|
||||
let interface_indices: Vec<usize> = metadata
|
||||
.lines()
|
||||
.filter_map(|line| {
|
||||
let line = line.trim().trim_end_matches('/');
|
||||
if line.chars().all(|c| c.is_digit(10)) {
|
||||
line.parse().ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut cidrs = Vec::new();
|
||||
|
||||
for index in interface_indices {
|
||||
// Get the subnet range
|
||||
let subnet_path = format!("instance/network-interfaces/{}/subnetworks", index);
|
||||
match self.get_metadata(&subnet_path).await {
|
||||
Ok(_subnet_metadata) => {
|
||||
// Subnet metadata may contain CIDR information
|
||||
// Simplified processing: we get IP addresses and netmasks
|
||||
let ip_path = format!("instance/network-interfaces/{}/ip", index);
|
||||
let mask_path = format!("instance/network-interfaces/{}/subnetmask", index);
|
||||
|
||||
if let (Ok(ip), Ok(mask)) = tokio::join!(self.get_metadata(&ip_path), self.get_metadata(&mask_path)) {
|
||||
if let (Ok(ip_addr), Ok(mask_len)) = (ip.trim().parse::<Ipv4Addr>(), mask_to_prefix_length(&mask)) {
|
||||
if let Ok(network) = format!("{}/{}", ip_addr, mask_len).parse::<IpNetwork>() {
|
||||
cidrs.push(network);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to get GCP subnet information: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cidrs.is_empty() {
|
||||
Err(CloudMetadataError::MetadataUnavailable("GCP network CIDR is not available".to_string()))
|
||||
} else {
|
||||
info!("{} network CIDRs were obtained from GCP metadata", cidrs.len());
|
||||
Ok(cidrs)
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_public_ip_ranges(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
self.get_gcp_public_ip_ranges().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert the subnet mask to the prefix length
|
||||
pub fn mask_to_prefix_length(mask: &str) -> Result<u8, CloudMetadataError> {
|
||||
let mask_parts: Vec<&str> = mask.split('.').collect();
|
||||
if mask_parts.len() != 4 {
|
||||
return Err(CloudMetadataError::IpParseError(format!("Invalid subnet masks:{}", mask)));
|
||||
}
|
||||
|
||||
let mut prefix_length = 0;
|
||||
for part in mask_parts {
|
||||
let octet: u8 = part
|
||||
.parse()
|
||||
.map_err(|_| CloudMetadataError::IpParseError(format!("Invalid mask octet:{}", part)))?;
|
||||
|
||||
let mut remaining = octet;
|
||||
while remaining > 0 {
|
||||
if remaining & 0x80 == 0x80 {
|
||||
prefix_length += 1;
|
||||
remaining <<= 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if remaining != 0 {
|
||||
return Err(CloudMetadataError::IpParseError("Non-contiguous subnet masks".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(prefix_length)
|
||||
}
|
||||
|
||||
/// Universal Cloud Metadata Fetcher (Auto-Detect)
|
||||
pub struct CloudMetadataDetector {
|
||||
client: Client,
|
||||
provider: Option<CloudProvider>,
|
||||
}
|
||||
|
||||
impl CloudMetadataDetector {
|
||||
pub fn new() -> Self {
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(3))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new());
|
||||
|
||||
let provider = CloudProvider::detect_from_env();
|
||||
|
||||
if let Some(p) = &provider {
|
||||
info!("Cloud service provider detected:{}", p.name());
|
||||
} else {
|
||||
info!("The cloud service provider is not detected, and it may be running on-premises or in an unknown environment");
|
||||
}
|
||||
|
||||
Self { client, provider }
|
||||
}
|
||||
|
||||
/// Create a fetcher for a specific cloud service provider
|
||||
pub fn create_fetcher(&self) -> Option<Box<dyn CloudMetadataFetcher + Send + Sync>> {
|
||||
match self.provider {
|
||||
Some(CloudProvider::Aws) => Some(Box::new(AwsMetadataFetcher::new())),
|
||||
Some(CloudProvider::Azure) => Some(Box::new(AzureMetadataFetcher::new())),
|
||||
Some(CloudProvider::Gcp) => Some(Box::new(GcpMetadataFetcher::new())),
|
||||
Some(CloudProvider::DigitalOcean) => {
|
||||
// DigitalOcean has a similar implementation
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Try metadata endpoints for all cloud providers
|
||||
pub async fn try_all_providers(&self) -> Result<Vec<IpNetwork>, CloudMetadataError> {
|
||||
let providers: Vec<Box<dyn CloudMetadataFetcher + Send + Sync>> = vec![
|
||||
Box::new(AwsMetadataFetcher::new()),
|
||||
Box::new(AzureMetadataFetcher::new()),
|
||||
Box::new(GcpMetadataFetcher::new()),
|
||||
];
|
||||
|
||||
for provider in providers {
|
||||
let provider_name = provider.provider_name();
|
||||
debug!("Try getting metadata from {}", provider_name);
|
||||
|
||||
match provider.fetch_trusted_proxy_ranges().await {
|
||||
Ok(ranges) => {
|
||||
if !ranges.is_empty() {
|
||||
info!("{} IP ranges are obtained from {}", provider_name, ranges.len());
|
||||
return Ok(ranges);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to get metadata from {}: {}", provider_name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(CloudMetadataError::MetadataUnavailable(
|
||||
"All cloud service provider metadata fetching fails".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Main export function - Get the IP range from the cloud service provider
|
||||
pub async fn fetch_cloud_provider_ips() -> Result<Vec<String>, CloudMetadataError> {
|
||||
let detector = CloudMetadataDetector::new();
|
||||
|
||||
let ip_ranges = if let Some(fetcher) = detector.create_fetcher() {
|
||||
// Use a detected cloud service provider
|
||||
fetcher.fetch_trusted_proxy_ranges().await?
|
||||
} else {
|
||||
// Try all cloud service providers
|
||||
detector.try_all_providers().await?
|
||||
};
|
||||
|
||||
// Convert to a list of strings
|
||||
let result: Vec<String> = ip_ranges.into_iter().map(|network| network.to_string()).collect();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Asynchronously Obtaining CSP IP Ranges (with Timeout)
|
||||
pub async fn fetch_cloud_provider_ips_with_timeout(timeout_secs: u64) -> Result<Vec<String>, CloudMetadataError> {
|
||||
tokio::time::timeout(Duration::from_secs(timeout_secs), fetch_cloud_provider_ips())
|
||||
.await
|
||||
.map_err(|_| CloudMetadataError::MetadataUnavailable("Metadata fetch timeout".to_string()))?
|
||||
}
|
||||
|
||||
/// Synchronous version (used in a synchronous context)
|
||||
pub fn fetch_cloud_provider_ips_sync(timeout_secs: u64) -> Result<Vec<String>, CloudMetadataError> {
|
||||
let runtime = tokio::runtime::Runtime::new()
|
||||
.map_err(|e| CloudMetadataError::ConfigurationError(format!("Unable to create runtime: {}", e)))?;
|
||||
|
||||
runtime.block_on(fetch_cloud_provider_ips_with_timeout(timeout_secs))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_mask_to_prefix_length() {
|
||||
use crate::cloud::mask_to_prefix_length;
|
||||
|
||||
assert_eq!(mask_to_prefix_length("255.255.255.0").unwrap(), 24);
|
||||
assert_eq!(mask_to_prefix_length("255.255.0.0").unwrap(), 16);
|
||||
assert_eq!(mask_to_prefix_length("255.0.0.0").unwrap(), 8);
|
||||
assert_eq!(mask_to_prefix_length("255.255.255.252").unwrap(), 30);
|
||||
|
||||
// Invalid masks should fail
|
||||
assert!(mask_to_prefix_length("255.255.255.1").is_err());
|
||||
assert!(mask_to_prefix_length("invalid").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cloud_metadata_fallback() {
|
||||
use crate::cloud::metadata::CloudMetadataDetector;
|
||||
|
||||
// In a test environment, the metadata service should not be available
|
||||
let detector = CloudMetadataDetector::new();
|
||||
|
||||
// Trying all providers should fail (unless running tests in a real cloud environment)
|
||||
let result = detector.try_all_providers().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cloud_ip_parsing() {
|
||||
use ipnetwork::IpNetwork;
|
||||
|
||||
// Test IP range resolution
|
||||
let cidr: IpNetwork = "10.0.0.0/8".parse().unwrap();
|
||||
assert_eq!(cidr.prefix(), 8);
|
||||
|
||||
let cidr: IpNetwork = "192.168.1.0/24".parse().unwrap();
|
||||
assert_eq!(cidr.prefix(), 24);
|
||||
|
||||
// IPv6
|
||||
let cidr: IpNetwork = "2001:db8::/32".parse().unwrap();
|
||||
assert_eq!(cidr.prefix(), 32);
|
||||
}
|
||||
}
|
||||
17
crates/trusted-proxies/src/cloud/mod.rs
Normal file
17
crates/trusted-proxies/src/cloud/mod.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod metadata;
|
||||
|
||||
pub use metadata::*;
|
||||
16
crates/trusted-proxies/src/config/mod.rs
Normal file
16
crates/trusted-proxies/src/config/mod.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod proxy;
|
||||
pub use proxy::*;
|
||||
293
crates/trusted-proxies/src/config/proxy.rs
Normal file
293
crates/trusted-proxies/src/config/proxy.rs
Normal file
@@ -0,0 +1,293 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Environment variable definitions for trusted agent configurations
|
||||
//!
|
||||
//! All configuration items are read by environment variables and support the following priorities:
|
||||
//! 1. Environment Variables (Highest Priority)
|
||||
//! 2. The default value set in the code
|
||||
//! 3. Hard-coded values in the default implementation of the struct
|
||||
|
||||
use crate::TrustedProxy;
|
||||
use crate::cloud::fetch_cloud_provider_ips_sync;
|
||||
use ipnetwork::IpNetwork;
|
||||
use std::str::FromStr;
|
||||
use tracing::{debug, info, warn};
|
||||
// Environment variable key constant definition
|
||||
// Format: RUSTFS_HTTP_{SECTION}_{KEY}, all caps, separated by underscores
|
||||
|
||||
// ==================== Agent configuration ====================
|
||||
/// Agent verification mode
|
||||
pub const ENV_PROXY_VALIDATION_MODE: &str = "RUSTFS_HTTP_PROXY_VALIDATION_MODE";
|
||||
pub const DEFAULT_PROXY_VALIDATION_MODE: &str = "hop_by_hop";
|
||||
|
||||
/// Whether to enable RFC 7239 Forwarded headers
|
||||
pub const ENV_PROXY_ENABLE_RFC7239: &str = "RUSTFS_HTTP_PROXY_ENABLE_RFC7239";
|
||||
pub const DEFAULT_PROXY_ENABLE_RFC7239: bool = true;
|
||||
|
||||
/// Maximum number of proxy hops
|
||||
pub const ENV_PROXY_MAX_PROXY_HOPS: &str = "RUSTFS_HTTP_PROXY_MAX_PROXY_HOPS";
|
||||
pub const DEFAULT_PROXY_MAX_PROXY_HOPS: usize = 10;
|
||||
|
||||
/// whether chain continuity checking is enabled
|
||||
pub const ENV_PROXY_ENABLE_CHAIN_CONTINUITY_CHECK: &str = "RUSTFS_HTTP_PROXY_ENABLE_CHAIN_CONTINUITY_CHECK";
|
||||
pub const DEFAULT_PROXY_ENABLE_CHAIN_CONTINUITY_CHECK: bool = true;
|
||||
|
||||
// ==================== Trusted agent configuration ====================
|
||||
/// Underlying Trusted Agent List (Comma-Separated IP/CIDR)
|
||||
pub const ENV_TRUSTED_PROXIES: &str = "RUSTFS_HTTP_TRUSTED_PROXIES";
|
||||
pub const DEFAULT_TRUSTED_PROXIES: &str = "127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fd00::/8";
|
||||
|
||||
/// Additional Trusted Agent List (production only, can be overridden)
|
||||
pub const ENV_ADDITIONAL_TRUSTED_PROXIES: &str = "RUSTFS_HTTP_ADDITIONAL_TRUSTED_PROXIES";
|
||||
pub const DEFAULT_ADDITIONAL_TRUSTED_PROXIES: &str = "";
|
||||
|
||||
/// Allowed private networks (for internal proxy authentication)
|
||||
pub const ENV_ALLOWED_PRIVATE_NETS: &str = "RUSTFS_HTTP_ALLOWED_PRIVATE_NETS";
|
||||
pub const DEFAULT_ALLOWED_PRIVATE_NETS: &str = "10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fd00::/8";
|
||||
|
||||
// ==================== Cache configuration ====================
|
||||
/// Cache capacity
|
||||
pub const ENV_CACHE_CAPACITY: &str = "RUSTFS_HTTP_CACHE_CAPACITY";
|
||||
pub const DEFAULT_CACHE_CAPACITY: usize = 10000;
|
||||
|
||||
/// Cache TTL (seconds)
|
||||
pub const ENV_CACHE_TTL_SECONDS: &str = "RUSTFS_HTTP_CACHE_TTL_SECONDS";
|
||||
pub const DEFAULT_CACHE_TTL_SECONDS: u64 = 300;
|
||||
|
||||
/// Cache Cleanup Interval (Seconds)
|
||||
pub const ENV_CACHE_CLEANUP_INTERVAL_SECONDS: &str = "RUSTFS_HTTP_CACHE_CLEANUP_INTERVAL_SECONDS";
|
||||
pub const DEFAULT_CACHE_CLEANUP_INTERVAL_SECONDS: u64 = 60;
|
||||
|
||||
// ==================== Monitor configuration ====================
|
||||
/// Whether monitoring metrics are enabled
|
||||
pub const ENV_MONITORING_ENABLE_METRICS: &str = "RUSTFS_HTTP_MONITORING_ENABLE_METRICS";
|
||||
pub const DEFAULT_MONITORING_ENABLE_METRICS: bool = true;
|
||||
|
||||
/// Log level
|
||||
pub const ENV_MONITORING_LOG_LEVEL: &str = "RUSTFS_HTTP_MONITORING_LOG_LEVEL";
|
||||
pub const DEFAULT_MONITORING_LOG_LEVEL: &str = "info";
|
||||
|
||||
/// Whether to log validation failures
|
||||
pub const ENV_MONITORING_LOG_FAILED_VALIDATIONS: &str = "RUSTFS_HTTP_MONITORING_LOG_FAILED_VALIDATIONS";
|
||||
pub const DEFAULT_MONITORING_LOG_FAILED_VALIDATIONS: bool = true;
|
||||
|
||||
/// Cloud service provider-specific IP ranges (Cloudflare, etc.)
|
||||
pub const ENV_CLOUDFLARE_IPS_ENABLED: &str = "RUSTFS_HTTP_CLOUDFLARE_IPS_ENABLED";
|
||||
pub const DEFAULT_CLOUDFLARE_IPS_ENABLED: bool = false;
|
||||
|
||||
/// Cloud metadata configuration
|
||||
pub const ENV_CLOUD_METADATA_ENABLED: &str = "RUSTFS_HTTP_CLOUD_METADATA_ENABLED";
|
||||
pub const DEFAULT_CLOUD_METADATA_ENABLED: bool = true;
|
||||
|
||||
pub const ENV_CLOUD_METADATA_TIMEOUT_SECS: &str = "RUSTFS_HTTP_CLOUD_METADATA_TIMEOUT_SECS";
|
||||
pub const DEFAULT_CLOUD_METADATA_TIMEOUT_SECS: u64 = 5;
|
||||
|
||||
pub const ENV_CLOUD_PROVIDER_FORCE: &str = "RUSTFS_HTTP_CLOUD_PROVIDER_FORCE";
|
||||
pub const DEFAULT_CLOUD_PROVIDER_FORCE: &str = ""; // Null means automatic detection
|
||||
|
||||
/// Environment variables resolve error types
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConfigError {
|
||||
#[error("Environment variable resolution failed: {0}")]
|
||||
EnvParseError(String),
|
||||
|
||||
#[error("IP/CIDR Format Error: {0}")]
|
||||
IpFormatError(String),
|
||||
|
||||
#[error("Boolean parsing failed: {0}")]
|
||||
BoolParseError(String),
|
||||
|
||||
#[error("Numeric parsing failed: {0}")]
|
||||
NumberParseError(String),
|
||||
|
||||
#[error("Enum value parsing failed: {0}")]
|
||||
EnumParseError(String),
|
||||
}
|
||||
|
||||
/// Environment variables configure loaders
|
||||
pub struct EnvConfigLoader;
|
||||
|
||||
impl EnvConfigLoader {
|
||||
/// Get the string value from the environment variable
|
||||
pub fn get_string(key: &str, default: &str) -> String {
|
||||
std::env::var(key).unwrap_or_else(|_| default.to_string())
|
||||
}
|
||||
|
||||
/// Get Boolean values from environment variables
|
||||
pub fn get_bool(key: &str, default: bool) -> Result<bool, ConfigError> {
|
||||
let value = Self::get_string(key, if default { "true" } else { "false" });
|
||||
value
|
||||
.parse()
|
||||
.map_err(|_| ConfigError::BoolParseError(format!("{}={}", key, value)))
|
||||
}
|
||||
|
||||
/// Get an integer value from an environment variable
|
||||
pub fn get_usize(key: &str, default: usize) -> Result<usize, ConfigError> {
|
||||
let value = Self::get_string(key, &default.to_string());
|
||||
value
|
||||
.parse()
|
||||
.map_err(|e| ConfigError::NumberParseError(format!("{}={}: {}", key, value, e)))
|
||||
}
|
||||
|
||||
/// Get the u64 value from the environment variable
|
||||
pub fn get_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
|
||||
let value = Self::get_string(key, &default.to_string());
|
||||
value
|
||||
.parse()
|
||||
.map_err(|e| ConfigError::NumberParseError(format!("{}={}: {}", key, value, e)))
|
||||
}
|
||||
|
||||
/// Parsing comma-separated IP/CIDR lists from environment variables
|
||||
pub fn parse_ip_list(key: &str, default: &str) -> Result<Vec<TrustedProxy>, ConfigError> {
|
||||
let value = Self::get_string(key, default);
|
||||
if value.trim().is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let mut proxies = Vec::new();
|
||||
for item in value.split(',') {
|
||||
let item = item.trim();
|
||||
if item.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Attempt to resolve to CIDR
|
||||
if item.contains('/') {
|
||||
match IpNetwork::from_str(item) {
|
||||
Ok(network) => proxies.push(TrustedProxy::Cidr(network)),
|
||||
Err(e) => return Err(ConfigError::IpFormatError(format!("{}: {}: {}", key, item, e))),
|
||||
}
|
||||
} else {
|
||||
// Attempt to resolve to a single IP
|
||||
match item.parse() {
|
||||
Ok(ip) => proxies.push(TrustedProxy::Single(ip)),
|
||||
Err(e) => return Err(ConfigError::IpFormatError(format!("{}: {}: {}", key, item, e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(proxies)
|
||||
}
|
||||
|
||||
/// Parses comma-separated CIDR lists from environment variables
|
||||
pub fn parse_cidr_list(key: &str, default: &str) -> Result<Vec<IpNetwork>, ConfigError> {
|
||||
let value = Self::get_string(key, default);
|
||||
if value.trim().is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let mut networks = Vec::new();
|
||||
for item in value.split(',') {
|
||||
let item = item.trim();
|
||||
if item.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
match IpNetwork::from_str(item) {
|
||||
Ok(network) => networks.push(network),
|
||||
Err(e) => return Err(ConfigError::IpFormatError(format!("{}: {}: {}", key, item, e))),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(networks)
|
||||
}
|
||||
|
||||
/// Get the validation schema enumeration value
|
||||
pub fn get_validation_mode(key: &str, default: &str) -> Result<crate::advanced::ValidationMode, ConfigError> {
|
||||
let value = Self::get_string(key, default);
|
||||
match value.to_lowercase().as_str() {
|
||||
"lenient" => Ok(crate::advanced::ValidationMode::Lenient),
|
||||
"strict" => Ok(crate::advanced::ValidationMode::Strict),
|
||||
"hop_by_hop" => Ok(crate::advanced::ValidationMode::HopByHop),
|
||||
_ => Err(ConfigError::EnumParseError(format!(
|
||||
"{}: Must be 'lenient', 'strict' or 'hop_by_hop'",
|
||||
value
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the log level
|
||||
pub fn get_log_level(key: &str, default: &str) -> String {
|
||||
let value = Self::get_string(key, default).to_lowercase();
|
||||
match value.as_str() {
|
||||
"trace" | "debug" | "info" | "warn" | "error" => value,
|
||||
_ => default.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the cloud metadata IP range
|
||||
pub fn fetch_cloud_metadata_ips() -> Result<Vec<String>, ConfigError> {
|
||||
// Check if cloud metadata is enabled
|
||||
let enabled = Self::get_bool(ENV_CLOUD_METADATA_ENABLED, DEFAULT_CLOUD_METADATA_ENABLED)?;
|
||||
|
||||
if !enabled {
|
||||
debug!("Cloud metadata fetching is disabled");
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let timeout_secs = Self::get_u64(ENV_CLOUD_METADATA_TIMEOUT_SECS, DEFAULT_CLOUD_METADATA_TIMEOUT_SECS)?;
|
||||
|
||||
// If there is a mandatory designation of a cloud service provider, set the environment variable
|
||||
let forced_provider = Self::get_string(ENV_CLOUD_PROVIDER_FORCE, DEFAULT_CLOUD_PROVIDER_FORCE);
|
||||
if !forced_provider.is_empty() {
|
||||
// std::env::set_var("CLOUD_PROVIDER_FORCE", forced_provider);
|
||||
}
|
||||
|
||||
match fetch_cloud_provider_ips_sync(timeout_secs) {
|
||||
Ok(ips) => {
|
||||
info!("{} IP ranges were obtained from cloud metadata", ips.len());
|
||||
if !ips.is_empty() {
|
||||
debug!("Cloud IP range: {:?}", ips);
|
||||
}
|
||||
Ok(ips)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Cloud metadata fetch failed: {}", e);
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cloud service provider IP range configuration
|
||||
pub struct CloudProviderIps {
|
||||
/// Cloudflare IP range
|
||||
pub cloudflare: Vec<IpNetwork>,
|
||||
}
|
||||
|
||||
impl CloudProviderIps {
|
||||
/// Get the Cloudflare IP range
|
||||
pub fn cloudflare_ranges() -> Vec<IpNetwork> {
|
||||
let ranges = vec![
|
||||
"103.21.244.0/22",
|
||||
"103.22.200.0/22",
|
||||
"103.31.4.0/22",
|
||||
"104.16.0.0/13",
|
||||
"104.24.0.0/14",
|
||||
"108.162.192.0/18",
|
||||
"131.0.72.0/22",
|
||||
"141.101.64.0/18",
|
||||
"162.158.0.0/15",
|
||||
"172.64.0.0/13",
|
||||
"173.245.48.0/20",
|
||||
"188.114.96.0/20",
|
||||
"190.93.240.0/20",
|
||||
"197.234.240.0/22",
|
||||
"198.41.128.0/17",
|
||||
];
|
||||
|
||||
ranges.into_iter().filter_map(|s| IpNetwork::from_str(s).ok()).collect()
|
||||
}
|
||||
}
|
||||
125
crates/trusted-proxies/src/globals.rs
Normal file
125
crates/trusted-proxies/src/globals.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{DEFAULT_TRUSTED_PROXIES, ENV_TRUSTED_PROXIES, TrustedProxiesConfig, parse_proxy_list};
|
||||
use std::sync::LazyLock;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::error;
|
||||
|
||||
static TRUSTED_PROXIES_CONFIG: LazyLock<RwLock<TrustedProxiesConfig>> =
|
||||
LazyLock::new(|| RwLock::new(TrustedProxiesConfig::default()));
|
||||
|
||||
static TRUSTED_PROXIES_CONFIG_ENABLE: LazyLock<RwLock<bool>> = LazyLock::new(|| RwLock::new(false));
|
||||
|
||||
/// Check if the trusted proxies configuration is enabled
|
||||
///
|
||||
/// # Returns
|
||||
/// A boolean indicating whether the trusted proxies configuration is enabled
|
||||
pub async fn is_trusted_proxies_config_enabled() -> bool {
|
||||
let guard = TRUSTED_PROXIES_CONFIG_ENABLE.read().await;
|
||||
*guard
|
||||
}
|
||||
|
||||
/// Enable or disable the trusted proxies configuration
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `enabled` - A boolean indicating whether to enable (true) or disable (false) the trusted proxies configuration
|
||||
pub async fn set_trusted_proxies_config_enabled(enabled: bool) {
|
||||
let mut guard = TRUSTED_PROXIES_CONFIG_ENABLE.write().await;
|
||||
*guard = enabled;
|
||||
}
|
||||
|
||||
/// Get the global trusted proxies configuration
|
||||
///
|
||||
/// # Returns
|
||||
/// An Option containing the TrustedProxiesConfig if set, or None if not set
|
||||
pub async fn get_trusted_proxies_config() -> TrustedProxiesConfig {
|
||||
let guard = TRUSTED_PROXIES_CONFIG.read().await;
|
||||
guard.clone()
|
||||
}
|
||||
|
||||
/// Set the global trusted proxies configuration
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `config` - The TrustedProxiesConfig to set as the global configuration
|
||||
pub async fn set_trusted_proxies_config(config: TrustedProxiesConfig) {
|
||||
let mut guard = TRUSTED_PROXIES_CONFIG.write().await;
|
||||
*guard = config;
|
||||
}
|
||||
|
||||
/// Initialize the trusted proxies configuration from environment variables or config file path
|
||||
/// If both are provided, environment variables take precedence over the config file
|
||||
/// Returns an error if loading from both sources fails
|
||||
///
|
||||
/// # Returns
|
||||
/// Ok(()) if the configuration is successfully loaded and set
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if loading from both environment variables and config file fails
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use rustfs::proxy::config_loader::initialize;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// if let Err(e) = initialize().await {
|
||||
/// eprintln!("Failed to initialize trusted proxies configuration: {:?}", e);
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn initialize() -> anyhow::Result<()> {
|
||||
match from_env() {
|
||||
Ok(env_config) => {
|
||||
set_trusted_proxies_config(env_config).await;
|
||||
set_trusted_proxies_config_enabled(true).await;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to load trusted proxies configuration from environment variables, falling back to config file if provided. error: {:?}",
|
||||
e
|
||||
);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Load configurations from environment variables
|
||||
pub fn from_env() -> anyhow::Result<TrustedProxiesConfig> {
|
||||
// Read environment variables, e.g.: TRUSTED_PROXIES="127.0.0.1,10.0.0.0/8,::1"
|
||||
let proxies_str = rustfs_utils::get_env_str(ENV_TRUSTED_PROXIES, DEFAULT_TRUSTED_PROXIES);
|
||||
|
||||
let proxy_list = parse_proxy_list(&proxies_str);
|
||||
|
||||
TrustedProxiesConfig::from_strs(&proxy_list)
|
||||
}
|
||||
|
||||
/// Loading from the configuration file
|
||||
pub fn from_config_file(path: &str) -> anyhow::Result<TrustedProxiesConfig> {
|
||||
use serde_json;
|
||||
use std::fs;
|
||||
|
||||
let content = fs::read_to_string(path)?;
|
||||
let config: serde_json::Value = serde_json::from_str(&content)?;
|
||||
|
||||
let proxies = config["rustfs_http_trusted_proxies"]
|
||||
.as_array()
|
||||
.ok_or_else(|| anyhow::anyhow!("The trusted_proxies array is missing from the configuration file"))?
|
||||
.iter()
|
||||
.filter_map(|v| v.as_str())
|
||||
.collect::<Vec<&str>>();
|
||||
|
||||
TrustedProxiesConfig::from_strs(&proxies)
|
||||
}
|
||||
32
crates/trusted-proxies/src/lib.rs
Normal file
32
crates/trusted-proxies/src/lib.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod advanced;
|
||||
mod cache;
|
||||
mod cloud;
|
||||
mod config;
|
||||
mod globals;
|
||||
mod metrics;
|
||||
mod middleware;
|
||||
mod processor;
|
||||
mod proxy;
|
||||
|
||||
pub use advanced::*;
|
||||
pub use cache::*;
|
||||
pub(crate) use cloud::*;
|
||||
pub use config::*;
|
||||
pub use globals::*;
|
||||
pub use middleware::*;
|
||||
pub use processor::*;
|
||||
pub use proxy::*;
|
||||
118
crates/trusted-proxies/src/metrics.rs
Normal file
118
crates/trusted-proxies/src/metrics.rs
Normal file
@@ -0,0 +1,118 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{MultiProxyProcessor, ProxyChainAnalysis, ProxyError};
|
||||
use metrics::{Counter, Gauge, Histogram, counter, gauge, histogram};
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Add cloud metadata monitoring
|
||||
pub struct CloudMetadataMetrics {
|
||||
fetch_success: Counter,
|
||||
fetch_failure: Counter,
|
||||
fetch_duration: Histogram,
|
||||
ip_ranges_count: Gauge,
|
||||
}
|
||||
|
||||
impl CloudMetadataMetrics {
|
||||
pub fn record_fetch(&self, success: bool, duration: Duration, count: usize) {
|
||||
if success {
|
||||
self.fetch_success.increment(1);
|
||||
self.ip_ranges_count.set(count as f64);
|
||||
} else {
|
||||
self.fetch_failure.increment(1);
|
||||
}
|
||||
self.fetch_duration.record(duration.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
/// Agents process monitoring metrics
|
||||
pub struct ProxyMetrics {
|
||||
/// The total number of requests processed
|
||||
pub total_requests: Counter,
|
||||
/// Number of requests from trusted agents
|
||||
pub trusted_proxy_requests: Counter,
|
||||
/// Number of requests from untrusted sources
|
||||
pub untrusted_requests: Counter,
|
||||
/// The number of requests that failed to validate
|
||||
pub validation_failed: Counter,
|
||||
/// Proxy chain length distribution
|
||||
pub chain_length: Histogram,
|
||||
/// Validation is time-consuming
|
||||
pub validation_duration: Histogram,
|
||||
/// Cache hit rate
|
||||
pub cache_hit_ratio: Gauge,
|
||||
}
|
||||
|
||||
impl ProxyMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
total_requests: counter!("proxy.total_requests", "The total number of requests processed"),
|
||||
trusted_proxy_requests: counter!("proxy.trusted_requests", "Requests from trusted agents"),
|
||||
untrusted_requests: counter!("proxy.untrusted_requests", "Requests from untrusted sources"),
|
||||
validation_failed: counter!("proxy.validation_failed", "Validate failed requests"),
|
||||
chain_length: histogram!("proxy.chain_length", "Proxy chain length distribution"),
|
||||
validation_duration: histogram!("proxy.validation_duration_ms", "Validation Time (ms)"),
|
||||
cache_hit_ratio: gauge!("proxy.cache_hit_ratio", "Cache hit rate"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_request(&self, analysis: &ProxyChainAnalysis, duration_ms: f64) {
|
||||
self.total_requests.increment(1);
|
||||
self.chain_length.record(analysis.full_proxy_chain.len() as f64);
|
||||
self.validation_duration.record(duration_ms);
|
||||
|
||||
if analysis.validated_hops > 0 {
|
||||
self.trusted_proxy_requests.increment(1);
|
||||
} else {
|
||||
self.untrusted_requests.increment(1);
|
||||
}
|
||||
|
||||
if !analysis.security_warnings.is_empty() {
|
||||
self.validation_failed.increment(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Add monitoring in the MultiProxyProcessor
|
||||
pub struct MonitoredMultiProxyProcessor {
|
||||
processor: MultiProxyProcessor,
|
||||
metrics: ProxyMetrics,
|
||||
}
|
||||
|
||||
impl MonitoredMultiProxyProcessor {
|
||||
pub fn process_request_with_metrics(
|
||||
&self,
|
||||
peer_addr: &SocketAddr,
|
||||
headers: &http::HeaderMap,
|
||||
) -> Result<ProxyChainAnalysis, ProxyError> {
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
let result = self.processor.process_request(peer_addr, headers);
|
||||
|
||||
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
|
||||
|
||||
match &result {
|
||||
Ok(analysis) => {
|
||||
self.metrics.record_request(analysis, duration_ms);
|
||||
}
|
||||
Err(_) => {
|
||||
self.metrics.validation_failed.increment(1);
|
||||
self.metrics.total_requests.increment(1);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
298
crates/trusted-proxies/src/middleware.rs
Normal file
298
crates/trusted-proxies/src/middleware.rs
Normal file
@@ -0,0 +1,298 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{ClientInfo, MultiProxyConfig, MultiProxyProcessor, TrustedProxiesConfig};
|
||||
use axum::extract::Request;
|
||||
use axum::response::Response;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::task::{Context, Poll};
|
||||
use tower::{Layer, Service};
|
||||
use tracing::{info_span, instrument};
|
||||
|
||||
/// Enhanced client information structure
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EnhancedClientInfo {
|
||||
pub real_ip: IpAddr,
|
||||
pub forwarded_host: Option<String>,
|
||||
pub forwarded_proto: Option<String>,
|
||||
pub is_from_trusted_proxy: bool,
|
||||
pub proxy_ip: Option<IpAddr>,
|
||||
pub proxy_chain_analysis: Option<crate::ProxyChainAnalysis>,
|
||||
}
|
||||
|
||||
/// Trusted agent middleware layer
|
||||
#[derive(Clone)]
|
||||
pub struct TrustedProxiesLayer {
|
||||
config: TrustedProxiesConfig,
|
||||
}
|
||||
|
||||
impl TrustedProxiesLayer {
|
||||
pub fn new(config: TrustedProxiesConfig) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for TrustedProxiesLayer {
|
||||
type Service = TrustedProxiesMiddleware<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
TrustedProxiesMiddleware {
|
||||
inner,
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trusted agent middleware service
|
||||
#[derive(Clone)]
|
||||
pub struct TrustedProxiesMiddleware<S> {
|
||||
inner: S,
|
||||
config: TrustedProxiesConfig,
|
||||
}
|
||||
|
||||
impl<S> Service<Request> for TrustedProxiesMiddleware<S>
|
||||
where
|
||||
S: Service<Request, Response = Response> + Clone + Send + 'static,
|
||||
S::Future: Send,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
/// Updated the call method of TrustedProxiesMiddleware
|
||||
#[instrument(name = "trusted_proxy_middleware", skip_all, fields(peer_addr, trusted))]
|
||||
fn call(&mut self, mut req: Request) -> Self::Future {
|
||||
let span = info_span!(
|
||||
"trusted_proxy_check",
|
||||
peer_addr = ?req.extensions().get::<SocketAddr>().map(|a| a.to_string())
|
||||
);
|
||||
|
||||
let _guard = span.enter();
|
||||
|
||||
// Use advanced processors
|
||||
let processor = MultiProxyProcessor::new(self.config.clone(), Some(MultiProxyConfig::default()));
|
||||
|
||||
let peer_addr = req.extensions().get::<SocketAddr>().copied();
|
||||
let headers = req.headers();
|
||||
|
||||
match peer_addr {
|
||||
Some(addr) => {
|
||||
match processor.process_request(&addr, headers) {
|
||||
Ok(analysis) => {
|
||||
tracing::debug!(
|
||||
"Proxy chain analysis completed: client_ip={}, hops={}, integrity={}",
|
||||
analysis.trusted_client_ip,
|
||||
analysis.validated_hops,
|
||||
analysis.chain_integrity
|
||||
);
|
||||
|
||||
// 创建增强的客户端信息
|
||||
let client_info = EnhancedClientInfo {
|
||||
real_ip: analysis.trusted_client_ip,
|
||||
forwarded_host: headers
|
||||
.get("x-forwarded-host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from),
|
||||
forwarded_proto: headers
|
||||
.get("x-forwarded-proto")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from),
|
||||
is_from_trusted_proxy: analysis.validated_hops > 0,
|
||||
proxy_ip: Some(addr.ip()),
|
||||
proxy_chain_analysis: Some(analysis),
|
||||
};
|
||||
|
||||
req.extensions_mut().insert(client_info.clone());
|
||||
|
||||
// Record safety warnings
|
||||
if !client_info
|
||||
.proxy_chain_analysis
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.security_warnings
|
||||
.is_empty()
|
||||
{
|
||||
tracing::warn!(
|
||||
"Proxy Chain Security Warning: {:?}",
|
||||
client_info.proxy_chain_analysis.as_ref().unwrap().security_warnings
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("Proxy chain validation failed: {}", err);
|
||||
|
||||
// Fallback to basic processing when validation fails
|
||||
let client_info = if self.config.is_trusted(&addr) {
|
||||
extract_client_info_from_headers(&req)
|
||||
} else {
|
||||
ClientInfo::direct(addr)
|
||||
};
|
||||
|
||||
req.extensions_mut().insert(client_info);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("The peer address cannot be obtained");
|
||||
let client_info = ClientInfo {
|
||||
real_ip: std::net::Ipv4Addr::UNSPECIFIED.into(),
|
||||
forwarded_host: None,
|
||||
forwarded_proto: None,
|
||||
is_from_trusted_proxy: false,
|
||||
proxy_ip: None,
|
||||
};
|
||||
req.extensions_mut().insert(client_info);
|
||||
}
|
||||
}
|
||||
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle client-side information extraction logic
|
||||
fn process_client_info(peer_addr: Option<SocketAddr>, config: &TrustedProxiesConfig, req: &Request) -> ClientInfo {
|
||||
match peer_addr {
|
||||
Some(addr) => {
|
||||
if config.is_trusted(&addr) {
|
||||
// From Trusted Agent: Parse the forwarding header
|
||||
extract_from_trusted_proxy(&addr, req)
|
||||
} else {
|
||||
// From an untrusted proxy or direct connection: Use the connection address
|
||||
tracing::debug!(
|
||||
"Request from untrusted proxy or direct connection: {}, ignore x-forwarded-*header",
|
||||
addr.ip()
|
||||
);
|
||||
ClientInfo::direct(addr)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Unable to get peer address (shouldn't happen in theory)
|
||||
tracing::warn!("Unable to get the peer address of the request");
|
||||
ClientInfo::direct(SocketAddr::from(([0, 0, 0, 0], 0)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract client information from HTTP headers
|
||||
fn extract_client_info_from_headers(req: &Request) -> ClientInfo {
|
||||
let headers = req.headers();
|
||||
|
||||
// Parsing X-Forwarded-For: Take the first IP (original client)
|
||||
let real_ip = headers
|
||||
.get("x-forwarded-for")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|s| s.split(',').next())
|
||||
.and_then(|s| s.trim().parse().ok())
|
||||
.unwrap_or_else(|| {
|
||||
// Fallback to X-Real-IP or Connector IP
|
||||
headers
|
||||
.get("x-real-ip")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(std::net::Ipv4Addr::UNSPECIFIED.into())
|
||||
});
|
||||
|
||||
let forwarded_host = headers
|
||||
.get("x-forwarded-host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
let forwarded_proto = headers
|
||||
.get("x-forwarded-proto")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
ClientInfo {
|
||||
real_ip,
|
||||
forwarded_host,
|
||||
forwarded_proto,
|
||||
is_from_trusted_proxy: false,
|
||||
proxy_ip: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract client information from requests from trusted agents
|
||||
fn extract_from_trusted_proxy(proxy_addr: &SocketAddr, req: &Request) -> ClientInfo {
|
||||
let headers = req.headers();
|
||||
let proxy_ip = proxy_addr.ip();
|
||||
|
||||
// Resolve X-Forwarded-For Link
|
||||
let real_ip = headers
|
||||
.get("x-forwarded-for")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|xff| {
|
||||
// Handle possible IP chains: client, proxy1, proxy2
|
||||
parse_x_forwarded_for(xff, proxy_ip)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
// Fall back to X-Real-IP or use a proxy IP
|
||||
headers
|
||||
.get("x-real-ip")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(proxy_ip) // Finally retreated
|
||||
});
|
||||
|
||||
// Extract other forwarding heads
|
||||
let forwarded_host = headers
|
||||
.get("x-forwarded-host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
let forwarded_proto = headers
|
||||
.get("x-forwarded-proto")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(String::from);
|
||||
|
||||
tracing::debug!(
|
||||
"Extract client information from trusted proxy {}: real_ip={}, host={:?}, proto={:?}",
|
||||
proxy_ip,
|
||||
real_ip,
|
||||
forwarded_host,
|
||||
forwarded_proto
|
||||
);
|
||||
|
||||
ClientInfo::from_trusted_proxy(real_ip, forwarded_host, forwarded_proto, proxy_ip)
|
||||
}
|
||||
|
||||
/// Parse a comma-separated list of proxies/IPs
|
||||
/// Processing format: "ip1, ip2, ip3"
|
||||
///
|
||||
/// #Arguments
|
||||
/// * `proxies_str` - A string slice containing the comma-separated list of proxies/IPs
|
||||
///
|
||||
/// #Returns
|
||||
/// A vector of string slices representing the individual proxies/IPs
|
||||
pub fn parse_proxy_list(proxies_str: &str) -> Vec<&str> {
|
||||
proxies_str.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect()
|
||||
}
|
||||
|
||||
/// Securely parses X-Forwarded-For heads
|
||||
/// Processing format: "client, proxy1, proxy2" or "client"
|
||||
fn parse_x_forwarded_for(xff: &str, _current_proxy_ip: IpAddr) -> Option<IpAddr> {
|
||||
// Split and clean up IP addresses
|
||||
let ips: Vec<&str> = parse_proxy_list(xff);
|
||||
|
||||
if ips.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Take the first IP (original client)
|
||||
// Note: In a production environment, more complex logic may be required to handle multi-layer proxy chains
|
||||
ips.first().and_then(|ip_str| ip_str.parse().ok())
|
||||
}
|
||||
709
crates/trusted-proxies/src/processor.rs
Normal file
709
crates/trusted-proxies/src/processor.rs
Normal file
@@ -0,0 +1,709 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{
|
||||
CidrMatcher, ForwardedHeader, IpValidationCache, MultiProxyConfig, ProxyChainAnalysis, ProxyError, TrustedProxiesConfig,
|
||||
TrustedProxy, ValidationMode,
|
||||
};
|
||||
use ipnetwork::IpNetwork;
|
||||
use std::collections::HashSet;
|
||||
use std::mem::take;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
/// Securely handle X-Forwarded-For headers for multi-layer agents
|
||||
pub struct ProxyChainProcessor {
|
||||
config: TrustedProxiesConfig,
|
||||
}
|
||||
|
||||
impl ProxyChainProcessor {
|
||||
pub fn new(config: TrustedProxiesConfig) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
|
||||
/// Parse the X-Forwarded-For chain to find the rightmost untrusted IP
|
||||
/// Principle: Traverse from right to left to find the first IP that is not on the trusted list
|
||||
pub fn extract_client_ip_from_chain(&self, x_forwarded_for: &str, current_proxy_ip: IpAddr) -> Option<IpAddr> {
|
||||
// Split the IP chain
|
||||
let ip_chain: Vec<&str> = x_forwarded_for
|
||||
.split(',')
|
||||
.map(|s| s.trim())
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
if ip_chain.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Build a complete chain: client, proxy1, proxy2, ..., current_proxy
|
||||
let mut full_chain: Vec<IpAddr> = ip_chain.iter().filter_map(|s| s.parse().ok()).collect();
|
||||
full_chain.push(current_proxy_ip);
|
||||
|
||||
// Find from right to left (starting with the agent closest to us)
|
||||
// Find the first untrusted IP
|
||||
for ip in full_chain.iter().rev() {
|
||||
if !self.is_ip_trusted(ip) {
|
||||
return Some(*ip);
|
||||
}
|
||||
}
|
||||
|
||||
// If all IPs are trusted, return the first IP of the original chain
|
||||
full_chain.first().copied()
|
||||
}
|
||||
|
||||
/// Check if a single IP is trustworthy
|
||||
fn is_ip_trusted(&self, ip: &IpAddr) -> bool {
|
||||
// Simplifying the implementation here, you should actually use the full SocketAddr
|
||||
// Port information needs to be considered in a production environment
|
||||
use std::net::SocketAddr;
|
||||
let dummy_port = 0;
|
||||
let addr = SocketAddr::new(*ip, dummy_port);
|
||||
self.config.is_trusted(&addr)
|
||||
}
|
||||
}
|
||||
|
||||
/// Multi-layer proxy processor core implementation
|
||||
pub struct MultiProxyProcessor {
|
||||
/// Basic trusted agent configuration
|
||||
base_config: TrustedProxiesConfig,
|
||||
/// Advanced configuration
|
||||
advanced_config: MultiProxyConfig,
|
||||
/// Known collection of trusted agents (cache, accelerated lookup)
|
||||
trusted_ips_cache: HashSet<IpAddr>,
|
||||
/// A collection of known private networks
|
||||
private_nets_cache: Vec<IpNetwork>,
|
||||
}
|
||||
|
||||
impl MultiProxyProcessor {
|
||||
/// Create a new processor
|
||||
pub fn new(base_config: TrustedProxiesConfig, advanced_config: Option<MultiProxyConfig>) -> Self {
|
||||
let config = advanced_config.unwrap_or_default();
|
||||
|
||||
// Build IP caches to improve performance
|
||||
let mut trusted_ips_cache = HashSet::new();
|
||||
for proxy in &base_config.proxies {
|
||||
match proxy {
|
||||
TrustedProxy::Single(ip) => {
|
||||
trusted_ips_cache.insert(*ip);
|
||||
}
|
||||
TrustedProxy::Cidr(network) => {
|
||||
// For large networks, we only cache the network itself, runtime checks
|
||||
// Here we cache the prefix of the network
|
||||
if network.prefix() >= 24 {
|
||||
// For small networks (/24 and above), we can cache all IPs
|
||||
// But for simplicity, only the network representation is cached here
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
base_config,
|
||||
advanced_config: config.clone(),
|
||||
trusted_ips_cache,
|
||||
private_nets_cache: config.allowed_private_nets.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Main processing function: Extracts and verifies the client IP from the request
|
||||
pub fn process_request(&self, peer_addr: &SocketAddr, headers: &http::HeaderMap) -> Result<ProxyChainAnalysis, ProxyError> {
|
||||
debug!("Start processing proxy request, peer address: {}", peer_addr);
|
||||
|
||||
// 1. Check if it's from a trusted agent
|
||||
if !self.base_config.is_trusted(peer_addr) {
|
||||
warn!("Request from Untrusted Agent: {}", peer_addr);
|
||||
return self.handle_untrusted_source(peer_addr);
|
||||
}
|
||||
|
||||
// 2. Try multiple head resolution strategies
|
||||
let analysis = self.analyze_proxy_chain(peer_addr.ip(), headers)?;
|
||||
|
||||
// 3. Perform security checks
|
||||
self.perform_security_checks(&analysis)?;
|
||||
|
||||
Ok(analysis)
|
||||
}
|
||||
|
||||
/// Analyze the core approach of proxy chains
|
||||
fn analyze_proxy_chain(&self, current_proxy_ip: IpAddr, headers: &http::HeaderMap) -> Result<ProxyChainAnalysis, ProxyError> {
|
||||
// Collect all available proxy chain information
|
||||
let mut proxy_chains = Vec::new();
|
||||
let mut security_warnings = Vec::new();
|
||||
|
||||
// Strategy 1: Prioritize the use of RFC 7239 Forwarded headers
|
||||
if self.advanced_config.enable_rfc7239 {
|
||||
if let Some(forwarded) = Self::parse_forwarded_header(headers) {
|
||||
if let Some(rfc_chain) = Self::extract_chain_from_forwarded(&forwarded) {
|
||||
proxy_chains.push(("rfc7239", rfc_chain));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Strategy 2: Use traditional X-Forwarded-For heads
|
||||
if let Some(xff_chain) = Self::parse_x_forwarded_for(headers) {
|
||||
proxy_chains.push(("xff", xff_chain));
|
||||
}
|
||||
|
||||
// Strategy 3: Use X-Real-IP as a backup
|
||||
if let Some(real_ip) = Self::parse_x_real_ip(headers) {
|
||||
proxy_chains.push(("x-real-ip", vec![real_ip]));
|
||||
}
|
||||
|
||||
// If no proxy information is found
|
||||
if proxy_chains.is_empty() {
|
||||
debug!("No proxy header found, using direct connection IP");
|
||||
return Ok(ProxyChainAnalysis {
|
||||
trusted_client_ip: current_proxy_ip,
|
||||
full_proxy_chain: vec![current_proxy_ip],
|
||||
trusted_proxy_chain: vec![current_proxy_ip],
|
||||
validated_hops: 0,
|
||||
chain_integrity: true,
|
||||
validation_mode_used: ValidationMode::Lenient,
|
||||
security_warnings: vec!["Agent header not used, possibly direct connection".to_string()],
|
||||
});
|
||||
}
|
||||
|
||||
// Choose the most reliable proxy chain (RFC 7239 preferred)
|
||||
let (source, mut full_chain) = proxy_chains
|
||||
.into_iter()
|
||||
.max_by_key(|(source, _)| match *source {
|
||||
"rfc7239" => 3,
|
||||
"xff" => 2,
|
||||
"x-real-ip" => 1,
|
||||
_ => 0,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
debug!("Use proxy chain source: {}, chain: {:?}", source, full_chain);
|
||||
|
||||
// Add the current proxy IP to the end of the chain
|
||||
full_chain.push(current_proxy_ip);
|
||||
|
||||
// Handle the proxy chain according to the configured validation mode
|
||||
let (trusted_client_ip, trusted_proxy_chain, validated_hops) = match self.advanced_config.validation_mode {
|
||||
ValidationMode::Lenient => self.validate_lenient(&full_chain),
|
||||
ValidationMode::Strict => self.validate_strict(&full_chain)?,
|
||||
ValidationMode::HopByHop => self.validate_hop_by_hop(&full_chain)?,
|
||||
};
|
||||
|
||||
// Check for chain continuity
|
||||
let chain_integrity = if self.advanced_config.enable_chain_continuity_check {
|
||||
self.check_chain_continuity(&full_chain, &trusted_proxy_chain)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
// Collect safety warnings
|
||||
if !chain_integrity {
|
||||
security_warnings.push("Proxy chain continuity check failed".to_string());
|
||||
}
|
||||
|
||||
if full_chain.len() > self.advanced_config.max_proxy_hops {
|
||||
security_warnings.push(format!(
|
||||
"Proxy chain length exceeds the limit:{}/{}",
|
||||
full_chain.len(),
|
||||
self.advanced_config.max_proxy_hops
|
||||
));
|
||||
}
|
||||
|
||||
Ok(ProxyChainAnalysis {
|
||||
trusted_client_ip,
|
||||
full_proxy_chain: full_chain,
|
||||
trusted_proxy_chain,
|
||||
validated_hops,
|
||||
chain_integrity,
|
||||
validation_mode_used: self.advanced_config.validation_mode,
|
||||
security_warnings,
|
||||
})
|
||||
}
|
||||
|
||||
/// Permissive validation mode: As long as the last agent is trusted, the entire chain is accepted
|
||||
fn validate_lenient(&self, chain: &[IpAddr]) -> (IpAddr, Vec<IpAddr>, usize) {
|
||||
if chain.is_empty() {
|
||||
return (IpAddr::from([0, 0, 0, 0]), vec![], 0);
|
||||
}
|
||||
|
||||
// Take the first IP in the chain as the client IP
|
||||
let client_ip = chain[0];
|
||||
|
||||
// Verify that the last agent is trustworthy
|
||||
let last_proxy = chain.last().unwrap();
|
||||
let is_last_trusted = self.is_ip_trusted(last_proxy);
|
||||
|
||||
if is_last_trusted {
|
||||
(client_ip, chain.iter().copied().collect(), chain.len())
|
||||
} else {
|
||||
// If the last proxy is not trusted, use the IP of the last proxy
|
||||
(*last_proxy, vec![*last_proxy], 0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Strict Verification Model: All agents in the chain are required to be trustworthy
|
||||
fn validate_strict(&self, chain: &[IpAddr]) -> Result<(IpAddr, Vec<IpAddr>, usize), ProxyError> {
|
||||
if chain.is_empty() {
|
||||
return Ok((IpAddr::from([0, 0, 0, 0]), vec![], 0));
|
||||
}
|
||||
|
||||
// Check if each agent is trustworthy
|
||||
for (i, ip) in chain.iter().enumerate() {
|
||||
if !self.is_ip_trusted(ip) {
|
||||
return Err(ProxyError::ChainValidationFailed(format!(
|
||||
"The {} agent ({}) in the chain is not trustworthy",
|
||||
i + 1,
|
||||
ip
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
chain[0], // The first IP is the client
|
||||
chain.to_vec(),
|
||||
chain.len(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Hop Validation Mode: Find the first untrusted agent from right to left
|
||||
fn validate_hop_by_hop(&self, chain: &[IpAddr]) -> Result<(IpAddr, Vec<IpAddr>, usize), ProxyError> {
|
||||
if chain.is_empty() {
|
||||
return Ok((IpAddr::from([0, 0, 0, 0]), vec![], 0));
|
||||
}
|
||||
|
||||
let mut trusted_chain = Vec::new();
|
||||
let mut validated_hops = 0;
|
||||
|
||||
// Traversing from right to left (starting with the agent closest to us)
|
||||
for ip in chain.iter().rev() {
|
||||
if self.is_ip_trusted(ip) {
|
||||
trusted_chain.insert(0, *ip);
|
||||
validated_hops += 1;
|
||||
} else {
|
||||
// Find the first untrusted agent and stop traversing
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if trusted_chain.is_empty() {
|
||||
// No trusted proxy, using the last IP of the chain
|
||||
let last_ip = *chain.last().unwrap();
|
||||
Ok((last_ip, vec![last_ip], 0))
|
||||
} else {
|
||||
// The client IP is the one that preceded the first IP of the trusted chain
|
||||
// Or if the entire chain is trustworthy, it is the first IP of the original chain
|
||||
let client_ip_index = chain.len().saturating_sub(trusted_chain.len());
|
||||
let client_ip = if client_ip_index > 0 {
|
||||
chain[client_ip_index - 1]
|
||||
} else {
|
||||
chain[0]
|
||||
};
|
||||
|
||||
Ok((client_ip, trusted_chain, validated_hops))
|
||||
}
|
||||
}
|
||||
|
||||
/// Check the continuity of the proxy chain
|
||||
/// Validation rules: The path from the client to the server should be continuous
|
||||
fn check_chain_continuity(&self, full_chain: &[IpAddr], trusted_chain: &[IpAddr]) -> bool {
|
||||
if full_chain.len() <= 1 || trusted_chain.is_empty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Verify that the trusted chain is indeed the tail continuous part of the complete chain
|
||||
let expected_tail = &full_chain[full_chain.len() - trusted_chain.len()..];
|
||||
expected_tail == trusted_chain
|
||||
}
|
||||
|
||||
/// Handle requests from untrusted sources
|
||||
fn handle_untrusted_source(&self, peer_addr: &SocketAddr) -> Result<ProxyChainAnalysis, ProxyError> {
|
||||
let ip = peer_addr.ip();
|
||||
|
||||
// Check if it's a private address (it could be an internal service call)
|
||||
let is_private = self.private_nets_cache.iter().any(|network| network.contains(ip));
|
||||
|
||||
let warnings = if is_private {
|
||||
vec!["From an internal network but not configured as a trusted agent".to_string()]
|
||||
} else {
|
||||
vec!["From an untrusted public address".to_string()]
|
||||
};
|
||||
|
||||
Ok(ProxyChainAnalysis {
|
||||
trusted_client_ip: ip,
|
||||
full_proxy_chain: vec![ip],
|
||||
trusted_proxy_chain: vec![ip],
|
||||
validated_hops: 0,
|
||||
chain_integrity: true,
|
||||
validation_mode_used: ValidationMode::Lenient,
|
||||
security_warnings: warnings,
|
||||
})
|
||||
}
|
||||
|
||||
/// Perform security checks
|
||||
fn perform_security_checks(&self, analysis: &ProxyChainAnalysis) -> Result<(), ProxyError> {
|
||||
// Check the proxy chain length
|
||||
if analysis.full_proxy_chain.len() > self.advanced_config.max_proxy_hops {
|
||||
return Err(ProxyError::ChainValidationFailed(format!(
|
||||
"The proxy chain is too long:{} > {}",
|
||||
analysis.full_proxy_chain.len(),
|
||||
self.advanced_config.max_proxy_hops
|
||||
)));
|
||||
}
|
||||
|
||||
// Check if the client IP is valid
|
||||
if analysis.trusted_client_ip.is_unspecified() {
|
||||
return Err(ProxyError::ChainValidationFailed(
|
||||
"The client IP is an unspecified address (0.0.0.0 or::)".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Check if the loopback address (it could be a misconfiguration or an attack)
|
||||
if analysis.trusted_client_ip.is_loopback() && analysis.validated_hops > 1 {
|
||||
warn!(
|
||||
"The client IP is a loopback address, but it goes through multiple layers of proxies: {}",
|
||||
analysis.trusted_client_ip
|
||||
);
|
||||
}
|
||||
|
||||
// Check the multicast address
|
||||
if analysis.trusted_client_ip.is_multicast() {
|
||||
return Err(ProxyError::ChainValidationFailed("The client IP is the multicast address".to_string()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if a single IP is trustworthy
|
||||
fn is_ip_trusted(&self, ip: &IpAddr) -> bool {
|
||||
// Check the cache first
|
||||
if self.trusted_ips_cache.contains(ip) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Then check the CIDR range
|
||||
let dummy_port = 0;
|
||||
let addr = SocketAddr::new(*ip, dummy_port);
|
||||
self.base_config.is_trusted(&addr)
|
||||
}
|
||||
|
||||
/// Parsing RFC 7239 Forwarded head
|
||||
fn parse_forwarded_header(headers: &http::HeaderMap) -> Option<ForwardedHeader> {
|
||||
let forwarded_value = headers.get("forwarded")?.to_str().ok()?;
|
||||
|
||||
// Forwarded headers can have multiple values, separated by commas
|
||||
// We take the first one
|
||||
let first_part = forwarded_value.split(',').next()?.trim();
|
||||
|
||||
let mut result = ForwardedHeader {
|
||||
for_client: None,
|
||||
by_proxy: None,
|
||||
proto: None,
|
||||
host: None,
|
||||
};
|
||||
|
||||
// Parsing key-value pairs, such as:for=192.0.2.60;proto=http;by=203.0.113.43
|
||||
for part in first_part.split(';') {
|
||||
let part = part.trim();
|
||||
if let Some((key, value)) = part.split_once('=') {
|
||||
match key.trim().to_lowercase().as_str() {
|
||||
"for" => {
|
||||
// Remove possible quotes and port numbers
|
||||
let clean_value = value.trim_matches('"');
|
||||
if let Ok(ip) = clean_value.split(':').next().unwrap_or(clean_value).parse() {
|
||||
result.for_client = Some(ip);
|
||||
}
|
||||
}
|
||||
"by" => {
|
||||
let clean_value = value.trim_matches('"');
|
||||
if let Ok(ip) = clean_value.split(':').next().unwrap_or(clean_value).parse() {
|
||||
result.by_proxy = Some(ip);
|
||||
}
|
||||
}
|
||||
"proto" => {
|
||||
result.proto = Some(value.trim_matches('"').to_string());
|
||||
}
|
||||
"host" => {
|
||||
result.host = Some(value.trim_matches('"').to_string());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(result)
|
||||
}
|
||||
|
||||
/// Extract the proxy chain from the Forwarded header
|
||||
fn extract_chain_from_forwarded(forwarded: &ForwardedHeader) -> Option<Vec<IpAddr>> {
|
||||
let mut chain = Vec::new();
|
||||
|
||||
// If there is a for field, it is the client IP
|
||||
if let Some(client_ip) = &forwarded.for_client {
|
||||
chain.push(*client_ip);
|
||||
}
|
||||
|
||||
// If there is a by field, it may be a proxy IP
|
||||
// Note: In RFC 7239, multiple agents will have multiple Forwarded header values
|
||||
// Simplify the processing here, just take one
|
||||
|
||||
if chain.is_empty() { None } else { Some(chain) }
|
||||
}
|
||||
|
||||
/// Parsing X-Forwarded-For head
|
||||
fn parse_x_forwarded_for(headers: &http::HeaderMap) -> Option<Vec<IpAddr>> {
|
||||
let xff_value = headers.get("x-forwarded-for")?.to_str().ok()?;
|
||||
|
||||
let ips: Vec<IpAddr> = xff_value
|
||||
.split(',')
|
||||
.map(|s| s.trim())
|
||||
.filter(|s| !s.is_empty())
|
||||
.filter_map(|s| {
|
||||
// May contain the port number, take only the IP part
|
||||
let ip_part = s.split(':').next().unwrap_or(s);
|
||||
ip_part.parse().ok()
|
||||
})
|
||||
.collect();
|
||||
|
||||
if ips.is_empty() { None } else { Some(ips) }
|
||||
}
|
||||
|
||||
/// Parsing the X-Real-IP head
|
||||
fn parse_x_real_ip(headers: &http::HeaderMap) -> Option<IpAddr> {
|
||||
let value = headers.get("x-real-ip")?.to_str().ok()?;
|
||||
value.parse().ok()
|
||||
}
|
||||
}
|
||||
|
||||
// Update the MultiProxyProcessor to use the cache
|
||||
pub struct OptimizedMultiProxyProcessor {
|
||||
base_config: Arc<TrustedProxiesConfig>,
|
||||
advanced_config: MultiProxyConfig,
|
||||
ip_cache: std::sync::Mutex<IpValidationCache>,
|
||||
cidr_matcher: CidrMatcher,
|
||||
}
|
||||
|
||||
impl OptimizedMultiProxyProcessor {
|
||||
pub fn new(base_config: TrustedProxiesConfig, advanced_config: Option<MultiProxyConfig>) -> Self {
|
||||
let config = advanced_config.unwrap_or_default();
|
||||
|
||||
// Pre-compiled CIDR matcher
|
||||
let cidr_matcher = CidrMatcher::new(&base_config);
|
||||
|
||||
Self {
|
||||
base_config: Arc::new(base_config),
|
||||
advanced_config: config,
|
||||
ip_cache: std::sync::Mutex::new(IpValidationCache::new(10000, Duration::from_secs(300))),
|
||||
cidr_matcher,
|
||||
}
|
||||
}
|
||||
|
||||
/// Optimized IP trusted checks
|
||||
fn is_ip_trusted_optimized(&self, ip: &IpAddr) -> bool {
|
||||
let mut cache = self.ip_cache.lock().unwrap();
|
||||
|
||||
cache.is_trusted(ip, |ip| {
|
||||
// Fast Path: Check individual IP caches
|
||||
// Slow path: Check the CIDR range
|
||||
self.cidr_matcher.contains(ip)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Unit tests
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{MultiProxyProcessor, ProxyChainAnalysis, TrustedProxiesConfig, ValidationMode};
|
||||
use axum::http::HeaderMap;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
|
||||
fn create_test_processor() -> MultiProxyProcessor {
|
||||
let base_config =
|
||||
TrustedProxiesConfig::from_strs(&["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1"]).unwrap();
|
||||
|
||||
MultiProxyProcessor::new(base_config, None)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_x_forwarded_for() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("X-Forwarded-For", "203.0.113.195, 198.51.100.1, 10.0.1.100".parse().unwrap());
|
||||
|
||||
let result = MultiProxyProcessor::parse_x_forwarded_for(&headers).unwrap();
|
||||
assert_eq!(result.len(), 3);
|
||||
assert_eq!(result[0], IpAddr::from_str("203.0.113.195").unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_x_forwarded_for_with_ports() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("X-Forwarded-For", "203.0.113.195:1234, 198.51.100.1:80".parse().unwrap());
|
||||
|
||||
let result = MultiProxyProcessor::parse_x_forwarded_for(&headers).unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(result[0], IpAddr::from_str("203.0.113.195").unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_forwarded_header() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Forwarded", r#"for=192.0.2.60;proto=http;by=203.0.113.43"#.parse().unwrap());
|
||||
|
||||
let result = MultiProxyProcessor::parse_forwarded_header(&headers).unwrap();
|
||||
assert_eq!(result.for_client, Some(IpAddr::from_str("192.0.2.60").unwrap()));
|
||||
assert_eq!(result.by_proxy, Some(IpAddr::from_str("203.0.113.43").unwrap()));
|
||||
assert_eq!(result.proto, Some("http".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_hop_by_hop() {
|
||||
let processor = create_test_processor();
|
||||
|
||||
// Test chain: Client (public) -> Proxy 1 (private) -> Proxy 2 (private)
|
||||
let chain = vec![
|
||||
IpAddr::from_str("8.8.8.8").unwrap(), // Client (not trusted)
|
||||
IpAddr::from_str("10.0.1.100").unwrap(), // Agent 1 (Trusted)
|
||||
IpAddr::from_str("192.168.1.50").unwrap(), // Agent 2 (Trusted)
|
||||
];
|
||||
|
||||
let result = processor.validate_hop_by_hop(&chain).unwrap();
|
||||
assert_eq!(result.0, IpAddr::from_str("8.8.8.8").unwrap()); // Client IP
|
||||
assert_eq!(result.2, 2); // Verified 2 jumps
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_chain_continuity_check() {
|
||||
let processor = create_test_processor();
|
||||
|
||||
// Complete chain
|
||||
let full_chain = vec![
|
||||
IpAddr::from_str("8.8.8.8").unwrap(),
|
||||
IpAddr::from_str("10.0.1.100").unwrap(),
|
||||
IpAddr::from_str("192.168.1.50").unwrap(),
|
||||
];
|
||||
|
||||
// The trusted chain should be the tail continuous part of the complete chain
|
||||
let trusted_chain = vec![
|
||||
IpAddr::from_str("10.0.1.100").unwrap(),
|
||||
IpAddr::from_str("192.168.1.50").unwrap(),
|
||||
];
|
||||
|
||||
assert!(processor.check_chain_continuity(&full_chain, &trusted_chain));
|
||||
|
||||
// Discontinuous cases should fail
|
||||
let bad_trusted_chain = vec![IpAddr::from_str("192.168.1.50").unwrap()];
|
||||
assert!(!processor.check_chain_continuity(&full_chain, &bad_trusted_chain));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_security_checks() {
|
||||
let processor = create_test_processor();
|
||||
|
||||
// Create test analysis results
|
||||
let analysis = ProxyChainAnalysis {
|
||||
trusted_client_ip: IpAddr::from([0, 0, 0, 0]), // Invalid address
|
||||
full_proxy_chain: Vec::new(),
|
||||
trusted_proxy_chain: Vec::new(),
|
||||
validated_hops: 0,
|
||||
chain_integrity: true,
|
||||
validation_mode_used: ValidationMode::Lenient,
|
||||
security_warnings: Vec::new(),
|
||||
};
|
||||
|
||||
// Should fail because the client IP is an unspecified address
|
||||
assert!(processor.perform_security_checks(&analysis).is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_complex_proxy_chain() {
|
||||
use crate::TrustedProxiesConfig;
|
||||
use crate::{MultiProxyConfig, MultiProxyProcessor, ValidationMode};
|
||||
use axum::http::HeaderMap;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
// Create a test configuration
|
||||
let base_config = TrustedProxiesConfig::from_strs(&["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]).unwrap();
|
||||
|
||||
let advanced_config = MultiProxyConfig {
|
||||
validation_mode: ValidationMode::HopByHop,
|
||||
enable_rfc7239: true,
|
||||
max_proxy_hops: 5,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let processor = MultiProxyProcessor::new(base_config, Some(advanced_config));
|
||||
|
||||
// Simulate complex proxy chains
|
||||
let peer_addr = SocketAddr::from(([192, 168, 1, 100], 8080));
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("X-Forwarded-For", "203.0.113.195, 10.0.1.50, 172.16.0.10, 192.168.1.1".parse().unwrap());
|
||||
headers.insert("X-Forwarded-Proto", "https".parse().unwrap());
|
||||
headers.insert("X-Forwarded-Host", "api.example.com".parse().unwrap());
|
||||
|
||||
// Test processing
|
||||
let result = processor.process_request(&peer_addr, &headers).unwrap();
|
||||
|
||||
assert_eq!(result.trusted_client_ip.to_string(), "203.0.113.195");
|
||||
assert_eq!(result.validated_hops, 3); // 192.168.1.1, 172.16.0.10, 10.0.1.50
|
||||
assert!(result.chain_integrity);
|
||||
|
||||
// Test RFC 7239 head
|
||||
let mut rfc_headers = HeaderMap::new();
|
||||
rfc_headers.insert(
|
||||
"Forwarded",
|
||||
r#"for=192.0.2.60;proto=https;by=203.0.113.43,for=198.51.100.17"#.parse().unwrap(),
|
||||
);
|
||||
|
||||
let rfc_result = processor.process_request(&peer_addr, &rfc_headers).unwrap();
|
||||
assert_eq!(rfc_result.trusted_client_ip.to_string(), "192.0.2.60");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_proxy_chain_attack_scenarios() {
|
||||
use crate::MultiProxyProcessor;
|
||||
|
||||
let base_config = TrustedProxiesConfig::from_strs(&["10.0.0.0/8"]).unwrap();
|
||||
let processor = MultiProxyProcessor::new(base_config, None);
|
||||
|
||||
// Scenario 1: Excessive Proxy Chain Attack
|
||||
let peer_addr = SocketAddr::from(([10, 0, 0, 1], 80));
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
// Create a very long chain (10 hops over the default limit)
|
||||
let long_chain = (0..15).map(|i| format!("10.0.{}.1", i)).collect::<Vec<_>>().join(", ");
|
||||
|
||||
headers.insert("X-Forwarded-For", long_chain.parse().unwrap());
|
||||
|
||||
let result = processor.process_request(&peer_addr, &headers);
|
||||
assert!(result.is_err());
|
||||
|
||||
// Scenario 2: IP spoofing attack
|
||||
let mut attack_headers = HeaderMap::new();
|
||||
attack_headers.insert("X-Forwarded-For", "8.8.8.8".parse().unwrap());
|
||||
|
||||
let attack_result = processor.process_request(&peer_addr, &attack_headers).unwrap();
|
||||
// 8.8.8.8 should be correctly identified as the client IP (because the proxy is trusted)
|
||||
assert_eq!(attack_result.trusted_client_ip.to_string(), "8.8.8.8");
|
||||
|
||||
// Scenario 3: Untrustworthy agent attempts to spoof
|
||||
let untrusted_peer = SocketAddr::from(([8, 8, 8, 8], 80));
|
||||
let mut fake_headers = HeaderMap::new();
|
||||
fake_headers.insert("X-Forwarded-For", "10.0.0.100".parse().unwrap());
|
||||
|
||||
let fake_result = processor.process_request(&untrusted_peer, &fake_headers).unwrap();
|
||||
// X-Forwarded-For should be ignored and 8.8.8.8 is used as the client IP
|
||||
assert_eq!(fake_result.trusted_client_ip.to_string(), "8.8.8.8");
|
||||
}
|
||||
}
|
||||
258
crates/trusted-proxies/src/proxy.rs
Normal file
258
crates/trusted-proxies/src/proxy.rs
Normal file
@@ -0,0 +1,258 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{
|
||||
CloudProviderIps, ConfigError, DEFAULT_ADDITIONAL_TRUSTED_PROXIES, DEFAULT_CLOUDFLARE_IPS_ENABLED, DEFAULT_TRUSTED_PROXIES,
|
||||
ENV_ADDITIONAL_TRUSTED_PROXIES, ENV_CLOUDFLARE_IPS_ENABLED, ENV_TRUSTED_PROXIES, EnvConfigLoader, ValidationMode,
|
||||
parse_proxy_list,
|
||||
};
|
||||
use ipnetwork::IpNetwork;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
use tracing::debug;
|
||||
|
||||
/// Trusted Proxy Configuration: Supports a single IP or CIDR block
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum TrustedProxy {
|
||||
/// A single IP address
|
||||
Single(IpAddr),
|
||||
/// IP address segment (CIDR notation)
|
||||
Cidr(IpNetwork),
|
||||
}
|
||||
|
||||
impl TrustedProxy {
|
||||
/// Parsing trusted agent configurations from strings
|
||||
/// Support: "127.0.0.1", "10.0.0.0/8", "::1"
|
||||
pub fn from_str(s: &str) -> anyhow::Result<Self> {
|
||||
if s.contains('/') {
|
||||
// CIDR notation
|
||||
let network = IpNetwork::from_str(s).map_err(|e| anyhow::anyhow!("Invalid CIDR format '{}': {}", s, e))?;
|
||||
Ok(TrustedProxy::Cidr(network))
|
||||
} else {
|
||||
// Single IP
|
||||
let ip = IpAddr::from_str(s).map_err(|e| anyhow::anyhow!("Invalid IP address '{}': {}", s, e))?;
|
||||
Ok(TrustedProxy::Single(ip))
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the IP matches this configuration
|
||||
pub fn contains(&self, ip: &IpAddr) -> bool {
|
||||
match self {
|
||||
TrustedProxy::Single(proxy_ip) => ip == proxy_ip,
|
||||
TrustedProxy::Cidr(network) => network.contains(*ip),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trusted Agent Configuration Collection
|
||||
///
|
||||
/// Supports multiple trusted agents (single IP or CIDR blocks)
|
||||
/// Used to verify if a request comes from a trusted agent based on its SocketAddr
|
||||
/// Refer to `TrustedProxy` for individual configurations
|
||||
///
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TrustedProxiesConfig {
|
||||
pub proxies: Vec<TrustedProxy>,
|
||||
}
|
||||
|
||||
impl Default for TrustedProxiesConfig {
|
||||
fn default() -> Self {
|
||||
// Default trusted agents: Local loopback and common private networks
|
||||
let default_proxies = parse_proxy_list(DEFAULT_TRUSTED_PROXIES);
|
||||
Self::from_strs(&default_proxies).unwrap_or_else(|_| Self { proxies: vec![] })
|
||||
}
|
||||
}
|
||||
|
||||
impl TrustedProxiesConfig {
|
||||
/// Create a new configuration
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `proxies` - A vector of TrustedProxy configurations
|
||||
///
|
||||
/// # Returns
|
||||
/// A new TrustedProxiesConfig instance
|
||||
pub fn new(proxies: Vec<TrustedProxy>) -> Self {
|
||||
Self { proxies }
|
||||
}
|
||||
|
||||
/// Create a configuration from a string slice
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `proxy_strs` - A slice of string slices representing trusted agents
|
||||
///
|
||||
/// # Returns
|
||||
/// A Result containing the TrustedProxiesConfig or an error if parsing fails
|
||||
pub fn from_strs(proxy_strs: &[&str]) -> anyhow::Result<Self> {
|
||||
let mut proxies = Vec::new();
|
||||
|
||||
for s in proxy_strs {
|
||||
proxies.push(TrustedProxy::from_str(s)?);
|
||||
}
|
||||
|
||||
Ok(Self::new(proxies))
|
||||
}
|
||||
|
||||
/// Check if the SocketAddr is coming from a trusted agent
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `addr` - The SocketAddr to check
|
||||
///
|
||||
/// # Returns
|
||||
/// true if the address is from a trusted agent, false otherwise
|
||||
pub fn is_trusted(&self, addr: &SocketAddr) -> bool {
|
||||
let ip = addr.ip();
|
||||
self.proxies.iter().any(|proxy| proxy.contains(&ip))
|
||||
}
|
||||
|
||||
/// Get a list of trusted agents (for debugging/logging)
|
||||
///
|
||||
/// # Returns
|
||||
/// A vector of strings representing the trusted agents
|
||||
pub fn get_trusted_ranges(&self) -> Vec<String> {
|
||||
self.proxies
|
||||
.iter()
|
||||
.map(|p| match p {
|
||||
TrustedProxy::Single(ip) => ip.to_string(),
|
||||
TrustedProxy::Cidr(network) => network.to_string(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Create a configuration from an environment variable
|
||||
pub fn from_env() -> Result<Self, ConfigError> {
|
||||
// Load the underlying trusted agent
|
||||
let mut proxies = EnvConfigLoader::parse_ip_list(ENV_TRUSTED_PROXIES, DEFAULT_TRUSTED_PROXIES)?;
|
||||
|
||||
// Load additional trusted agents
|
||||
let additional_proxies =
|
||||
EnvConfigLoader::parse_ip_list(ENV_ADDITIONAL_TRUSTED_PROXIES, DEFAULT_ADDITIONAL_TRUSTED_PROXIES)?;
|
||||
proxies.extend(additional_proxies);
|
||||
|
||||
// Get IP ranges from cloud metadata
|
||||
let cloud_ips = EnvConfigLoader::fetch_cloud_metadata_ips()?;
|
||||
for ip_range in cloud_ips {
|
||||
if let Ok(network) = ip_range.parse::<IpNetwork>() {
|
||||
proxies.push(TrustedProxy::Cidr(network));
|
||||
debug!("Add cloud metadata IP range: {}", network);
|
||||
}
|
||||
}
|
||||
|
||||
// If Cloudflare IP is enabled, add
|
||||
if EnvConfigLoader::get_bool(ENV_CLOUDFLARE_IPS_ENABLED, DEFAULT_CLOUDFLARE_IPS_ENABLED)? {
|
||||
for network in CloudProviderIps::cloudflare_ranges() {
|
||||
proxies.push(TrustedProxy::Cidr(network));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self { proxies })
|
||||
}
|
||||
|
||||
/// Get the trusted agent scope configured in the environment variable (for debugging/logging)
|
||||
pub fn get_trusted_ranges_from_env() -> Result<Vec<String>, ConfigError> {
|
||||
let config = Self::from_env()?;
|
||||
Ok(config.get_trusted_ranges())
|
||||
}
|
||||
}
|
||||
|
||||
/// Client real information stored in the request extension
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ClientInfo {
|
||||
/// Real client IP address (verified)
|
||||
pub real_ip: IpAddr,
|
||||
/// Original request hostname (if from a trusted agent)
|
||||
pub forwarded_host: Option<String>,
|
||||
/// Original request agreement (if from a trusted agent)
|
||||
pub forwarded_proto: Option<String>,
|
||||
/// Whether the request comes from a trusted agent
|
||||
pub is_from_trusted_proxy: bool,
|
||||
/// Direct Connected Proxy IP (if Proxyed)
|
||||
pub proxy_ip: Option<IpAddr>,
|
||||
}
|
||||
|
||||
impl ClientInfo {
|
||||
/// Create client information for direct connections (no agents)
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `addr` - The SocketAddr of the direct client connection
|
||||
///
|
||||
/// # Returns
|
||||
/// A new ClientInfo instance representing a direct connection
|
||||
pub fn direct(addr: SocketAddr) -> Self {
|
||||
Self {
|
||||
real_ip: addr.ip(),
|
||||
forwarded_host: None,
|
||||
forwarded_proto: None,
|
||||
is_from_trusted_proxy: false,
|
||||
proxy_ip: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create client information from trusted agents
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `real_ip` - The real client IP address
|
||||
/// * `forwarded_host` - The original request hostname
|
||||
/// * `forwarded_proto` - The original request agreement
|
||||
/// * `proxy_ip` - The direct connected proxy IP
|
||||
///
|
||||
/// # Returns
|
||||
/// A new ClientInfo instance representing a proxied connection
|
||||
pub fn from_trusted_proxy(
|
||||
real_ip: IpAddr,
|
||||
forwarded_host: Option<String>,
|
||||
forwarded_proto: Option<String>,
|
||||
proxy_ip: IpAddr,
|
||||
) -> Self {
|
||||
Self {
|
||||
real_ip,
|
||||
forwarded_host,
|
||||
forwarded_proto,
|
||||
is_from_trusted_proxy: true,
|
||||
proxy_ip: Some(proxy_ip),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Multi-tier proxy processor configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MultiProxyConfig {
|
||||
/// Proxy chain validation mode
|
||||
pub validation_mode: ValidationMode,
|
||||
/// Is RFC 7239 Forwarded header support enabled?
|
||||
pub enable_rfc7239: bool,
|
||||
/// Maximum proxy hop limit
|
||||
pub max_proxy_hops: usize,
|
||||
/// Allowed private network CIDR (for internal proxy authentication)
|
||||
pub allowed_private_nets: Vec<IpNetwork>,
|
||||
/// Whether to enable proxy chain continuity checking
|
||||
pub enable_chain_continuity_check: bool,
|
||||
}
|
||||
|
||||
impl Default for MultiProxyConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
validation_mode: ValidationMode::HopByHop,
|
||||
enable_rfc7239: true,
|
||||
max_proxy_hops: 10,
|
||||
allowed_private_nets: vec![
|
||||
IpNetwork::from_str("10.0.0.0/8").unwrap(),
|
||||
IpNetwork::from_str("172.16.0.0/12").unwrap(),
|
||||
IpNetwork::from_str("192.168.0.0/16").unwrap(),
|
||||
IpNetwork::from_str("fd00::/8").unwrap(),
|
||||
],
|
||||
enable_chain_continuity_check: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -292,7 +292,7 @@ pub fn create_multi_cert_resolver(
|
||||
|
||||
for (domain, (certs, key)) in cert_key_pairs {
|
||||
// create a signature
|
||||
let signing_key = rustls::crypto::ring::sign::any_supported_type(&key)
|
||||
let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key)
|
||||
.map_err(|e| certs_error(format!("unsupported private key types:{domain}, err:{e:?}")))?;
|
||||
|
||||
// create a CertifiedKey
|
||||
|
||||
@@ -59,6 +59,7 @@ rustfs-rio.workspace = true
|
||||
rustfs-s3select-api = { workspace = true }
|
||||
rustfs-s3select-query = { workspace = true }
|
||||
rustfs-targets = { workspace = true }
|
||||
rustfs-trusted-proxies = { workspace = true }
|
||||
rustfs-utils = { workspace = true, features = ["full"] }
|
||||
rustfs-zip = { workspace = true }
|
||||
|
||||
@@ -82,7 +83,6 @@ tokio-util.workspace = true
|
||||
tonic = { workspace = true }
|
||||
tower.workspace = true
|
||||
tower-http = { workspace = true, features = ["trace", "compression-full", "cors", "catch-panic", "timeout", "limit", "request-id", "add-extension"] }
|
||||
ipnetwork = { workspace = true }
|
||||
|
||||
# Serialization and Data Formats
|
||||
bytes = { workspace = true }
|
||||
@@ -130,6 +130,7 @@ zip = { workspace = true }
|
||||
# Observability and Metrics
|
||||
metrics = { workspace = true }
|
||||
|
||||
|
||||
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
|
||||
sysctl = { workspace = true }
|
||||
|
||||
|
||||
@@ -367,7 +367,7 @@ async fn _setup_console_tls_config(tls_path: Option<&String>) -> Result<Option<R
|
||||
debug!("Found TLS directory for console, checking for certificates");
|
||||
|
||||
// Make sure to use a modern encryption suite
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
|
||||
// 1. Attempt to load all certificates in the directory (multi-certificate support, for SNI)
|
||||
if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) {
|
||||
|
||||
@@ -110,6 +110,17 @@ async fn async_main() -> Result<()> {
|
||||
// Initialize performance profiling if enabled
|
||||
profiling::init_from_env().await;
|
||||
|
||||
// rustfs-trusted-proxies
|
||||
match rustfs_trusted_proxies::initialize().await {
|
||||
Ok(_) => {
|
||||
info!(target: "rustfs::main", "Trusted proxies configuration initialized successfully.");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to initialize trusted proxies configuration: {:?}", e);
|
||||
return Err(Error::other(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize TLS if a certificate path is provided
|
||||
if let Some(tls_path) = &opt.tls_path {
|
||||
match init_cert(tls_path).await {
|
||||
|
||||
@@ -17,6 +17,7 @@ use super::compress::{CompressionConfig, CompressionPredicate};
|
||||
use crate::admin;
|
||||
use crate::auth::IAMAuth;
|
||||
use crate::config;
|
||||
use crate::proxy::{ClientInfo, TrustedProxiesLayer};
|
||||
use crate::server::{ReadinessGateLayer, RemoteAddr, ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer};
|
||||
use crate::storage;
|
||||
use crate::storage::tonic_service::make_server;
|
||||
@@ -434,7 +435,7 @@ async fn setup_tls_acceptor(tls_path: &str) -> Result<Option<TlsAcceptor>> {
|
||||
debug!("Found TLS directory, checking for certificates");
|
||||
|
||||
// Make sure to use a modern encryption suite
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
let mtls_verifier = rustfs_utils::build_webpki_client_verifier(tls_path)?;
|
||||
|
||||
// 1. Attempt to load all certificates in the directory (multi-certificate support, for SNI)
|
||||
@@ -552,8 +553,9 @@ fn process_connection(
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let trusted_proxies = crate::proxy::get_trusted_proxies_config().await;
|
||||
let hybrid_service = ServiceBuilder::new()
|
||||
.layer(TrustedProxiesLayer::new(trusted_proxies.clone()))
|
||||
.layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
|
||||
.layer(CatchPanicLayer::new())
|
||||
.layer(AddExtensionLayer::new(remote_addr))
|
||||
@@ -568,10 +570,15 @@ fn process_connection(
|
||||
.get(http::header::HeaderName::from_static("x-request-id"))
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("unknown");
|
||||
let client_info = request.extensions().get::<ClientInfo>();
|
||||
let real_ip = client_info
|
||||
.map(|info| info.real_ip.to_string())
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
let span = tracing::info_span!("http-request",
|
||||
trace_id = %trace_id,
|
||||
status_code = tracing::field::Empty,
|
||||
method = %request.method(),
|
||||
real_ip = %real_ip,
|
||||
uri = %request.uri(),
|
||||
version = ?request.version(),
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user