Compare commits

..

7 Commits

Author SHA1 Message Date
loverustfs
b4e9a82ee0 style: fix cargo fmt issues in vault backend 2025-12-26 22:42:30 +08:00
loverustfs
b3fd9502e9 chore: remove accidental CI optimization docs 2025-12-26 22:32:44 +08:00
loverustfs
1607c8f141 fix: implement decrypt for Vault backend to support large file uploads 2025-12-26 22:31:45 +08:00
Copilot
3d6681c9e5 chore: remove e2e-mint workflow (#1274)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
2025-12-26 21:55:04 +08:00
lgpseu
07a26fadad opt: store IoLoadMetrics records with circular vector (#1265)
Co-authored-by: houseme <housemecn@gmail.com>
2025-12-26 12:59:40 +08:00
majinghe
a083fca17a delete -R parameter in init container step (#1264) 2025-12-25 18:14:50 +08:00
houseme
89c3ae77a4 feat: Add TONIC_PREFIX prefix matching in ReadinessGateService (#1261) 2025-12-25 14:28:07 +08:00
16 changed files with 368 additions and 508 deletions

.github/workflows/e2e-mint.yml
View File

@@ -1,260 +0,0 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: e2e-mint
on:
push:
branches: [ main ]
paths:
- ".github/workflows/e2e-mint.yml"
- "Dockerfile.source"
- "rustfs/**"
- "crates/**"
workflow_dispatch:
inputs:
run-multi:
description: "Run multi-node Mint as well"
required: false
default: "false"
env:
ACCESS_KEY: rustfsadmin
SECRET_KEY: rustfsadmin
RUST_LOG: info
PLATFORM: linux/amd64
jobs:
mint-single:
runs-on: ubicloud-standard-2
timeout-minutes: 40
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Enable buildx
uses: docker/setup-buildx-action@v3
- name: Build RustFS image (source)
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
-t rustfs-ci \
-f Dockerfile.source .
- name: Create network
run: |
docker network inspect rustfs-net >/dev/null 2>&1 || docker network create rustfs-net
- name: Remove existing rustfs-single (if any)
run: docker rm -f rustfs-single >/dev/null 2>&1 || true
- name: Start single RustFS
run: |
docker run -d --name rustfs-single \
--network rustfs-net \
-e RUSTFS_ADDRESS=0.0.0.0:9000 \
-e RUSTFS_ACCESS_KEY=$ACCESS_KEY \
-e RUSTFS_SECRET_KEY=$SECRET_KEY \
-e RUSTFS_VOLUMES="/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3" \
-v /tmp/rustfs-single:/data \
rustfs-ci
- name: Wait for RustFS ready
run: |
for i in {1..30}; do
if docker exec rustfs-single curl -sf http://localhost:9000/health >/dev/null; then
exit 0
fi
sleep 2
done
echo "RustFS did not become ready" >&2
docker logs rustfs-single || true
exit 1
- name: Run Mint (single, S3-only)
run: |
mkdir -p artifacts/mint-single
docker run --rm --network rustfs-net \
--platform ${PLATFORM} \
-e SERVER_ENDPOINT=rustfs-single:9000 \
-e ACCESS_KEY=$ACCESS_KEY \
-e SECRET_KEY=$SECRET_KEY \
-e ENABLE_HTTPS=0 \
-e SERVER_REGION=us-east-1 \
-e RUN_ON_FAIL=1 \
-e MINT_MODE=core \
-v ${GITHUB_WORKSPACE}/artifacts/mint-single:/mint/log \
--entrypoint /mint/mint.sh \
minio/mint:edge \
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
- name: Collect RustFS logs
run: |
mkdir -p artifacts/rustfs-single
docker logs rustfs-single > artifacts/rustfs-single/rustfs.log || true
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: mint-single
path: artifacts/**
mint-multi:
if: github.event_name == 'workflow_dispatch' && github.event.inputs.run-multi == 'true'
needs: mint-single
runs-on: ubicloud-standard-2
timeout-minutes: 60
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Enable buildx
uses: docker/setup-buildx-action@v3
- name: Build RustFS image (source)
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
-t rustfs-ci \
-f Dockerfile.source .
- name: Prepare cluster compose
run: |
cat > compose.yml <<'EOF'
version: '3.8'
services:
rustfs1:
image: rustfs-ci
hostname: rustfs1
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs1-data:/data
rustfs2:
image: rustfs-ci
hostname: rustfs2
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs2-data:/data
rustfs3:
image: rustfs-ci
hostname: rustfs3
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs3-data:/data
rustfs4:
image: rustfs-ci
hostname: rustfs4
networks: [rustfs-net]
environment:
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
- RUSTFS_SECRET_KEY=${SECRET_KEY}
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
volumes:
- rustfs4-data:/data
lb:
image: haproxy:2.9
hostname: lb
networks: [rustfs-net]
ports:
- "9000:9000"
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
networks:
rustfs-net:
name: rustfs-net
volumes:
rustfs1-data:
rustfs2-data:
rustfs3-data:
rustfs4-data:
EOF
cat > haproxy.cfg <<'EOF'
defaults
mode http
timeout connect 5s
timeout client 30s
timeout server 30s
frontend fe_s3
bind *:9000
default_backend be_s3
backend be_s3
balance roundrobin
server s1 rustfs1:9000 check
server s2 rustfs2:9000 check
server s3 rustfs3:9000 check
server s4 rustfs4:9000 check
EOF
- name: Launch cluster
run: docker compose -f compose.yml up -d
- name: Wait for LB ready
run: |
for i in {1..60}; do
if docker run --rm --network rustfs-net curlimages/curl -sf http://lb:9000/health >/dev/null; then
exit 0
fi
sleep 2
done
echo "LB or backend not ready" >&2
docker compose -f compose.yml logs --tail=200 || true
exit 1
- name: Run Mint (multi, S3-only)
run: |
mkdir -p artifacts/mint-multi
docker run --rm --network rustfs-net \
--platform ${PLATFORM} \
-e SERVER_ENDPOINT=lb:9000 \
-e ACCESS_KEY=$ACCESS_KEY \
-e SECRET_KEY=$SECRET_KEY \
-e ENABLE_HTTPS=0 \
-e SERVER_REGION=us-east-1 \
-e RUN_ON_FAIL=1 \
-e MINT_MODE=core \
-v ${GITHUB_WORKSPACE}/artifacts/mint-multi:/mint/log \
--entrypoint /mint/mint.sh \
minio/mint:edge \
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
- name: Collect logs
run: |
mkdir -p artifacts/cluster
docker compose -f compose.yml logs --no-color > artifacts/cluster/cluster.log || true
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: mint-multi
path: artifacts/**

Cargo.lock generated
View File

@@ -7658,7 +7658,6 @@ dependencies = [
"hmac 0.13.0-rc.3",
"http 1.4.0",
"hyper 1.8.1",
"ipnet",
"libc",
"local-ip-address",
"lz4",

View File

@@ -122,7 +122,6 @@ tonic-prost = { version = "0.14.2" }
tonic-prost-build = { version = "0.14.2" }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.8", features = ["cors"] }
-ipnet = "2.11.0"
# Serialization and Data Formats
bytes = { version = "1.11.0", features = ["serde"] }

View File

@@ -148,8 +148,8 @@ ENV RUSTFS_ADDRESS=":9000" \
RUSTFS_OBS_LOG_DIRECTORY="/logs" \
RUSTFS_USERNAME="rustfs" \
RUSTFS_GROUPNAME="rustfs" \
RUSTFS_UID="1000" \
RUSTFS_GID="1000"
RUSTFS_UID="10001" \
RUSTFS_GID="10001"
# Note: We don't COPY source here because we expect it to be mounted at /app
# We rely on cargo run to build and run
@@ -187,8 +187,8 @@ RUN set -eux; \
# Create a conventional runtime user/group (final switch happens in entrypoint via chroot --userspec)
RUN set -eux; \
-groupadd -g 1000 rustfs; \
-useradd -u 1000 -g rustfs -M -s /usr/sbin/nologin rustfs
+groupadd -g 10001 rustfs; \
+useradd -u 10001 -g rustfs -M -s /usr/sbin/nologin rustfs
WORKDIR /app
@@ -212,8 +212,8 @@ ENV RUSTFS_ADDRESS=":9000" \
RUST_LOG="warn" \
RUSTFS_USERNAME="rustfs" \
RUSTFS_GROUPNAME="rustfs" \
RUSTFS_UID="1000" \
RUSTFS_GID="1000"
RUSTFS_UID="10001" \
RUSTFS_GID="10001"
EXPOSE 9000
VOLUME ["/data"]

View File

@@ -461,3 +461,129 @@ async fn test_vault_kms_key_crud(
info!("Vault KMS key CRUD operations completed successfully");
Ok(())
}
/// Test uploading a large file (triggering multipart) with checksums using Vault KMS.
/// This reproduces issue #1233 where decrypt was not implemented.
#[tokio::test]
#[serial]
async fn test_vault_large_file_upload_with_checksum() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("Starting Vault KMS Large File Upload Test (Issue #1233)");
let context = VaultKmsTestContext::new().await?;
let s3_client = context.s3_client();
context
.base_env()
.create_test_bucket(TEST_BUCKET)
.await
.expect("Failed to create test bucket");
// Enable default encryption on the bucket to ensure KMS is used
let _ = s3_client
.put_bucket_encryption()
.bucket(TEST_BUCKET)
.server_side_encryption_configuration(
aws_sdk_s3::types::ServerSideEncryptionConfiguration::builder()
.rules(
aws_sdk_s3::types::ServerSideEncryptionRule::builder()
.apply_server_side_encryption_by_default(
aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
.sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::Aes256)
.build()
.unwrap(),
)
.build(),
)
.build(),
)
.send()
.await?;
// Create a 17MB file. The issue report says a 17MB object uploaded via
// multipart triggers the bug, so we drive the multipart API explicitly below.
let size = 17 * 1024 * 1024;
let data = vec![0u8; size];
let key = "large-file-17mb";
info!("Uploading 17MB file with checksum...");
// aws-sdk-s3 does not automatically split put_object into a multipart
// upload (unlike `mc cp`, which the issue report used), so we perform an
// explicit multipart upload to make sure we hit the decrypt code path.
let create_multipart = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
.key(key)
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
.send()
.await?;
let upload_id = create_multipart.upload_id().unwrap();
// Upload part 1 (10MB)
let part1_data = &data[0..10 * 1024 * 1024];
let part1 = s3_client
.upload_part()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.part_number(1)
.body(aws_sdk_s3::primitives::ByteStream::from(part1_data.to_vec()))
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
.send()
.await?;
// Upload part 2 (7MB)
let part2_data = &data[10 * 1024 * 1024..];
let part2 = s3_client
.upload_part()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.part_number(2)
.body(aws_sdk_s3::primitives::ByteStream::from(part2_data.to_vec()))
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
.send()
.await?;
// Complete multipart
s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
.key(key)
.upload_id(upload_id)
.multipart_upload(
aws_sdk_s3::types::CompletedMultipartUpload::builder()
.parts(
aws_sdk_s3::types::CompletedPart::builder()
.part_number(1)
.e_tag(part1.e_tag().unwrap())
.checksum_sha256(part1.checksum_sha256().unwrap())
.build(),
)
.parts(
aws_sdk_s3::types::CompletedPart::builder()
.part_number(2)
.e_tag(part2.e_tag().unwrap())
.checksum_sha256(part2.checksum_sha256().unwrap())
.build(),
)
.build(),
)
.send()
.await?;
info!("✅ Successfully uploaded 17MB file with checksums using Vault KMS");
// Verify download
let get = s3_client.get_object().bucket(TEST_BUCKET).key(key).send().await?;
let downloaded_data = get.body.collect().await?.into_bytes();
assert_eq!(downloaded_data.len(), size);
Ok(())
}
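For parts uploaded with `ChecksumAlgorithm::Sha256`, the `checksum_sha256` value returned per part is the base64-encoded SHA-256 digest of that part's raw bytes. A minimal sketch of computing the expected value client-side, assuming the `sha2` and `base64` crates, which could be used to tighten the assertions above:

```rust
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use sha2::{Digest, Sha256};

/// Base64(SHA-256(part bytes)): the value reported as `checksum_sha256`
/// for a part uploaded with ChecksumAlgorithm::Sha256.
fn expected_part_checksum(part: &[u8]) -> String {
    STANDARD.encode(Sha256::digest(part))
}

// Usage against the parts above, e.g.:
// assert_eq!(part1.checksum_sha256().unwrap(), expected_part_checksum(part1_data));
```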

View File

@@ -129,15 +129,6 @@ impl VaultKmsClient {
Ok(general_purpose::STANDARD.encode(key_material))
}
-/// Decrypt key material
-async fn decrypt_key_material(&self, encrypted_material: &str) -> Result<Vec<u8>> {
-// For simplicity, we'll base64 decode the key material
-// In a production setup, you would use Vault's transit engine for decryption
-general_purpose::STANDARD
-.decode(encrypted_material)
-.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))
-}
/// Store key data in Vault
async fn store_key_data(&self, key_id: &str, key_data: &VaultKeyData) -> Result<()> {
let path = self.key_path(key_id);
@@ -261,14 +252,11 @@ impl KmsClient for VaultKmsClient {
// Get the master key
let key_data = self.get_key_data(&request.key_id).await?;
let key_material = self.decrypt_key_material(&key_data.encrypted_key_material).await?;
-// For simplicity, we'll use a basic encryption approach
-// In practice, you'd use proper AEAD encryption
-let mut ciphertext = request.plaintext.clone();
-for (i, byte) in ciphertext.iter_mut().enumerate() {
-*byte ^= key_material[i % key_material.len()];
-}
+// For consistency with generate_data_key and decrypt in this simple backend,
+// we return the plaintext as ciphertext.
+// This is a non-secure implementation as noted in other methods.
+let ciphertext = request.plaintext.clone();
Ok(EncryptResponse {
ciphertext,
@@ -278,12 +266,12 @@ impl KmsClient for VaultKmsClient {
})
}
-async fn decrypt(&self, _request: &DecryptRequest, _context: Option<&OperationContext>) -> Result<Vec<u8>> {
+async fn decrypt(&self, request: &DecryptRequest, _context: Option<&OperationContext>) -> Result<Vec<u8>> {
debug!("Decrypting data");
-// For this simple implementation, we assume the key ID is embedded in the ciphertext metadata
-// In practice, you'd extract this from the ciphertext envelope
-Err(KmsError::invalid_operation("Decrypt not fully implemented for Vault backend"))
+// Since generate_data_key and encrypt return plaintext as ciphertext,
+// we just return the ciphertext as is.
+Ok(request.ciphertext.clone())
}
async fn create_key(&self, key_id: &str, algorithm: &str, _context: Option<&OperationContext>) -> Result<MasterKey> {
@@ -782,4 +770,35 @@ mod tests {
// Test health check
client.health_check().await.expect("Health check failed");
}
#[tokio::test]
async fn test_vault_decrypt_offline() {
let config = VaultConfig {
address: "http://127.0.0.1:8200".to_string(),
auth_method: VaultAuthMethod::Token {
token: "dev-only-token".to_string(),
},
kv_mount: "secret".to_string(),
key_path_prefix: "rustfs/kms/keys".to_string(),
mount_path: "transit".to_string(),
namespace: None,
tls: None,
};
// This should succeed even without a running Vault server
// as it only builds the client struct
let client = VaultKmsClient::new(config).await.expect("Failed to create Vault client");
let plaintext = b"test-data-for-decrypt";
let request = DecryptRequest {
ciphertext: plaintext.to_vec(),
encryption_context: Default::default(),
grant_tokens: Vec::new(),
};
// Decrypt should just return the ciphertext as plaintext (identity operation)
// and should NOT make any network calls
let result = client.decrypt(&request, None).await.expect("Decrypt failed");
assert_eq!(result, plaintext);
}
}
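The removed comment above points at Vault's transit engine as the production-grade path. A hypothetical sketch of what a transit-based decrypt could look like, assuming the `reqwest` and `serde_json` crates; `transit_decrypt` is not part of this diff:

```rust
// Hypothetical sketch only: decrypting via Vault's transit engine.
// The endpoint shape follows Vault's documented transit API.
async fn transit_decrypt(
    vault_addr: &str,
    token: &str,
    mount: &str,      // e.g. "transit"
    key_name: &str,
    ciphertext: &str, // Vault-formatted, e.g. "vault:v1:..."
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
    use base64::{Engine as _, engine::general_purpose::STANDARD};

    let url = format!("{vault_addr}/v1/{mount}/decrypt/{key_name}");
    let resp: serde_json::Value = reqwest::Client::new()
        .post(&url)
        .header("X-Vault-Token", token)
        .json(&serde_json::json!({ "ciphertext": ciphertext }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    // Vault returns the recovered plaintext base64-encoded under data.plaintext.
    let b64 = resp["data"]["plaintext"]
        .as_str()
        .ok_or("missing data.plaintext in Vault response")?;
    Ok(STANDARD.decode(b64)?)
}
```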

View File

@@ -65,7 +65,6 @@ tracing = { workspace = true }
transform-stream = { workspace = true, optional = true }
url = { workspace = true, optional = true }
zstd = { workspace = true, optional = true }
-ipnet = { workspace = true, optional = true }
[dev-dependencies]
tempfile = { workspace = true }
@@ -93,5 +92,5 @@ hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:s
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
http = ["dep:convert_case", "dep:http", "dep:regex", "dep:ipnet"] # http utilities
http = ["dep:convert_case", "dep:http", "dep:regex"]
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use http::HeaderMap;
-use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use regex::Regex;
use std::env;
-use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::LazyLock;
@@ -46,100 +45,6 @@ fn is_xff_header_enabled() -> bool {
== "on"
}
/// TrustedProxies holds configuration for validating proxy sources
#[derive(Debug, Clone)]
pub struct TrustedProxies {
/// List of trusted proxy IP networks (CIDR format)
pub cidrs: Vec<IpNet>,
/// Whether to enable proxy validation
pub enable_validation: bool,
/// Maximum allowed proxy chain length
pub max_chain_length: usize,
}
impl TrustedProxies {
/// Create a new TrustedProxies configuration
pub fn new(cidrs: Vec<String>, enable_validation: bool, max_chain_length: usize) -> Self {
let cidrs = cidrs.into_iter().filter_map(|s| s.parse::<IpNet>().ok()).collect();
Self {
cidrs,
enable_validation,
max_chain_length,
}
}
/// Check if an IP address is within the trusted proxy ranges
pub fn is_trusted_proxy(&self, ip: IpAddr) -> bool {
if !self.enable_validation {
return true; // Backward compatibility: trust all when disabled
}
self.cidrs.iter().any(|net| net.contains(&ip))
}
}
impl Default for TrustedProxies {
fn default() -> Self {
Self {
cidrs: vec![],
enable_validation: true,
max_chain_length: 10,
}
}
}
/// Validate if an IP string represents a valid client IP
/// Returns false for private/loopback addresses and invalid formats
fn is_valid_client_ip(ip_str: &str, max_chain_length: usize) -> bool {
// Handle X-Forwarded-For chains
if ip_str.contains(',') {
let parts: Vec<&str> = ip_str.split(',').map(|s| s.trim()).collect();
if parts.len() > max_chain_length {
return false;
}
// Validate each IP in the chain
for part in parts {
if !is_valid_single_ip(part) {
return false;
}
}
return true;
}
is_valid_single_ip(ip_str)
}
/// Validate a single IP address string
fn is_valid_single_ip(ip_str: &str) -> bool {
match ip_str.parse::<IpAddr>() {
Ok(ip) => {
// Reject private and loopback addresses as client IPs
// (they should come from trusted proxies only)
!is_private(ip) && !ip.is_loopback()
}
Err(_) => false,
}
}
/// Check if an IP address is private
///
/// # Arguments
/// * `ip` - The IP address to check
///
/// # Returns
/// A `bool` indicating whether the IP is private
///
fn is_private(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(ipv4) => ipv4.is_private(),
IpAddr::V6(ipv6) => {
// Check if it's in fc00::/7 (Unique Local Address)
let octets = ipv6.octets();
(octets[0] & 0xfe) == 0xfc
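// Masking with 0xfe clears the low bit, so first bytes 0xfc and 0xfd both match fc00::/7.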
}
}
}
/// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239
/// Forwarded headers (in that order).
///
@@ -242,43 +147,18 @@ pub fn get_source_ip_from_headers(headers: &HeaderMap) -> Option<String> {
addr
}
-/// GetSourceIPRaw retrieves the IP from the request headers with trusted proxy validation
-/// and falls back to peer_addr when necessary.
+/// GetSourceIPRaw retrieves the IP from the request headers
+/// and falls back to remote_addr when necessary.
/// however returns without bracketing.
///
/// # Arguments
/// * `headers` - HTTP headers from the request
-/// * `peer_addr` - Peer IP address from the connection
-/// * `trusted_proxies` - Trusted proxy configuration
+/// * `remote_addr` - Remote address as a string
///
/// # Returns
-/// A `String` containing the validated source IP address
+/// A `String` containing the source IP address
///
-pub fn get_source_ip_raw(headers: &HeaderMap, peer_addr: IpAddr, trusted_proxies: &TrustedProxies) -> String {
-// If validation is disabled, use legacy behavior for backward compatibility
-if !trusted_proxies.enable_validation {
-let remote_addr_str = peer_addr.to_string();
-return get_source_ip_raw_legacy(headers, &remote_addr_str);
-}
-// Check if the direct connection is from a trusted proxy
-if trusted_proxies.is_trusted_proxy(peer_addr) {
-// Trusted proxy: try to get real client IP from headers
-if let Some(header_ip) = get_source_ip_from_headers(headers) {
-// Validate the IP from headers
-if is_valid_client_ip(&header_ip, trusted_proxies.max_chain_length) {
-return header_ip;
-}
-// If header IP is invalid, log warning and fall back to peer
-tracing::warn!("Invalid client IP in headers from trusted proxy {}: {}", peer_addr, header_ip);
-}
-}
-// Untrusted source or no valid header IP: use connection peer address
-peer_addr.to_string()
-}
-/// Legacy GetSourceIPRaw for backward compatibility when validation is disabled
-fn get_source_ip_raw_legacy(headers: &HeaderMap, remote_addr: &str) -> String {
+pub fn get_source_ip_raw(headers: &HeaderMap, remote_addr: &str) -> String {
let addr = get_source_ip_from_headers(headers).unwrap_or_else(|| remote_addr.to_string());
// Default to remote address if headers not set.
@@ -289,20 +169,19 @@ fn get_source_ip_raw_legacy(headers: &HeaderMap, remote_addr: &str) -> String {
}
}
-/// GetSourceIP retrieves the IP from the request headers with trusted proxy validation
-/// and falls back to peer_addr when necessary.
+/// GetSourceIP retrieves the IP from the request headers
+/// and falls back to remote_addr when necessary.
/// It brackets IPv6 addresses.
///
/// # Arguments
/// * `headers` - HTTP headers from the request
-/// * `peer_addr` - Peer IP address from the connection
-/// * `trusted_proxies` - Trusted proxy configuration
+/// * `remote_addr` - Remote address as a string
///
/// # Returns
/// A `String` containing the source IP address, with IPv6 addresses bracketed
///
-pub fn get_source_ip(headers: &HeaderMap, peer_addr: IpAddr, trusted_proxies: &TrustedProxies) -> String {
-let addr = get_source_ip_raw(headers, peer_addr, trusted_proxies);
+pub fn get_source_ip(headers: &HeaderMap, remote_addr: &str) -> String {
+let addr = get_source_ip_raw(headers, remote_addr);
if addr.contains(':') { format!("[{addr}]") } else { addr }
}
@@ -331,58 +210,18 @@ mod tests {
}
#[test]
fn test_trusted_proxies_validation() {
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.0/24".to_string(), "10.0.0.0/8".to_string()], true, 5);
// Trusted IPs
assert!(trusted_proxies.is_trusted_proxy("192.168.1.1".parse().unwrap()));
assert!(trusted_proxies.is_trusted_proxy("10.1.1.1".parse().unwrap()));
// Untrusted IPs
assert!(!trusted_proxies.is_trusted_proxy("203.0.113.1".parse().unwrap()));
}
#[test]
fn test_get_source_ip_raw_with_trusted_proxy() {
let mut headers = HeaderMap::new();
headers.insert("x-forwarded-for", HeaderValue::from_static("203.0.113.1"));
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
assert_eq!(result, "203.0.113.1");
}
#[test]
fn test_get_source_ip_raw_with_untrusted_proxy() {
let mut headers = HeaderMap::new();
headers.insert("x-forwarded-for", HeaderValue::from_static("203.0.113.1"));
let trusted_proxies = TrustedProxies::new(vec![], true, 5);
let peer_addr: IpAddr = "203.0.113.2".parse().unwrap();
let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
assert_eq!(result, "203.0.113.2"); // Should use peer_addr
}
#[test]
-fn test_get_source_ip_raw_legacy_mode() {
+fn test_get_source_ip_raw() {
let headers = create_test_headers();
-let trusted_proxies = TrustedProxies::new(vec![], false, 5); // Disabled validation
-let peer_addr: IpAddr = "127.0.0.1".parse().unwrap();
-let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
-assert_eq!(result, "192.168.1.1"); // Should use header IP
+let remote_addr = "127.0.0.1:8080";
+let result = get_source_ip_raw(&headers, remote_addr);
+assert_eq!(result, "192.168.1.1");
}
#[test]
fn test_get_source_ip() {
let headers = create_test_headers();
-let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
-let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
-let result = get_source_ip(&headers, peer_addr, &trusted_proxies);
+let remote_addr = "127.0.0.1:8080";
+let result = get_source_ip(&headers, remote_addr);
assert_eq!(result, "192.168.1.1");
}
@@ -390,32 +229,8 @@ mod tests {
fn test_get_source_ip_ipv6() {
let mut headers = HeaderMap::new();
headers.insert("x-forwarded-for", HeaderValue::from_static("2001:db8::1"));
-let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
-let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
-let result = get_source_ip(&headers, peer_addr, &trusted_proxies);
+let remote_addr = "127.0.0.1:8080";
+let result = get_source_ip(&headers, remote_addr);
assert_eq!(result, "[2001:db8::1]");
}
#[test]
fn test_is_valid_client_ip() {
// Valid public IPs
assert!(is_valid_client_ip("203.0.113.1", 5));
assert!(is_valid_client_ip("2001:db8::1", 5));
// Invalid private IPs
assert!(!is_valid_client_ip("192.168.1.1", 5));
assert!(!is_valid_client_ip("10.0.0.1", 5));
assert!(!is_valid_client_ip("127.0.0.1", 5));
// Valid chain
assert!(is_valid_client_ip("203.0.113.1, 198.51.100.1", 5));
// Invalid chain (too long)
assert!(!is_valid_client_ip(
"203.0.113.1, 198.51.100.1, 192.0.2.1, 192.0.2.2, 192.0.2.3, 192.0.2.4",
5
));
}
}
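With `TrustedProxies` gone, callers drop the proxy configuration and pass the connection's remote address as a string again. A minimal before/after sketch of an affected call site; the handler and its locals are illustrative, not from this diff:

```rust
use http::HeaderMap;
use std::net::SocketAddr;

// Illustrative call site; get_source_ip is the simplified function above.
fn log_client(headers: &HeaderMap, peer: SocketAddr) {
    // Before: get_source_ip(headers, peer.ip(), &trusted_proxies)
    // After: the remote address travels as a plain string again.
    let client_ip = get_source_ip(headers, &peer.to_string());
    tracing::info!("request from {client_ip}");
}
```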

View File

@@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.0.76
+version: 0.0.77
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0-alpha.73"
appVersion: "1.0.0-alpha.77"

View File

@@ -66,7 +66,7 @@ spec:
- -c
- |
mkdir -p /data /logs
-chown -R 10001:10001 /data /logs
+chown 10001:10001 /data /logs
volumeMounts:
- name: data
mountPath: /data

View File

@@ -87,7 +87,7 @@ spec:
mkdir -p /data
fi
mkdir -p {{ $logDir }}
-chown -R 10001:10001 /data {{ $logDir }}
+chown 10001:10001 /data {{ $logDir }}
volumeMounts:
{{- if eq (int .Values.replicaCount) 4 }}
{{- range $i := until (int .Values.replicaCount) }}

View File

@@ -329,14 +329,14 @@ async fn run(opt: config::Opt) -> Result<()> {
init_update_check();
-info!(target: "rustfs::main::run","server started successfully at {}", &server_address);
-// 4. Mark as Full Ready now that critical components are warm
-readiness.mark_stage(SystemStage::FullReady);
println!(
"RustFS server started successfully at {}, current time: {}",
&server_address,
chrono::offset::Utc::now().to_string()
);
+info!(target: "rustfs::main::run","server started successfully at {}", &server_address);
+// 4. Mark as Full Ready now that critical components are warm
+readiness.mark_stage(SystemStage::FullReady);
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;

View File

@@ -45,6 +45,11 @@ pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console";
/// This prefix is used for endpoints that handle remote procedure calls (RPC).
pub(crate) const RPC_PREFIX: &str = "/rustfs/rpc";
/// Predefined gRPC service prefix for RustFS server.
/// This prefix is used for gRPC service endpoints.
/// For example, the full gRPC method path would be "/node_service.NodeService/MethodName".
pub(crate) const TONIC_PREFIX: &str = "/node_service.NodeService";
/// LOGO art for RustFS server.
pub(crate) const LOGO: &str = r#"

View File

@@ -23,6 +23,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use tracing::debug;
/// ReadinessGateLayer ensures that the system components (IAM, Storage)
/// are fully initialized before allowing any request to proceed.
@@ -88,6 +89,7 @@ where
let readiness = self.readiness.clone();
Box::pin(async move {
let path = req.uri().path();
debug!("ReadinessGateService: Received request for path: {}", path);
// 1) Exact match: fixed probe/resource path
let is_exact_probe = matches!(
path,
@@ -101,7 +103,8 @@ where
let is_prefix_probe = path.starts_with(crate::server::RUSTFS_ADMIN_PREFIX)
|| path.starts_with(crate::server::CONSOLE_PREFIX)
|| path.starts_with(crate::server::RPC_PREFIX)
-|| path.starts_with(crate::server::ADMIN_PREFIX);
+|| path.starts_with(crate::server::ADMIN_PREFIX)
+|| path.starts_with(crate::server::TONIC_PREFIX);
let is_probe = is_exact_probe || is_prefix_probe;
if !is_probe && !readiness.is_ready() {
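Tonic routes every RPC at a path of the form `/<package>.<Service>/<Method>`, which is why the constant's doc gives "/node_service.NodeService/MethodName" as an example, so a plain `starts_with` is enough to exempt gRPC traffic from the gate. An illustrative check; the method and object names here are hypothetical:

```rust
const TONIC_PREFIX: &str = "/node_service.NodeService";

fn main() {
    // gRPC requests pass the readiness gate...
    assert!("/node_service.NodeService/Heartbeat".starts_with(TONIC_PREFIX));
    // ...while ordinary S3 paths still wait for FullReady.
    assert!(!"/my-bucket/my-object".starts_with(TONIC_PREFIX));
}
```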

View File

@@ -251,6 +251,8 @@ struct IoLoadMetrics {
recent_waits: Vec<Duration>,
/// Maximum samples to keep in the window
max_samples: usize,
/// Index of the oldest entry in recent_waits, i.e. the next slot to overwrite once the window is full
earliest_index: usize,
/// Total wait time observed (for averaging)
total_wait_ns: AtomicU64,
/// Total number of observations
@@ -263,6 +265,7 @@ impl IoLoadMetrics {
Self {
recent_waits: Vec::with_capacity(max_samples),
max_samples,
earliest_index: 0,
total_wait_ns: AtomicU64::new(0),
observation_count: AtomicU64::new(0),
}
@@ -271,10 +274,12 @@ impl IoLoadMetrics {
/// Record a new permit wait observation
fn record(&mut self, wait: Duration) {
// Add to recent waits (with eviction if full)
-if self.recent_waits.len() >= self.max_samples {
-self.recent_waits.remove(0);
-self.recent_waits.push(wait);
+if self.recent_waits.len() < self.max_samples {
+self.recent_waits.push(wait);
+} else {
+self.recent_waits[self.earliest_index] = wait;
+self.earliest_index = (self.earliest_index + 1) % self.max_samples;
+}
// Update totals for overall statistics
self.total_wait_ns.fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
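The change above replaces `Vec::remove(0)`, which shifts every remaining element and costs O(n) per sample once the window is full, with an O(1) overwrite at a rotating index. The same pattern as a standalone sketch; the names are illustrative:

```rust
/// Fixed-capacity sample window with O(1) eviction, mirroring the
/// recent_waits + earliest_index scheme above.
struct Window {
    samples: Vec<u64>,
    cap: usize,
    oldest: usize, // slot holding the oldest sample once full
}

impl Window {
    fn new(cap: usize) -> Self {
        Self { samples: Vec::with_capacity(cap), cap, oldest: 0 }
    }

    fn push(&mut self, v: u64) {
        if self.samples.len() < self.cap {
            self.samples.push(v); // still filling: just append
        } else {
            self.samples[self.oldest] = v; // overwrite oldest in place
            self.oldest = (self.oldest + 1) % self.cap;
        }
    }
}
```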
@@ -1867,4 +1872,154 @@ mod tests {
assert!(manager.is_cached("warm2").await);
assert!(manager.is_cached("warm3").await);
}
#[test]
fn test_io_load_metrics_record_not_full() {
let mut metrics = IoLoadMetrics::new(5);
metrics.record(Duration::from_millis(10));
metrics.record(Duration::from_millis(20));
assert_eq!(metrics.recent_waits.len(), 2);
assert_eq!(metrics.recent_waits[0], Duration::from_millis(10));
assert_eq!(metrics.recent_waits[1], Duration::from_millis(20));
assert_eq!(metrics.observation_count(), 2);
}
#[test]
fn test_io_load_metrics_record_full_and_circular() {
let mut metrics = IoLoadMetrics::new(3);
metrics.record(Duration::from_millis(10));
metrics.record(Duration::from_millis(20));
metrics.record(Duration::from_millis(30));
assert_eq!(metrics.recent_waits.len(), 3);
assert_eq!(metrics.earliest_index, 0);
// This should overwrite the first element (10ms)
metrics.record(Duration::from_millis(40));
assert_eq!(metrics.recent_waits.len(), 3);
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
assert_eq!(metrics.recent_waits[1], Duration::from_millis(20));
assert_eq!(metrics.recent_waits[2], Duration::from_millis(30));
assert_eq!(metrics.earliest_index, 1);
assert_eq!(metrics.observation_count(), 4);
// This should overwrite the second element (20ms)
metrics.record(Duration::from_millis(50));
assert_eq!(metrics.recent_waits.len(), 3);
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
assert_eq!(metrics.recent_waits[1], Duration::from_millis(50));
assert_eq!(metrics.recent_waits[2], Duration::from_millis(30));
assert_eq!(metrics.earliest_index, 2);
assert_eq!(metrics.observation_count(), 5);
// This should overwrite the third element (30ms)
metrics.record(Duration::from_millis(60));
assert_eq!(metrics.recent_waits.len(), 3);
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
assert_eq!(metrics.recent_waits[1], Duration::from_millis(50));
assert_eq!(metrics.recent_waits[2], Duration::from_millis(60));
assert_eq!(metrics.earliest_index, 0);
assert_eq!(metrics.observation_count(), 6);
}
#[test]
fn test_io_load_metrics_average_wait() {
let mut metrics = IoLoadMetrics::new(3);
metrics.record(Duration::from_millis(10));
metrics.record(Duration::from_millis(20));
metrics.record(Duration::from_millis(30));
assert_eq!(metrics.average_wait(), Duration::from_millis(20));
// Overwrite 10ms with 40ms, new avg = (20+30+40)/3 = 30
metrics.record(Duration::from_millis(40));
assert_eq!(metrics.average_wait(), Duration::from_millis(30));
}
#[test]
fn test_io_load_metrics_max_wait() {
let mut metrics = IoLoadMetrics::new(3);
assert_eq!(metrics.max_wait(), Duration::ZERO);
metrics.record(Duration::from_millis(40));
metrics.record(Duration::from_millis(30));
metrics.record(Duration::from_millis(20));
assert_eq!(metrics.max_wait(), Duration::from_millis(40));
// Overwrite 40ms with 5ms; the max drops to 30 (the 40ms sample was evicted)
metrics.record(Duration::from_millis(5));
assert_eq!(metrics.max_wait(), Duration::from_millis(30));
// Overwrite 30ms with 10ms
metrics.record(Duration::from_millis(10));
assert_eq!(metrics.max_wait(), Duration::from_millis(20));
}
#[test]
fn test_io_load_metrics_p95_wait() {
let mut metrics = IoLoadMetrics::new(20);
for i in 1..=20 {
metrics.record(Duration::from_millis(i * 5)); // 5, 10, ..., 100
}
assert_eq!(metrics.p95_wait(), Duration::from_millis(100));
// Test with different values
let mut metrics = IoLoadMetrics::new(10);
metrics.record(Duration::from_millis(10));
metrics.record(Duration::from_millis(20));
metrics.record(Duration::from_millis(30));
metrics.record(Duration::from_millis(40));
metrics.record(Duration::from_millis(50));
metrics.record(Duration::from_millis(60));
metrics.record(Duration::from_millis(70));
metrics.record(Duration::from_millis(80));
metrics.record(Duration::from_millis(90));
metrics.record(Duration::from_millis(1000)); // outlier
assert_eq!(metrics.p95_wait(), Duration::from_millis(1000));
}
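Both expectations are consistent with a truncating index into the sorted window, sketched below; this is only what the assertions imply, and the actual `p95_wait` may compute it differently:

```rust
// Index the assertions above imply (window sorted ascending).
fn p95_index(n: usize) -> usize {
    ((n as f64) * 0.95) as usize // 20 samples -> 19, 10 samples -> 9
}
```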
#[test]
fn test_io_load_metrics_smoothed_load_level() {
let mut metrics = IoLoadMetrics::new(3);
// Average is low
metrics.record(Duration::from_millis(5));
metrics.record(Duration::from_millis(8));
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Low);
// Average is medium
metrics.record(Duration::from_millis(40)); // avg = (5+8+40)/3 = 17.6 -> Medium
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Medium);
// Push the average toward High
metrics.record(Duration::from_millis(100)); // avg = (8+40+100)/3 = 49.3 -> Medium
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Medium);
metrics.record(Duration::from_millis(100)); // avg = (40+100+100)/3 = 80 -> High
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::High);
// Push the average toward Critical
metrics.record(Duration::from_millis(300)); // avg = (100+100+300)/3 = 166.6 -> High
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::High);
metrics.record(Duration::from_millis(300)); // avg = (100+300+300)/3 = 233.3 -> Critical
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Critical);
}
#[test]
fn test_io_load_metrics_lifetime_average() {
let mut metrics = IoLoadMetrics::new(2);
metrics.record(Duration::from_millis(10));
metrics.record(Duration::from_millis(20));
// total = 30, count = 2, avg = 15
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(15));
metrics.record(Duration::from_millis(30)); // recent=(20, 30), but lifetime avg is over all records
// total = 10+20+30=60, count = 3, avg = 20
let total_ns = metrics.total_wait_ns.load(Ordering::Relaxed);
let count = metrics.observation_count.load(Ordering::Relaxed);
assert_eq!(total_ns, 60_000_000);
assert_eq!(count, 3);
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(20));
metrics.record(Duration::from_millis(40));
// total = 60+40=100, count = 4, avg = 25
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(25));
}
}

View File

@@ -66,8 +66,8 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
-export RUSTFS_OBS_ENVIRONMENT=production # Environment name
-export RUSTFS_OBS_LOGGER_LEVEL=warn # Log level, supports trace, debug, info, warn, error
+export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
+export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOG_STDOUT_ENABLED=false # Whether to enable local stdout logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"