mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
Compare commits
4 Commits
fix/issue-
...
sourceip-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ac9143165 | ||
|
|
a5695f35d2 | ||
|
|
cd9b5ad3f3 | ||
|
|
04833f35cf |
260
.github/workflows/e2e-mint.yml
vendored
Normal file
260
.github/workflows/e2e-mint.yml
vendored
Normal file
@@ -0,0 +1,260 @@
|
||||
# Copyright 2024 RustFS Team
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name: e2e-mint
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
paths:
|
||||
- ".github/workflows/e2e-mint.yml"
|
||||
- "Dockerfile.source"
|
||||
- "rustfs/**"
|
||||
- "crates/**"
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
run-multi:
|
||||
description: "Run multi-node Mint as well"
|
||||
required: false
|
||||
default: "false"
|
||||
|
||||
env:
|
||||
ACCESS_KEY: rustfsadmin
|
||||
SECRET_KEY: rustfsadmin
|
||||
RUST_LOG: info
|
||||
PLATFORM: linux/amd64
|
||||
|
||||
jobs:
|
||||
mint-single:
|
||||
runs-on: ubicloud-standard-2
|
||||
timeout-minutes: 40
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Enable buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Build RustFS image (source)
|
||||
run: |
|
||||
DOCKER_BUILDKIT=1 docker buildx build --load \
|
||||
--platform ${PLATFORM} \
|
||||
-t rustfs-ci \
|
||||
-f Dockerfile.source .
|
||||
|
||||
- name: Create network
|
||||
run: |
|
||||
docker network inspect rustfs-net >/dev/null 2>&1 || docker network create rustfs-net
|
||||
|
||||
- name: Remove existing rustfs-single (if any)
|
||||
run: docker rm -f rustfs-single >/dev/null 2>&1 || true
|
||||
|
||||
- name: Start single RustFS
|
||||
run: |
|
||||
docker run -d --name rustfs-single \
|
||||
--network rustfs-net \
|
||||
-e RUSTFS_ADDRESS=0.0.0.0:9000 \
|
||||
-e RUSTFS_ACCESS_KEY=$ACCESS_KEY \
|
||||
-e RUSTFS_SECRET_KEY=$SECRET_KEY \
|
||||
-e RUSTFS_VOLUMES="/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3" \
|
||||
-v /tmp/rustfs-single:/data \
|
||||
rustfs-ci
|
||||
|
||||
- name: Wait for RustFS ready
|
||||
run: |
|
||||
for i in {1..30}; do
|
||||
if docker exec rustfs-single curl -sf http://localhost:9000/health >/dev/null; then
|
||||
exit 0
|
||||
fi
|
||||
sleep 2
|
||||
done
|
||||
echo "RustFS did not become ready" >&2
|
||||
docker logs rustfs-single || true
|
||||
exit 1
|
||||
|
||||
- name: Run Mint (single, S3-only)
|
||||
run: |
|
||||
mkdir -p artifacts/mint-single
|
||||
docker run --rm --network rustfs-net \
|
||||
--platform ${PLATFORM} \
|
||||
-e SERVER_ENDPOINT=rustfs-single:9000 \
|
||||
-e ACCESS_KEY=$ACCESS_KEY \
|
||||
-e SECRET_KEY=$SECRET_KEY \
|
||||
-e ENABLE_HTTPS=0 \
|
||||
-e SERVER_REGION=us-east-1 \
|
||||
-e RUN_ON_FAIL=1 \
|
||||
-e MINT_MODE=core \
|
||||
-v ${GITHUB_WORKSPACE}/artifacts/mint-single:/mint/log \
|
||||
--entrypoint /mint/mint.sh \
|
||||
minio/mint:edge \
|
||||
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
|
||||
|
||||
- name: Collect RustFS logs
|
||||
run: |
|
||||
mkdir -p artifacts/rustfs-single
|
||||
docker logs rustfs-single > artifacts/rustfs-single/rustfs.log || true
|
||||
|
||||
- name: Upload artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: mint-single
|
||||
path: artifacts/**
|
||||
|
||||
mint-multi:
|
||||
if: github.event_name == 'workflow_dispatch' && github.event.inputs.run-multi == 'true'
|
||||
needs: mint-single
|
||||
runs-on: ubicloud-standard-2
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Enable buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Build RustFS image (source)
|
||||
run: |
|
||||
DOCKER_BUILDKIT=1 docker buildx build --load \
|
||||
--platform ${PLATFORM} \
|
||||
-t rustfs-ci \
|
||||
-f Dockerfile.source .
|
||||
|
||||
- name: Prepare cluster compose
|
||||
run: |
|
||||
cat > compose.yml <<'EOF'
|
||||
version: '3.8'
|
||||
services:
|
||||
rustfs1:
|
||||
image: rustfs-ci
|
||||
hostname: rustfs1
|
||||
networks: [rustfs-net]
|
||||
environment:
|
||||
- RUSTFS_ADDRESS=0.0.0.0:9000
|
||||
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
|
||||
- RUSTFS_SECRET_KEY=${SECRET_KEY}
|
||||
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
|
||||
volumes:
|
||||
- rustfs1-data:/data
|
||||
rustfs2:
|
||||
image: rustfs-ci
|
||||
hostname: rustfs2
|
||||
networks: [rustfs-net]
|
||||
environment:
|
||||
- RUSTFS_ADDRESS=0.0.0.0:9000
|
||||
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
|
||||
- RUSTFS_SECRET_KEY=${SECRET_KEY}
|
||||
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
|
||||
volumes:
|
||||
- rustfs2-data:/data
|
||||
rustfs3:
|
||||
image: rustfs-ci
|
||||
hostname: rustfs3
|
||||
networks: [rustfs-net]
|
||||
environment:
|
||||
- RUSTFS_ADDRESS=0.0.0.0:9000
|
||||
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
|
||||
- RUSTFS_SECRET_KEY=${SECRET_KEY}
|
||||
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
|
||||
volumes:
|
||||
- rustfs3-data:/data
|
||||
rustfs4:
|
||||
image: rustfs-ci
|
||||
hostname: rustfs4
|
||||
networks: [rustfs-net]
|
||||
environment:
|
||||
- RUSTFS_ADDRESS=0.0.0.0:9000
|
||||
- RUSTFS_ACCESS_KEY=${ACCESS_KEY}
|
||||
- RUSTFS_SECRET_KEY=${SECRET_KEY}
|
||||
- RUSTFS_VOLUMES=/data/rustfs0 /data/rustfs1 /data/rustfs2 /data/rustfs3
|
||||
volumes:
|
||||
- rustfs4-data:/data
|
||||
lb:
|
||||
image: haproxy:2.9
|
||||
hostname: lb
|
||||
networks: [rustfs-net]
|
||||
ports:
|
||||
- "9000:9000"
|
||||
volumes:
|
||||
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
|
||||
networks:
|
||||
rustfs-net:
|
||||
name: rustfs-net
|
||||
volumes:
|
||||
rustfs1-data:
|
||||
rustfs2-data:
|
||||
rustfs3-data:
|
||||
rustfs4-data:
|
||||
EOF
|
||||
|
||||
cat > haproxy.cfg <<'EOF'
|
||||
defaults
|
||||
mode http
|
||||
timeout connect 5s
|
||||
timeout client 30s
|
||||
timeout server 30s
|
||||
|
||||
frontend fe_s3
|
||||
bind *:9000
|
||||
default_backend be_s3
|
||||
|
||||
backend be_s3
|
||||
balance roundrobin
|
||||
server s1 rustfs1:9000 check
|
||||
server s2 rustfs2:9000 check
|
||||
server s3 rustfs3:9000 check
|
||||
server s4 rustfs4:9000 check
|
||||
EOF
|
||||
|
||||
- name: Launch cluster
|
||||
run: docker compose -f compose.yml up -d
|
||||
|
||||
- name: Wait for LB ready
|
||||
run: |
|
||||
for i in {1..60}; do
|
||||
if docker run --rm --network rustfs-net curlimages/curl -sf http://lb:9000/health >/dev/null; then
|
||||
exit 0
|
||||
fi
|
||||
sleep 2
|
||||
done
|
||||
echo "LB or backend not ready" >&2
|
||||
docker compose -f compose.yml logs --tail=200 || true
|
||||
exit 1
|
||||
|
||||
- name: Run Mint (multi, S3-only)
|
||||
run: |
|
||||
mkdir -p artifacts/mint-multi
|
||||
docker run --rm --network rustfs-net \
|
||||
--platform ${PLATFORM} \
|
||||
-e SERVER_ENDPOINT=lb:9000 \
|
||||
-e ACCESS_KEY=$ACCESS_KEY \
|
||||
-e SECRET_KEY=$SECRET_KEY \
|
||||
-e ENABLE_HTTPS=0 \
|
||||
-e SERVER_REGION=us-east-1 \
|
||||
-e RUN_ON_FAIL=1 \
|
||||
-e MINT_MODE=core \
|
||||
-v ${GITHUB_WORKSPACE}/artifacts/mint-multi:/mint/log \
|
||||
--entrypoint /mint/mint.sh \
|
||||
minio/mint:edge \
|
||||
awscli aws-sdk-go aws-sdk-java-v2 aws-sdk-php aws-sdk-ruby s3cmd s3select
|
||||
|
||||
- name: Collect logs
|
||||
run: |
|
||||
mkdir -p artifacts/cluster
|
||||
docker compose -f compose.yml logs --no-color > artifacts/cluster/cluster.log || true
|
||||
|
||||
- name: Upload artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: mint-multi
|
||||
path: artifacts/**
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7658,6 +7658,7 @@ dependencies = [
|
||||
"hmac 0.13.0-rc.3",
|
||||
"http 1.4.0",
|
||||
"hyper 1.8.1",
|
||||
"ipnet",
|
||||
"libc",
|
||||
"local-ip-address",
|
||||
"lz4",
|
||||
|
||||
@@ -122,6 +122,7 @@ tonic-prost = { version = "0.14.2" }
|
||||
tonic-prost-build = { version = "0.14.2" }
|
||||
tower = { version = "0.5.2", features = ["timeout"] }
|
||||
tower-http = { version = "0.6.8", features = ["cors"] }
|
||||
ipnet = "2.11.0"
|
||||
|
||||
# Serialization and Data Formats
|
||||
bytes = { version = "1.11.0", features = ["serde"] }
|
||||
|
||||
@@ -148,8 +148,8 @@ ENV RUSTFS_ADDRESS=":9000" \
|
||||
RUSTFS_OBS_LOG_DIRECTORY="/logs" \
|
||||
RUSTFS_USERNAME="rustfs" \
|
||||
RUSTFS_GROUPNAME="rustfs" \
|
||||
RUSTFS_UID="10001" \
|
||||
RUSTFS_GID="10001"
|
||||
RUSTFS_UID="1000" \
|
||||
RUSTFS_GID="1000"
|
||||
|
||||
# Note: We don't COPY source here because we expect it to be mounted at /app
|
||||
# We rely on cargo run to build and run
|
||||
@@ -187,8 +187,8 @@ RUN set -eux; \
|
||||
|
||||
# Create a conventional runtime user/group (final switch happens in entrypoint via chroot --userspec)
|
||||
RUN set -eux; \
|
||||
groupadd -g 10001 rustfs; \
|
||||
useradd -u 10001 -g rustfs -M -s /usr/sbin/nologin rustfs
|
||||
groupadd -g 1000 rustfs; \
|
||||
useradd -u 1000 -g rustfs -M -s /usr/sbin/nologin rustfs
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
@@ -212,8 +212,8 @@ ENV RUSTFS_ADDRESS=":9000" \
|
||||
RUST_LOG="warn" \
|
||||
RUSTFS_USERNAME="rustfs" \
|
||||
RUSTFS_GROUPNAME="rustfs" \
|
||||
RUSTFS_UID="10001" \
|
||||
RUSTFS_GID="10001"
|
||||
RUSTFS_UID="1000" \
|
||||
RUSTFS_GID="1000"
|
||||
|
||||
EXPOSE 9000
|
||||
VOLUME ["/data"]
|
||||
|
||||
@@ -461,129 +461,3 @@ async fn test_vault_kms_key_crud(
|
||||
info!("Vault KMS key CRUD operations completed successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test uploading a large file (triggering multipart) with checksums using Vault KMS.
|
||||
/// This reproduces issue #1233 where decrypt was not implemented.
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_vault_large_file_upload_with_checksum() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
init_logging();
|
||||
info!("Starting Vault KMS Large File Upload Test (Issue #1233)");
|
||||
|
||||
let context = VaultKmsTestContext::new().await?;
|
||||
let s3_client = context.s3_client();
|
||||
|
||||
context
|
||||
.base_env()
|
||||
.create_test_bucket(TEST_BUCKET)
|
||||
.await
|
||||
.expect("Failed to create test bucket");
|
||||
|
||||
// Enable default encryption on the bucket to ensure KMS is used
|
||||
let _ = s3_client
|
||||
.put_bucket_encryption()
|
||||
.bucket(TEST_BUCKET)
|
||||
.server_side_encryption_configuration(
|
||||
aws_sdk_s3::types::ServerSideEncryptionConfiguration::builder()
|
||||
.rules(
|
||||
aws_sdk_s3::types::ServerSideEncryptionRule::builder()
|
||||
.apply_server_side_encryption_by_default(
|
||||
aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
|
||||
.sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::Aes256)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// Create a 17MB file (just over the default multipart threshold if it were lower,
|
||||
// but here we force multipart or just rely on size.
|
||||
// The issue report said 17MB triggers it.
|
||||
let size = 17 * 1024 * 1024;
|
||||
let data = vec![0u8; size];
|
||||
let key = "large-file-17mb";
|
||||
|
||||
info!("Uploading 17MB file with checksum...");
|
||||
|
||||
// We use high-level upload_part or just put_object if the client handles it.
|
||||
// However, to strictly reproduce "multipart upload", we should use multipart API explicitly
|
||||
// or rely on the client's auto-multipart. aws-sdk-s3 doesn't auto-multipart on put_object.
|
||||
// But the issue mentioned `mc cp` which does.
|
||||
// Here we will manually do a multipart upload to ensure we hit the code path.
|
||||
|
||||
let create_multipart = s3_client
|
||||
.create_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
.key(key)
|
||||
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let upload_id = create_multipart.upload_id().unwrap();
|
||||
|
||||
// Upload part 1 (10MB)
|
||||
let part1_data = &data[0..10 * 1024 * 1024];
|
||||
let part1 = s3_client
|
||||
.upload_part()
|
||||
.bucket(TEST_BUCKET)
|
||||
.key(key)
|
||||
.upload_id(upload_id)
|
||||
.part_number(1)
|
||||
.body(aws_sdk_s3::primitives::ByteStream::from(part1_data.to_vec()))
|
||||
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// Upload part 2 (7MB)
|
||||
let part2_data = &data[10 * 1024 * 1024..];
|
||||
let part2 = s3_client
|
||||
.upload_part()
|
||||
.bucket(TEST_BUCKET)
|
||||
.key(key)
|
||||
.upload_id(upload_id)
|
||||
.part_number(2)
|
||||
.body(aws_sdk_s3::primitives::ByteStream::from(part2_data.to_vec()))
|
||||
.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Sha256)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
// Complete multipart
|
||||
s3_client
|
||||
.complete_multipart_upload()
|
||||
.bucket(TEST_BUCKET)
|
||||
.key(key)
|
||||
.upload_id(upload_id)
|
||||
.multipart_upload(
|
||||
aws_sdk_s3::types::CompletedMultipartUpload::builder()
|
||||
.parts(
|
||||
aws_sdk_s3::types::CompletedPart::builder()
|
||||
.part_number(1)
|
||||
.e_tag(part1.e_tag().unwrap())
|
||||
.checksum_sha256(part1.checksum_sha256().unwrap())
|
||||
.build(),
|
||||
)
|
||||
.parts(
|
||||
aws_sdk_s3::types::CompletedPart::builder()
|
||||
.part_number(2)
|
||||
.e_tag(part2.e_tag().unwrap())
|
||||
.checksum_sha256(part2.checksum_sha256().unwrap())
|
||||
.build(),
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
info!("✅ Successfully uploaded 17MB file with checksums using Vault KMS");
|
||||
|
||||
// Verify download
|
||||
let get = s3_client.get_object().bucket(TEST_BUCKET).key(key).send().await?;
|
||||
let downloaded_data = get.body.collect().await?.into_bytes();
|
||||
assert_eq!(downloaded_data.len(), size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -129,6 +129,15 @@ impl VaultKmsClient {
|
||||
Ok(general_purpose::STANDARD.encode(key_material))
|
||||
}
|
||||
|
||||
/// Decrypt key material
|
||||
async fn decrypt_key_material(&self, encrypted_material: &str) -> Result<Vec<u8>> {
|
||||
// For simplicity, we'll base64 decode the key material
|
||||
// In a production setup, you would use Vault's transit engine for decryption
|
||||
general_purpose::STANDARD
|
||||
.decode(encrypted_material)
|
||||
.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))
|
||||
}
|
||||
|
||||
/// Store key data in Vault
|
||||
async fn store_key_data(&self, key_id: &str, key_data: &VaultKeyData) -> Result<()> {
|
||||
let path = self.key_path(key_id);
|
||||
@@ -252,11 +261,14 @@ impl KmsClient for VaultKmsClient {
|
||||
|
||||
// Get the master key
|
||||
let key_data = self.get_key_data(&request.key_id).await?;
|
||||
let key_material = self.decrypt_key_material(&key_data.encrypted_key_material).await?;
|
||||
|
||||
// For consistency with generate_data_key and decrypt in this simple backend,
|
||||
// we return the plaintext as ciphertext.
|
||||
// This is a non-secure implementation as noted in other methods.
|
||||
let ciphertext = request.plaintext.clone();
|
||||
// For simplicity, we'll use a basic encryption approach
|
||||
// In practice, you'd use proper AEAD encryption
|
||||
let mut ciphertext = request.plaintext.clone();
|
||||
for (i, byte) in ciphertext.iter_mut().enumerate() {
|
||||
*byte ^= key_material[i % key_material.len()];
|
||||
}
|
||||
|
||||
Ok(EncryptResponse {
|
||||
ciphertext,
|
||||
@@ -266,12 +278,12 @@ impl KmsClient for VaultKmsClient {
|
||||
})
|
||||
}
|
||||
|
||||
async fn decrypt(&self, request: &DecryptRequest, _context: Option<&OperationContext>) -> Result<Vec<u8>> {
|
||||
async fn decrypt(&self, _request: &DecryptRequest, _context: Option<&OperationContext>) -> Result<Vec<u8>> {
|
||||
debug!("Decrypting data");
|
||||
|
||||
// Since generate_data_key and encrypt return plaintext as ciphertext,
|
||||
// we just return the ciphertext as is.
|
||||
Ok(request.ciphertext.clone())
|
||||
// For this simple implementation, we assume the key ID is embedded in the ciphertext metadata
|
||||
// In practice, you'd extract this from the ciphertext envelope
|
||||
Err(KmsError::invalid_operation("Decrypt not fully implemented for Vault backend"))
|
||||
}
|
||||
|
||||
async fn create_key(&self, key_id: &str, algorithm: &str, _context: Option<&OperationContext>) -> Result<MasterKey> {
|
||||
@@ -770,35 +782,4 @@ mod tests {
|
||||
// Test health check
|
||||
client.health_check().await.expect("Health check failed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_vault_decrypt_offline() {
|
||||
let config = VaultConfig {
|
||||
address: "http://127.0.0.1:8200".to_string(),
|
||||
auth_method: VaultAuthMethod::Token {
|
||||
token: "dev-only-token".to_string(),
|
||||
},
|
||||
kv_mount: "secret".to_string(),
|
||||
key_path_prefix: "rustfs/kms/keys".to_string(),
|
||||
mount_path: "transit".to_string(),
|
||||
namespace: None,
|
||||
tls: None,
|
||||
};
|
||||
|
||||
// This should succeed even without a running Vault server
|
||||
// as it only builds the client struct
|
||||
let client = VaultKmsClient::new(config).await.expect("Failed to create Vault client");
|
||||
|
||||
let plaintext = b"test-data-for-decrypt";
|
||||
let request = DecryptRequest {
|
||||
ciphertext: plaintext.to_vec(),
|
||||
encryption_context: Default::default(),
|
||||
grant_tokens: Vec::new(),
|
||||
};
|
||||
|
||||
// Decrypt should just return the ciphertext as plaintext (identity operation)
|
||||
// and should NOT make any network calls
|
||||
let result = client.decrypt(&request, None).await.expect("Decrypt failed");
|
||||
assert_eq!(result, plaintext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ tracing = { workspace = true }
|
||||
transform-stream = { workspace = true, optional = true }
|
||||
url = { workspace = true, optional = true }
|
||||
zstd = { workspace = true, optional = true }
|
||||
ipnet = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
@@ -92,5 +93,5 @@ hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:s
|
||||
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
|
||||
integration = [] # integration test features
|
||||
sys = ["dep:sysinfo"] # system information features
|
||||
http = ["dep:convert_case", "dep:http", "dep:regex"]
|
||||
http = ["dep:convert_case", "dep:http", "dep:regex", "dep:ipnet"] # http utilities
|
||||
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify", "http"] # all features
|
||||
|
||||
@@ -13,9 +13,10 @@
|
||||
// limitations under the License.
|
||||
|
||||
use http::HeaderMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use regex::Regex;
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
@@ -45,6 +46,100 @@ fn is_xff_header_enabled() -> bool {
|
||||
== "on"
|
||||
}
|
||||
|
||||
/// TrustedProxies holds configuration for validating proxy sources
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TrustedProxies {
|
||||
/// List of trusted proxy IP networks (CIDR format)
|
||||
pub cidrs: Vec<IpNet>,
|
||||
/// Whether to enable proxy validation
|
||||
pub enable_validation: bool,
|
||||
/// Maximum allowed proxy chain length
|
||||
pub max_chain_length: usize,
|
||||
}
|
||||
|
||||
impl TrustedProxies {
|
||||
/// Create a new TrustedProxies configuration
|
||||
pub fn new(cidrs: Vec<String>, enable_validation: bool, max_chain_length: usize) -> Self {
|
||||
let cidrs = cidrs.into_iter().filter_map(|s| s.parse::<IpNet>().ok()).collect();
|
||||
Self {
|
||||
cidrs,
|
||||
enable_validation,
|
||||
max_chain_length,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if an IP address is within the trusted proxy ranges
|
||||
pub fn is_trusted_proxy(&self, ip: IpAddr) -> bool {
|
||||
if !self.enable_validation {
|
||||
return true; // Backward compatibility: trust all when disabled
|
||||
}
|
||||
self.cidrs.iter().any(|net| net.contains(&ip))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TrustedProxies {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cidrs: vec![],
|
||||
enable_validation: true,
|
||||
max_chain_length: 10,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validate if an IP string represents a valid client IP
|
||||
/// Returns false for private/loopback addresses and invalid formats
|
||||
fn is_valid_client_ip(ip_str: &str, max_chain_length: usize) -> bool {
|
||||
// Handle X-Forwarded-For chains
|
||||
if ip_str.contains(',') {
|
||||
let parts: Vec<&str> = ip_str.split(',').map(|s| s.trim()).collect();
|
||||
if parts.len() > max_chain_length {
|
||||
return false;
|
||||
}
|
||||
// Validate each IP in the chain
|
||||
for part in parts {
|
||||
if !is_valid_single_ip(part) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
is_valid_single_ip(ip_str)
|
||||
}
|
||||
|
||||
/// Validate a single IP address string
|
||||
fn is_valid_single_ip(ip_str: &str) -> bool {
|
||||
match ip_str.parse::<IpAddr>() {
|
||||
Ok(ip) => {
|
||||
// Reject private and loopback addresses as client IPs
|
||||
// (they should come from trusted proxies only)
|
||||
!is_private(ip) && !ip.is_loopback()
|
||||
}
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if an IP address is private
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `ip` - The IP address to check
|
||||
///
|
||||
/// # Returns
|
||||
/// A `bool` indicating whether the IP is private
|
||||
///
|
||||
|
||||
fn is_private(ip: IpAddr) -> bool {
|
||||
match ip {
|
||||
IpAddr::V4(ipv4) => ipv4.is_private(),
|
||||
IpAddr::V6(ipv6) => {
|
||||
// Check if it's in fc00::/7 (Unique Local Address)
|
||||
let octets = ipv6.octets();
|
||||
(octets[0] & 0xfe) == 0xfc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239
|
||||
/// Forwarded headers (in that order).
|
||||
///
|
||||
@@ -147,18 +242,43 @@ pub fn get_source_ip_from_headers(headers: &HeaderMap) -> Option<String> {
|
||||
addr
|
||||
}
|
||||
|
||||
/// GetSourceIPRaw retrieves the IP from the request headers
|
||||
/// and falls back to remote_addr when necessary.
|
||||
/// however returns without bracketing.
|
||||
/// GetSourceIPRaw retrieves the IP from the request headers with trusted proxy validation
|
||||
/// and falls back to peer_addr when necessary.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `headers` - HTTP headers from the request
|
||||
/// * `remote_addr` - Remote address as a string
|
||||
/// * `peer_addr` - Peer IP address from the connection
|
||||
/// * `trusted_proxies` - Trusted proxy configuration
|
||||
///
|
||||
/// # Returns
|
||||
/// A `String` containing the source IP address
|
||||
/// A `String` containing the validated source IP address
|
||||
///
|
||||
pub fn get_source_ip_raw(headers: &HeaderMap, remote_addr: &str) -> String {
|
||||
pub fn get_source_ip_raw(headers: &HeaderMap, peer_addr: IpAddr, trusted_proxies: &TrustedProxies) -> String {
|
||||
// If validation is disabled, use legacy behavior for backward compatibility
|
||||
if !trusted_proxies.enable_validation {
|
||||
let remote_addr_str = peer_addr.to_string();
|
||||
return get_source_ip_raw_legacy(headers, &remote_addr_str);
|
||||
}
|
||||
|
||||
// Check if the direct connection is from a trusted proxy
|
||||
if trusted_proxies.is_trusted_proxy(peer_addr) {
|
||||
// Trusted proxy: try to get real client IP from headers
|
||||
if let Some(header_ip) = get_source_ip_from_headers(headers) {
|
||||
// Validate the IP from headers
|
||||
if is_valid_client_ip(&header_ip, trusted_proxies.max_chain_length) {
|
||||
return header_ip;
|
||||
}
|
||||
// If header IP is invalid, log warning and fall back to peer
|
||||
tracing::warn!("Invalid client IP in headers from trusted proxy {}: {}", peer_addr, header_ip);
|
||||
}
|
||||
}
|
||||
|
||||
// Untrusted source or no valid header IP: use connection peer address
|
||||
peer_addr.to_string()
|
||||
}
|
||||
|
||||
/// Legacy GetSourceIPRaw for backward compatibility when validation is disabled
|
||||
fn get_source_ip_raw_legacy(headers: &HeaderMap, remote_addr: &str) -> String {
|
||||
let addr = get_source_ip_from_headers(headers).unwrap_or_else(|| remote_addr.to_string());
|
||||
|
||||
// Default to remote address if headers not set.
|
||||
@@ -169,19 +289,20 @@ pub fn get_source_ip_raw(headers: &HeaderMap, remote_addr: &str) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
/// GetSourceIP retrieves the IP from the request headers
|
||||
/// and falls back to remote_addr when necessary.
|
||||
/// GetSourceIP retrieves the IP from the request headers with trusted proxy validation
|
||||
/// and falls back to peer_addr when necessary.
|
||||
/// It brackets IPv6 addresses.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `headers` - HTTP headers from the request
|
||||
/// * `remote_addr` - Remote address as a string
|
||||
/// * `peer_addr` - Peer IP address from the connection
|
||||
/// * `trusted_proxies` - Trusted proxy configuration
|
||||
///
|
||||
/// # Returns
|
||||
/// A `String` containing the source IP address, with IPv6 addresses bracketed
|
||||
///
|
||||
pub fn get_source_ip(headers: &HeaderMap, remote_addr: &str) -> String {
|
||||
let addr = get_source_ip_raw(headers, remote_addr);
|
||||
pub fn get_source_ip(headers: &HeaderMap, peer_addr: IpAddr, trusted_proxies: &TrustedProxies) -> String {
|
||||
let addr = get_source_ip_raw(headers, peer_addr, trusted_proxies);
|
||||
if addr.contains(':') { format!("[{addr}]") } else { addr }
|
||||
}
|
||||
|
||||
@@ -210,18 +331,58 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_source_ip_raw() {
|
||||
fn test_trusted_proxies_validation() {
|
||||
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.0/24".to_string(), "10.0.0.0/8".to_string()], true, 5);
|
||||
|
||||
// Trusted IPs
|
||||
assert!(trusted_proxies.is_trusted_proxy("192.168.1.1".parse().unwrap()));
|
||||
assert!(trusted_proxies.is_trusted_proxy("10.1.1.1".parse().unwrap()));
|
||||
|
||||
// Untrusted IPs
|
||||
assert!(!trusted_proxies.is_trusted_proxy("203.0.113.1".parse().unwrap()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_source_ip_raw_with_trusted_proxy() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-forwarded-for", HeaderValue::from_static("203.0.113.1"));
|
||||
|
||||
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
|
||||
let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
|
||||
|
||||
let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
|
||||
assert_eq!(result, "203.0.113.1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_source_ip_raw_with_untrusted_proxy() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-forwarded-for", HeaderValue::from_static("203.0.113.1"));
|
||||
|
||||
let trusted_proxies = TrustedProxies::new(vec![], true, 5);
|
||||
let peer_addr: IpAddr = "203.0.113.2".parse().unwrap();
|
||||
|
||||
let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
|
||||
assert_eq!(result, "203.0.113.2"); // Should use peer_addr
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_source_ip_raw_legacy_mode() {
|
||||
let headers = create_test_headers();
|
||||
let remote_addr = "127.0.0.1:8080";
|
||||
let result = get_source_ip_raw(&headers, remote_addr);
|
||||
assert_eq!(result, "192.168.1.1");
|
||||
let trusted_proxies = TrustedProxies::new(vec![], false, 5); // Disabled validation
|
||||
let peer_addr: IpAddr = "127.0.0.1".parse().unwrap();
|
||||
|
||||
let result = get_source_ip_raw(&headers, peer_addr, &trusted_proxies);
|
||||
assert_eq!(result, "192.168.1.1"); // Should use header IP
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_source_ip() {
|
||||
let headers = create_test_headers();
|
||||
let remote_addr = "127.0.0.1:8080";
|
||||
let result = get_source_ip(&headers, remote_addr);
|
||||
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
|
||||
let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
|
||||
|
||||
let result = get_source_ip(&headers, peer_addr, &trusted_proxies);
|
||||
assert_eq!(result, "192.168.1.1");
|
||||
}
|
||||
|
||||
@@ -229,8 +390,32 @@ mod tests {
|
||||
fn test_get_source_ip_ipv6() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-forwarded-for", HeaderValue::from_static("2001:db8::1"));
|
||||
let remote_addr = "127.0.0.1:8080";
|
||||
let result = get_source_ip(&headers, remote_addr);
|
||||
|
||||
let trusted_proxies = TrustedProxies::new(vec!["192.168.1.1/32".to_string()], true, 5);
|
||||
let peer_addr: IpAddr = "192.168.1.1".parse().unwrap();
|
||||
|
||||
let result = get_source_ip(&headers, peer_addr, &trusted_proxies);
|
||||
assert_eq!(result, "[2001:db8::1]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_valid_client_ip() {
|
||||
// Valid public IPs
|
||||
assert!(is_valid_client_ip("203.0.113.1", 5));
|
||||
assert!(is_valid_client_ip("2001:db8::1", 5));
|
||||
|
||||
// Invalid private IPs
|
||||
assert!(!is_valid_client_ip("192.168.1.1", 5));
|
||||
assert!(!is_valid_client_ip("10.0.0.1", 5));
|
||||
assert!(!is_valid_client_ip("127.0.0.1", 5));
|
||||
|
||||
// Valid chain
|
||||
assert!(is_valid_client_ip("203.0.113.1, 198.51.100.1", 5));
|
||||
|
||||
// Invalid chain (too long)
|
||||
assert!(!is_valid_client_ip(
|
||||
"203.0.113.1, 198.51.100.1, 192.0.2.1, 192.0.2.2, 192.0.2.3, 192.0.2.4",
|
||||
5
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,10 +15,10 @@ type: application
|
||||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.0.77
|
||||
version: 0.0.76
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "1.0.0-alpha.77"
|
||||
appVersion: "1.0.0-alpha.73"
|
||||
|
||||
@@ -66,7 +66,7 @@ spec:
|
||||
- -c
|
||||
- |
|
||||
mkdir -p /data /logs
|
||||
chown 10001:10001 /data /logs
|
||||
chown -R 10001:10001 /data /logs
|
||||
volumeMounts:
|
||||
- name: data
|
||||
mountPath: /data
|
||||
|
||||
@@ -87,7 +87,7 @@ spec:
|
||||
mkdir -p /data
|
||||
fi
|
||||
mkdir -p {{ $logDir }}
|
||||
chown 10001:10001 /data {{ $logDir }}
|
||||
chown -R 10001:10001 /data {{ $logDir }}
|
||||
volumeMounts:
|
||||
{{- if eq (int .Values.replicaCount) 4 }}
|
||||
{{- range $i := until (int .Values.replicaCount) }}
|
||||
|
||||
@@ -329,14 +329,14 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
|
||||
init_update_check();
|
||||
|
||||
info!(target: "rustfs::main::run","server started successfully at {}", &server_address);
|
||||
// 4. Mark as Full Ready now that critical components are warm
|
||||
readiness.mark_stage(SystemStage::FullReady);
|
||||
println!(
|
||||
"RustFS server started successfully at {}, current time: {}",
|
||||
&server_address,
|
||||
chrono::offset::Utc::now().to_string()
|
||||
);
|
||||
info!(target: "rustfs::main::run","server started successfully at {}", &server_address);
|
||||
// 4. Mark as Full Ready now that critical components are warm
|
||||
readiness.mark_stage(SystemStage::FullReady);
|
||||
|
||||
// Perform hibernation for 1 second
|
||||
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
|
||||
|
||||
@@ -45,11 +45,6 @@ pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console";
|
||||
/// This prefix is used for endpoints that handle remote procedure calls (RPC).
|
||||
pub(crate) const RPC_PREFIX: &str = "/rustfs/rpc";
|
||||
|
||||
/// Predefined gRPC service prefix for RustFS server.
|
||||
/// This prefix is used for gRPC service endpoints.
|
||||
/// For example, the full gRPC method path would be "/node_service.NodeService/MethodName".
|
||||
pub(crate) const TONIC_PREFIX: &str = "/node_service.NodeService";
|
||||
|
||||
/// LOGO art for RustFS server.
|
||||
pub(crate) const LOGO: &str = r#"
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use tower::{Layer, Service};
|
||||
use tracing::debug;
|
||||
|
||||
/// ReadinessGateLayer ensures that the system components (IAM, Storage)
|
||||
/// are fully initialized before allowing any request to proceed.
|
||||
@@ -89,7 +88,6 @@ where
|
||||
let readiness = self.readiness.clone();
|
||||
Box::pin(async move {
|
||||
let path = req.uri().path();
|
||||
debug!("ReadinessGateService: Received request for path: {}", path);
|
||||
// 1) Exact match: fixed probe/resource path
|
||||
let is_exact_probe = matches!(
|
||||
path,
|
||||
@@ -103,8 +101,7 @@ where
|
||||
let is_prefix_probe = path.starts_with(crate::server::RUSTFS_ADMIN_PREFIX)
|
||||
|| path.starts_with(crate::server::CONSOLE_PREFIX)
|
||||
|| path.starts_with(crate::server::RPC_PREFIX)
|
||||
|| path.starts_with(crate::server::ADMIN_PREFIX)
|
||||
|| path.starts_with(crate::server::TONIC_PREFIX);
|
||||
|| path.starts_with(crate::server::ADMIN_PREFIX);
|
||||
|
||||
let is_probe = is_exact_probe || is_prefix_probe;
|
||||
if !is_probe && !readiness.is_ready() {
|
||||
|
||||
@@ -251,8 +251,6 @@ struct IoLoadMetrics {
|
||||
recent_waits: Vec<Duration>,
|
||||
/// Maximum samples to keep in the window
|
||||
max_samples: usize,
|
||||
/// The earliest record index in the recent_waits vector
|
||||
earliest_index: usize,
|
||||
/// Total wait time observed (for averaging)
|
||||
total_wait_ns: AtomicU64,
|
||||
/// Total number of observations
|
||||
@@ -265,7 +263,6 @@ impl IoLoadMetrics {
|
||||
Self {
|
||||
recent_waits: Vec::with_capacity(max_samples),
|
||||
max_samples,
|
||||
earliest_index: 0,
|
||||
total_wait_ns: AtomicU64::new(0),
|
||||
observation_count: AtomicU64::new(0),
|
||||
}
|
||||
@@ -274,12 +271,10 @@ impl IoLoadMetrics {
|
||||
/// Record a new permit wait observation
|
||||
fn record(&mut self, wait: Duration) {
|
||||
// Add to recent waits (with eviction if full)
|
||||
if self.recent_waits.len() < self.max_samples {
|
||||
self.recent_waits.push(wait);
|
||||
} else {
|
||||
self.recent_waits[self.earliest_index] = wait;
|
||||
self.earliest_index = (self.earliest_index + 1) % self.max_samples;
|
||||
if self.recent_waits.len() >= self.max_samples {
|
||||
self.recent_waits.remove(0);
|
||||
}
|
||||
self.recent_waits.push(wait);
|
||||
|
||||
// Update totals for overall statistics
|
||||
self.total_wait_ns.fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
|
||||
@@ -1872,154 +1867,4 @@ mod tests {
|
||||
assert!(manager.is_cached("warm2").await);
|
||||
assert!(manager.is_cached("warm3").await);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_record_not_full() {
|
||||
let mut metrics = IoLoadMetrics::new(5);
|
||||
metrics.record(Duration::from_millis(10));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
assert_eq!(metrics.recent_waits.len(), 2);
|
||||
assert_eq!(metrics.recent_waits[0], Duration::from_millis(10));
|
||||
assert_eq!(metrics.recent_waits[1], Duration::from_millis(20));
|
||||
assert_eq!(metrics.observation_count(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_record_full_and_circular() {
|
||||
let mut metrics = IoLoadMetrics::new(3);
|
||||
metrics.record(Duration::from_millis(10));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
metrics.record(Duration::from_millis(30));
|
||||
assert_eq!(metrics.recent_waits.len(), 3);
|
||||
assert_eq!(metrics.earliest_index, 0);
|
||||
|
||||
// This should overwrite the first element (10ms)
|
||||
metrics.record(Duration::from_millis(40));
|
||||
assert_eq!(metrics.recent_waits.len(), 3);
|
||||
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
|
||||
assert_eq!(metrics.recent_waits[1], Duration::from_millis(20));
|
||||
assert_eq!(metrics.recent_waits[2], Duration::from_millis(30));
|
||||
assert_eq!(metrics.earliest_index, 1);
|
||||
assert_eq!(metrics.observation_count(), 4);
|
||||
|
||||
// This should overwrite the second element (20ms)
|
||||
metrics.record(Duration::from_millis(50));
|
||||
assert_eq!(metrics.recent_waits.len(), 3);
|
||||
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
|
||||
assert_eq!(metrics.recent_waits[1], Duration::from_millis(50));
|
||||
assert_eq!(metrics.recent_waits[2], Duration::from_millis(30));
|
||||
assert_eq!(metrics.earliest_index, 2);
|
||||
assert_eq!(metrics.observation_count(), 5);
|
||||
|
||||
// This should overwrite the third element (30ms)
|
||||
metrics.record(Duration::from_millis(60));
|
||||
assert_eq!(metrics.recent_waits.len(), 3);
|
||||
assert_eq!(metrics.recent_waits[0], Duration::from_millis(40));
|
||||
assert_eq!(metrics.recent_waits[1], Duration::from_millis(50));
|
||||
assert_eq!(metrics.recent_waits[2], Duration::from_millis(60));
|
||||
assert_eq!(metrics.earliest_index, 0);
|
||||
assert_eq!(metrics.observation_count(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_average_wait() {
|
||||
let mut metrics = IoLoadMetrics::new(3);
|
||||
metrics.record(Duration::from_millis(10));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
metrics.record(Duration::from_millis(30));
|
||||
assert_eq!(metrics.average_wait(), Duration::from_millis(20));
|
||||
|
||||
// Overwrite 10ms with 40ms, new avg = (20+30+40)/3 = 30
|
||||
metrics.record(Duration::from_millis(40));
|
||||
assert_eq!(metrics.average_wait(), Duration::from_millis(30));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_max_wait() {
|
||||
let mut metrics = IoLoadMetrics::new(3);
|
||||
assert_eq!(metrics.max_wait(), Duration::ZERO);
|
||||
metrics.record(Duration::from_millis(40));
|
||||
metrics.record(Duration::from_millis(30));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
assert_eq!(metrics.max_wait(), Duration::from_millis(40));
|
||||
|
||||
// Overwrite 40ms with 5ms, max should still be 30
|
||||
metrics.record(Duration::from_millis(5));
|
||||
assert_eq!(metrics.max_wait(), Duration::from_millis(30));
|
||||
|
||||
// Overwrite 30ms with 10ms
|
||||
metrics.record(Duration::from_millis(10));
|
||||
assert_eq!(metrics.max_wait(), Duration::from_millis(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_p95_wait() {
|
||||
let mut metrics = IoLoadMetrics::new(20);
|
||||
for i in 1..=20 {
|
||||
metrics.record(Duration::from_millis(i * 5)); // 5, 10, ..., 100
|
||||
}
|
||||
assert_eq!(metrics.p95_wait(), Duration::from_millis(100));
|
||||
|
||||
// Test with different values
|
||||
let mut metrics = IoLoadMetrics::new(10);
|
||||
metrics.record(Duration::from_millis(10));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
metrics.record(Duration::from_millis(30));
|
||||
metrics.record(Duration::from_millis(40));
|
||||
metrics.record(Duration::from_millis(50));
|
||||
metrics.record(Duration::from_millis(60));
|
||||
metrics.record(Duration::from_millis(70));
|
||||
metrics.record(Duration::from_millis(80));
|
||||
metrics.record(Duration::from_millis(90));
|
||||
metrics.record(Duration::from_millis(1000)); // outlier
|
||||
assert_eq!(metrics.p95_wait(), Duration::from_millis(1000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_smoothed_load_level() {
|
||||
let mut metrics = IoLoadMetrics::new(3);
|
||||
// Average is low
|
||||
metrics.record(Duration::from_millis(5));
|
||||
metrics.record(Duration::from_millis(8));
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Low);
|
||||
|
||||
// Average is medium
|
||||
metrics.record(Duration::from_millis(40)); // avg = (5+8+40)/3 = 17.6 -> Medium
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Medium);
|
||||
|
||||
// Average is High
|
||||
metrics.record(Duration::from_millis(100)); // avg = (8+40+100)/3 = 49.3 -> Medium
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Medium);
|
||||
|
||||
metrics.record(Duration::from_millis(100)); // avg = (40+100+100)/3 = 80 -> High
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::High);
|
||||
|
||||
// Average is Critical
|
||||
metrics.record(Duration::from_millis(300)); // avg = (100+100+300)/3 = 166.6 -> High
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::High);
|
||||
|
||||
metrics.record(Duration::from_millis(300)); // avg = (100+300+300)/3 = 233.3 -> Critical
|
||||
assert_eq!(metrics.smoothed_load_level(), IoLoadLevel::Critical);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_io_load_metrics_lifetime_average() {
|
||||
let mut metrics = IoLoadMetrics::new(2);
|
||||
metrics.record(Duration::from_millis(10));
|
||||
metrics.record(Duration::from_millis(20));
|
||||
// total = 30, count = 2, avg = 15
|
||||
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(15));
|
||||
|
||||
metrics.record(Duration::from_millis(30)); // recent=(20, 30), but lifetime avg is over all records
|
||||
// total = 10+20+30=60, count = 3, avg = 20
|
||||
let total_ns = metrics.total_wait_ns.load(Ordering::Relaxed);
|
||||
let count = metrics.observation_count.load(Ordering::Relaxed);
|
||||
assert_eq!(total_ns, 60_000_000);
|
||||
assert_eq!(count, 3);
|
||||
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(20));
|
||||
|
||||
metrics.record(Duration::from_millis(40));
|
||||
// total = 60+40=100, count = 4, avg = 25
|
||||
assert_eq!(metrics.lifetime_average_wait(), Duration::from_millis(25));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,8 +66,8 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
|
||||
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
|
||||
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
|
||||
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
|
||||
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
|
||||
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
|
||||
export RUSTFS_OBS_ENVIRONMENT=production # Environment name
|
||||
export RUSTFS_OBS_LOGGER_LEVEL=warn # Log level, supports trace, debug, info, warn, error
|
||||
export RUSTFS_OBS_LOG_STDOUT_ENABLED=false # Whether to enable local stdout logging
|
||||
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
|
||||
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
|
||||
|
||||
Reference in New Issue
Block a user