improve code for metrics and switch tokio-tar to astral-tokio-tar (#705)

* improve code for metrics and switch tokio-tar to astral-tokio-tar

* remove log

* fix
This commit is contained in:
houseme
2025-10-24 13:07:56 +08:00
committed by GitHub
parent b47765b4c0
commit c5264f9703
10 changed files with 297 additions and 325 deletions

48
Cargo.lock generated
View File

@@ -431,6 +431,22 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "astral-tokio-tar"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec179a06c1769b1e42e1e2cbe74c7dcdb3d6383c838454d063eaac5bbb7ebbe5"
dependencies = [
"filetime",
"futures-core",
"libc",
"portable-atomic",
"rustc-hash",
"tokio",
"tokio-stream",
"xattr",
]
[[package]]
name = "async-channel"
version = "2.5.0"
@@ -4308,7 +4324,7 @@ checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb"
dependencies = [
"bitflags 2.10.0",
"libc",
"redox_syscall 0.5.18",
"redox_syscall",
]
[[package]]
@@ -5201,7 +5217,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.5.18",
"redox_syscall",
"smallvec",
"windows-link 0.2.1",
]
@@ -5916,15 +5932,6 @@ dependencies = [
"syn 2.0.108",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@@ -6258,6 +6265,7 @@ dependencies = [
name = "rustfs"
version = "0.0.5"
dependencies = [
"astral-tokio-tar",
"async-trait",
"atoi",
"atomic_enum",
@@ -6326,7 +6334,6 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.4",
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic",
"tower",
@@ -6916,10 +6923,10 @@ dependencies = [
name = "rustfs-zip"
version = "0.0.5"
dependencies = [
"astral-tokio-tar",
"async-compression",
"tokio",
"tokio-stream",
"tokio-tar",
]
[[package]]
@@ -8324,21 +8331,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tar"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall 0.3.5",
"tokio",
"tokio-stream",
"xattr",
]
[[package]]
name = "tokio-test"
version = "0.4.4"

View File

@@ -63,95 +63,134 @@ unsafe_code = "deny"
all = "warn"
[workspace.dependencies]
# RustFS Internal Crates
rustfs = { path = "./rustfs", version = "0.0.5" }
rustfs-ahm = { path = "crates/ahm", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-appauth = { path = "crates/appauth", version = "0.0.5" }
rustfs-audit = { path = "crates/audit", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-common = { path = "crates/common", version = "0.0.5" }
rustfs-config = { path = "./crates/config", version = "0.0.5" }
rustfs-crypto = { path = "crates/crypto", version = "0.0.5" }
rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" }
rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" }
rustfs-iam = { path = "crates/iam", version = "0.0.5" }
rustfs-kms = { path = "crates/kms", version = "0.0.5" }
rustfs-lock = { path = "crates/lock", version = "0.0.5" }
rustfs-madmin = { path = "crates/madmin", version = "0.0.5" }
rustfs-mcp = { path = "crates/mcp", version = "0.0.5" }
rustfs-notify = { path = "crates/notify", version = "0.0.5" }
rustfs-obs = { path = "crates/obs", version = "0.0.5" }
rustfs-policy = { path = "crates/policy", version = "0.0.5" }
rustfs-protos = { path = "crates/protos", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs = { path = "./rustfs", version = "0.0.5" }
rustfs-zip = { path = "./crates/zip", version = "0.0.5" }
rustfs-config = { path = "./crates/config", version = "0.0.5" }
rustfs-obs = { path = "crates/obs", version = "0.0.5" }
rustfs-notify = { path = "crates/notify", version = "0.0.5" }
rustfs-utils = { path = "crates/utils", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
rustfs-mcp = { path = "crates/mcp", version = "0.0.5" }
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
rustfs-kms = { path = "crates/kms", version = "0.0.5" }
aes-gcm = { version = "0.10.3", features = ["std"] }
anyhow = "1.0.100"
arc-swap = "1.7.1"
argon2 = { version = "0.5.3", features = ["std"] }
atoi = "2.0.0"
rustfs-utils = { path = "crates/utils", version = "0.0.5" }
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
rustfs-zip = { path = "./crates/zip", version = "0.0.5" }
# Async Runtime and Networking
async-channel = "2.5.0"
async-compression = { version = "0.4.19" }
async-recursion = "1.1.1"
async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.8" }
aws-credential-types = { version = "1.2.8" }
aws-smithy-types = { version = "1.3.3" }
aws-sdk-s3 = { version = "1.108.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
axum = "0.8.6"
axum-extra = "0.10.3"
axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"], default-features = false }
base64-simd = "0.8.0"
base64 = "0.22.1"
brotli = "8.0.2"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
hyper = "1.7.0"
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "ring", "webpki-roots"] }
hyper-util = { version = "0.1.17", features = ["tokio", "server-auto", "server-graceful"] }
http = "1.3.1"
http-body = "1.0.1"
reqwest = { version = "0.12.24", default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
socket2 = "0.6.1"
tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "ring"] }
tokio-stream = { version = "0.1.17" }
tokio-test = "0.4.4"
tokio-util = { version = "0.7.16", features = ["io", "compat"] }
tonic = { version = "0.14.2", features = ["gzip"] }
tonic-prost = { version = "0.14.2" }
tonic-prost-build = { version = "0.14.2" }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.6", features = ["cors"] }
# Serialization and Data Formats
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.1.0"
byteorder = "1.5.0"
cfg-if = "1.0.4"
convert_case = "0.8.0"
crc-fast = "1.3.0"
flatbuffers = "25.9.23"
form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.3"
rmcp = { version = "0.8.3" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.0.4"
# Cryptography and Security
aes-gcm = { version = "0.10.3", features = ["std"] }
argon2 = { version = "0.5.3", features = ["std"] }
blake3 = { version = "1.8.2" }
chacha20poly1305 = { version = "0.10.1" }
crc-fast = "1.3.0"
crc32c = "0.6.8"
crc32fast = "1.5.0"
crc64fast-nvme = "1.2.0"
hmac = "0.12.1"
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
pbkdf2 = "0.12.2"
rsa = { version = "0.9.8" }
rustls = { version = "0.23.34", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.12.0"
sha1 = "0.10.6"
sha2 = "0.10.9"
zeroize = { version = "1.8.2", features = ["derive"] }
# Time and Date
chrono = { version = "0.4.42", features = ["serde"] }
humantime = "2.3.0"
time = { version = "0.3.44", features = ["std", "parsing", "formatting", "macros", "serde"] }
# Utilities and Tools
anyhow = "1.0.100"
arc-swap = "1.7.1"
astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.8" }
aws-credential-types = { version = "1.2.8" }
aws-sdk-s3 = { version = "1.108.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-smithy-types = { version = "1.3.3" }
base64 = "0.22.1"
base64-simd = "0.8.0"
brotli = "8.0.2"
cfg-if = "1.0.4"
clap = { version = "4.5.50", features = ["derive", "env"] }
const-str = { version = "0.7.0", features = ["std", "proc"] }
crc32fast = "1.5.0"
crc32c = "0.6.8"
crc64fast-nvme = "1.2.0"
convert_case = "0.8.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "50.2.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.9.23"
flate2 = "1.1.4"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
glob = "0.3.3"
hashbrown = { version = "0.16.0", features = ["serde", "rayon"] }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
hmac = "0.12.1"
hyper = "1.7.0"
hyper-util = { version = "0.1.17", features = [
"tokio",
"server-auto",
"server-graceful",
] }
hyper-rustls = { version = "0.27.7", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "http2", "ring", "webpki-roots"] }
http = "1.3.1"
http-body = "1.0.1"
humantime = "2.3.0"
ipnetwork = { version = "0.21.1", features = ["serde"] }
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
lazy_static = "1.5.0"
libc = "0.2.177"
libsystemd = { version = "0.7.2" }
@@ -171,93 +210,34 @@ num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.11.0"
object_store = "0.12.4"
once_cell = "1.21.3"
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
"spec_unstable_logs_enabled"
] }
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-stdout = { version = "0.31.0" }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = [
"grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"
] }
opentelemetry-semantic-conventions = { version = "0.31.0", features = [
"semconv_experimental",
] }
parking_lot = "0.12.5"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
blake3 = { version = "1.8.2" }
pbkdf2 = "0.12.2"
pin-project-lite = "0.2.16"
prost = "0.14.1"
pretty_assertions = "1.4.1"
quick-xml = "0.38.3"
rand = "0.9.2"
rayon = "1.11.0"
reed-solomon-simd = { version = "3.1.0" }
regex = { version = "1.12.2" }
reqwest = { version = "0.12.24", default-features = false, features = [
"rustls-tls-webpki-roots",
"charset",
"http2",
"system-proxy",
"stream",
"json",
"blocking",
] }
rmcp = { version = "0.8.3" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
rsa = { version = "0.9.8" }
rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.8.0" }
rustc-hash = { version = "2.1.1" }
rustls = { version = "0.23.34", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-rc.3", features = ["minio"] }
schemars = "1.0.4"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
sha1 = "0.10.6"
sha2 = "0.10.9"
shadow-rs = { version = "1.4.0", default-features = false }
siphasher = "1.0.1"
smallvec = { version = "1.15.1", features = ["serde"] }
smartstring = "1.0.1"
snafu = "0.8.9"
snap = "1.1.1"
socket2 = "0.6.1"
starshard = { version = "0.5.0", features = ["rayon", "async", "serde"] }
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.1"
sysctl = "0.7.1"
tempfile = "3.23.0"
sysinfo = "0.37.1"
temp-env = "0.3.6"
tempfile = "3.23.0"
test-case = "3.3.1"
thiserror = "2.0.17"
time = { version = "0.3.44", features = [
"std",
"parsing",
"formatting",
"macros",
"serde",
] }
tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "ring"] }
tokio-stream = { version = "0.1.17" }
tokio-tar = "0.3.1"
tokio-test = "0.4.4"
tokio-util = { version = "0.7.16", features = ["io", "compat"] }
tonic = { version = "0.14.2", features = ["gzip"] }
tonic-prost = { version = "0.14.2" }
tonic-prost-build = { version = "0.14.2" }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.6", features = ["cors"] }
tracing = { version = "0.1.41" }
tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
@@ -265,20 +245,24 @@ tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.7"
urlencoding = "2.1.3"
uuid = { version = "1.18.1", features = [
"v4",
"fast-rng",
"macro-diagnostics",
] }
uuid = { version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.5.0", features = ["serde"] }
zeroize = { version = "1.8.2", features = ["derive"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "6.0.0"
zstd = "0.13.3"
# Observability and Metrics
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes", "spec_unstable_logs_enabled"] }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp", "tokio-test"]

View File

@@ -33,7 +33,7 @@ use tracing::info;
#[serial]
async fn test_comprehensive_kms_full_workflow() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🏁 开始KMS全功能综合测试");
info!("🏁 Start the KMS full-featured synthesis test");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -43,25 +43,25 @@ async fn test_comprehensive_kms_full_workflow() -> Result<(), Box<dyn std::error
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
// Phase 1: Test all single encryption types
info!("📋 阶段1: 测试所有单文件加密类型");
info!("📋 Phase 1: Test all single-file encryption types");
test_sse_s3_encryption(&s3_client, TEST_BUCKET).await?;
test_sse_kms_encryption(&s3_client, TEST_BUCKET).await?;
test_sse_c_encryption(&s3_client, TEST_BUCKET).await?;
// Phase 2: Test KMS key management APIs
info!("📋 阶段2: 测试KMS密钥管理API");
info!("📋 Phase 2: Test the KMS Key Management API");
test_kms_key_management(&kms_env.base_env.url, &kms_env.base_env.access_key, &kms_env.base_env.secret_key).await?;
// Phase 3: Test all multipart encryption types
info!("📋 阶段3: 测试所有分片上传加密类型");
info!("📋 Phase 3: Test all shard upload encryption types");
test_all_multipart_encryption_types(&s3_client, TEST_BUCKET, "comprehensive-multipart-test").await?;
// Phase 4: Mixed workload test
info!("📋 阶段4: 混合工作负载测试");
info!("📋 Phase 4: Mixed workload testing");
test_mixed_encryption_workload(&s3_client, TEST_BUCKET).await?;
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ KMS全功能综合测试通过");
info!("✅ KMS fully functional comprehensive test passed");
Ok(())
}
@@ -70,7 +70,7 @@ async fn test_mixed_encryption_workload(
s3_client: &aws_sdk_s3::Client,
bucket: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("🔄 测试混合加密工作负载");
info!("🔄 Test hybrid crypto workloads");
// Test configuration: different sizes and encryption types
let test_configs = vec![
@@ -89,11 +89,11 @@ async fn test_mixed_encryption_workload(
];
for (i, config) in test_configs.iter().enumerate() {
info!("🔄 执行混合测试 {}/{}: {:?}", i + 1, test_configs.len(), config.encryption_type);
info!("🔄 Perform hybrid testing {}/{}: {:?}", i + 1, test_configs.len(), config.encryption_type);
test_multipart_upload_with_config(s3_client, bucket, config).await?;
}
info!("混合加密工作负载测试通过");
info!("Hybrid cryptographic workload tests pass");
Ok(())
}
@@ -102,7 +102,7 @@ async fn test_mixed_encryption_workload(
#[serial]
async fn test_comprehensive_stress_test() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("💪 开始KMS压力测试");
info!("💪 Start the KMS stress test");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -120,7 +120,7 @@ async fn test_comprehensive_stress_test() -> Result<(), Box<dyn std::error::Erro
for config in stress_configs {
info!(
"💪 执行压力测试: {:?}, 总大小: {}MB",
"💪 Perform stress test: {:?}, Total size: {}MB",
config.encryption_type,
config.total_size() / (1024 * 1024)
);
@@ -128,7 +128,7 @@ async fn test_comprehensive_stress_test() -> Result<(), Box<dyn std::error::Erro
}
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ KMS压力测试通过");
info!("✅ KMS stress test passed");
Ok(())
}
@@ -137,7 +137,7 @@ async fn test_comprehensive_stress_test() -> Result<(), Box<dyn std::error::Erro
#[serial]
async fn test_comprehensive_key_isolation() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🔐 开始加密密钥隔离综合测试");
info!("🔐 Begin the comprehensive test of encryption key isolation");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -173,14 +173,14 @@ async fn test_comprehensive_key_isolation() -> Result<(), Box<dyn std::error::Er
);
// Upload with different keys
info!("🔐 上传文件用密钥1");
info!("🔐 Key 1 for uploading files");
test_multipart_upload_with_config(&s3_client, TEST_BUCKET, &config1).await?;
info!("🔐 上传文件用密钥2");
info!("🔐 Key 2 for uploading files");
test_multipart_upload_with_config(&s3_client, TEST_BUCKET, &config2).await?;
// Verify that files cannot be read with wrong keys
info!("🔒 验证密钥隔离");
info!("🔒 Verify key isolation");
let wrong_key = "11111111111111111111111111111111";
let wrong_key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, wrong_key);
let wrong_key_md5 = format!("{:x}", md5::compute(wrong_key));
@@ -196,11 +196,11 @@ async fn test_comprehensive_key_isolation() -> Result<(), Box<dyn std::error::Er
.send()
.await;
assert!(wrong_read_result.is_err(), "应该无法用错误密钥读取加密文件");
info!("确认密钥隔离正常工作");
assert!(wrong_read_result.is_err(), "The encrypted file should not be readable with the wrong key");
info!("Confirm that key isolation is working correctly");
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("加密密钥隔离综合测试通过");
info!("Encryption key isolation comprehensive test passed");
Ok(())
}
@@ -209,7 +209,7 @@ async fn test_comprehensive_key_isolation() -> Result<(), Box<dyn std::error::Er
#[serial]
async fn test_comprehensive_concurrent_operations() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("开始并发加密操作综合测试");
info!("Started comprehensive testing of concurrent encryption operations");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -228,7 +228,7 @@ async fn test_comprehensive_concurrent_operations() -> Result<(), Box<dyn std::e
];
// Execute uploads concurrently
info!("开始并发上传");
info!("Start concurrent uploads");
let mut tasks = Vec::new();
for config in concurrent_configs {
let client = s3_client.clone();
@@ -243,10 +243,10 @@ async fn test_comprehensive_concurrent_operations() -> Result<(), Box<dyn std::e
task.await??;
}
info!("所有并发操作完成");
info!("All concurrent operations are completed");
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("并发加密操作综合测试通过");
info!("The comprehensive test of concurrent encryption operation has passed");
Ok(())
}
@@ -255,7 +255,7 @@ async fn test_comprehensive_concurrent_operations() -> Result<(), Box<dyn std::e
#[serial]
async fn test_comprehensive_performance_benchmark() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("📊 开始KMS性能基准测试");
info!("📊 Start KMS performance benchmarking");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -278,7 +278,7 @@ async fn test_comprehensive_performance_benchmark() -> Result<(), Box<dyn std::e
];
for (size_name, config) in perf_configs {
info!("📊 测试{}文件性能 ({}MB)", size_name, config.total_size() / (1024 * 1024));
info!("📊 Test {} file performance ({}MB)", size_name, config.total_size() / (1024 * 1024));
let start_time = std::time::Instant::now();
test_multipart_upload_with_config(&s3_client, TEST_BUCKET, &config).await?;
@@ -286,7 +286,7 @@ async fn test_comprehensive_performance_benchmark() -> Result<(), Box<dyn std::e
let throughput_mbps = (config.total_size() as f64 / (1024.0 * 1024.0)) / duration.as_secs_f64();
info!(
"📊 {}文件测试完成: {:.2}秒, 吞吐量: {:.2} MB/s",
"📊 {} file test completed: {:.2} seconds, throughput: {:.2} MB/s",
size_name,
duration.as_secs_f64(),
throughput_mbps
@@ -294,6 +294,6 @@ async fn test_comprehensive_performance_benchmark() -> Result<(), Box<dyn std::e
}
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ KMS性能基准测试通过");
info!("✅ KMS performance benchmark passed");
Ok(())
}

View File

@@ -25,6 +25,7 @@ use super::common::LocalKMSTestEnvironment;
use crate::common::{TEST_BUCKET, init_logging};
use aws_sdk_s3::types::ServerSideEncryption;
use base64::Engine;
use md5::compute;
use serial_test::serial;
use std::sync::Arc;
use tokio::sync::Semaphore;
@@ -71,7 +72,7 @@ async fn test_kms_zero_byte_file_encryption() -> Result<(), Box<dyn std::error::
info!("📤 Testing SSE-C with zero-byte file");
let test_key = "01234567890123456789012345678901";
let test_key_b64 = base64::engine::general_purpose::STANDARD.encode(test_key);
let test_key_md5 = format!("{:x}", md5::compute(test_key));
let test_key_md5 = format!("{:x}", compute(test_key));
let object_key_c = "zero-byte-sse-c";
let _put_response_c = s3_client
@@ -165,7 +166,7 @@ async fn test_kms_single_byte_file_encryption() -> Result<(), Box<dyn std::error
info!("📤 Testing SSE-C with single-byte file");
let test_key = "01234567890123456789012345678901";
let test_key_b64 = base64::engine::general_purpose::STANDARD.encode(test_key);
let test_key_md5 = format!("{:x}", md5::compute(test_key));
let test_key_md5 = format!("{:x}", compute(test_key));
let object_key_c = "single-byte-sse-c";
s3_client
@@ -293,7 +294,7 @@ async fn test_kms_invalid_key_scenarios() -> Result<(), Box<dyn std::error::Erro
info!("🔍 Testing invalid SSE-C key length");
let invalid_short_key = "short"; // Too short
let invalid_key_b64 = base64::engine::general_purpose::STANDARD.encode(invalid_short_key);
let invalid_key_md5 = format!("{:x}", md5::compute(invalid_short_key));
let invalid_key_md5 = format!("{:x}", compute(invalid_short_key));
let invalid_key_result = s3_client
.put_object()
@@ -333,7 +334,7 @@ async fn test_kms_invalid_key_scenarios() -> Result<(), Box<dyn std::error::Erro
info!("🔍 Testing access to SSE-C object without key");
// First upload a valid SSE-C object
let valid_key_md5 = format!("{:x}", md5::compute(valid_key));
let valid_key_md5 = format!("{:x}", compute(valid_key));
s3_client
.put_object()
.bucket(TEST_BUCKET)
@@ -420,7 +421,7 @@ async fn test_kms_concurrent_encryption() -> Result<(), Box<dyn std::error::Erro
// SSE-C
let key = format!("testkey{i:026}"); // 32-byte key
let key_b64 = base64::engine::general_purpose::STANDARD.encode(&key);
let key_md5 = format!("{:x}", md5::compute(&key));
let key_md5 = format!("{:x}", compute(&key));
client
.put_object()
@@ -492,8 +493,8 @@ async fn test_kms_key_validation_security() -> Result<(), Box<dyn std::error::Er
let key1_b64 = base64::engine::general_purpose::STANDARD.encode(key1);
let key2_b64 = base64::engine::general_purpose::STANDARD.encode(key2);
let key1_md5 = format!("{:x}", md5::compute(key1));
let key2_md5 = format!("{:x}", md5::compute(key2));
let key1_md5 = format!("{:x}", compute(key1));
let key2_md5 = format!("{:x}", compute(key2));
// Upload same data with different keys
s3_client

View File

@@ -19,6 +19,7 @@
//! multipart upload behaviour.
use crate::common::{TEST_BUCKET, init_logging};
use md5::compute;
use serial_test::serial;
use tokio::time::{Duration, sleep};
use tracing::{error, info};
@@ -132,8 +133,8 @@ async fn test_vault_kms_key_isolation() -> Result<(), Box<dyn std::error::Error
let key2 = "98765432109876543210987654321098";
let key1_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, key1);
let key2_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, key2);
let key1_md5 = format!("{:x}", md5::compute(key1));
let key2_md5 = format!("{:x}", md5::compute(key2));
let key1_md5 = format!("{:x}", compute(key1));
let key2_md5 = format!("{:x}", compute(key2));
let data1 = b"Vault data encrypted with key 1";
let data2 = b"Vault data encrypted with key 2";

View File

@@ -13,25 +13,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! 分片上传加密功能的分步测试用例
//! Step-by-step test cases for sharded upload encryption
//!
//! 这个测试套件将验证分片上传加密功能的每一个步骤:
//! 1. 测试基础的单分片加密(验证加密基础逻辑)
//! 2. 测试多分片上传(验证分片拼接逻辑)
//! 3. 测试加密元数据的保存和读取
//! 4. 测试完整的分片上传加密流程
//! This test suite will validate every step of the sharded upload encryption feature:
//! 1. Test the underlying single-shard encryption (validate the encryption underlying logic)
//! 2. Test multi-shard uploads (verify shard stitching logic)
//! 3. Test the saving and reading of encrypted metadata
//! 4. Test the complete sharded upload encryption process
use super::common::LocalKMSTestEnvironment;
use crate::common::{TEST_BUCKET, init_logging};
use serial_test::serial;
use tracing::{debug, info};
/// 步骤1测试基础单文件加密功能确保SSE-S3在非分片场景下正常工作
/// Step 1: Test the basic single-file encryption function (ensure that SSE-S3 works properly in non-sharded scenarios)
#[tokio::test]
#[serial]
async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 步骤1测试基础单文件加密功能");
info!("🧪 Step 1: Test the basic single-file encryption function");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -40,11 +40,11 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
let s3_client = kms_env.base_env.create_s3_client();
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
// 测试小文件加密(应该会内联存储)
// Test small file encryption (should be stored inline)
let test_data = b"Hello, this is a small test file for SSE-S3!";
let object_key = "test-single-file-encrypted";
info!("📤 上传小文件({}字节启用SSE-S3加密", test_data.len());
info!("📤 Upload a small file ({} bytes) with SSE-S3 encryption enabled", test_data.len());
let put_response = s3_client
.put_object()
.bucket(TEST_BUCKET)
@@ -54,41 +54,41 @@ async fn test_step1_basic_single_file_encryption() -> Result<(), Box<dyn std::er
.send()
.await?;
debug!("PUT响应ETag: {:?}", put_response.e_tag());
debug!("PUT响应SSE: {:?}", put_response.server_side_encryption());
debug!("PUT responds to ETags: {:?}", put_response.e_tag());
debug!("PUT responds to SSE: {:?}", put_response.server_side_encryption());
// 验证PUT响应包含正确的加密头
// Verify that the PUT response contains the correct cipher header
assert_eq!(
put_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
info!("📥 下载文件并验证加密状态");
info!("📥 Download the file and verify the encryption status");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("GET响应SSE: {:?}", get_response.server_side_encryption());
debug!("GET responds to SSE: {:?}", get_response.server_side_encryption());
// 验证GET响应包含正确的加密头
// Verify that the GET response contains the correct cipher header
assert_eq!(
get_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
);
// 验证数据完整性
// Verify data integrity
let downloaded_data = get_response.body.collect().await?.into_bytes();
assert_eq!(&downloaded_data[..], test_data);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("步骤1通过基础单文件加密功能正常");
info!("Step 1: The basic single file encryption function is normal");
Ok(())
}
/// 步骤2测试不加密的分片上传确保分片上传基础功能正常
/// Step 2: Test the unencrypted shard upload (make sure the shard upload base is working properly)
#[tokio::test]
#[serial]
async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 步骤2测试不加密的分片上传");
info!("🧪 Step 2: Test unencrypted shard uploads");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -102,12 +102,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
let total_parts = 2;
let total_size = part_size * total_parts;
// 生成测试数据(有明显的模式便于验证)
// Generate test data (with obvious patterns for easy verification)
let test_data: Vec<u8> = (0..total_size).map(|i| (i % 256) as u8).collect();
info!("🚀 开始分片上传(无加密):{} parts每个 {}MB", total_parts, part_size / (1024 * 1024));
info!(
"🚀 Start sharded upload (unencrypted): {} parts, {}MB each",
total_parts,
part_size / (1024 * 1024)
);
// 步骤1创建分片上传
// Step 1: Create a sharded upload
let create_multipart_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
@@ -116,16 +120,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.await?;
let upload_id = create_multipart_output.upload_id().unwrap();
info!("📋 创建分片上传,ID: {}", upload_id);
info!("📋 Create a shard upload with ID: {}", upload_id);
// 步骤2上传各个分片
// Step 2: Upload individual shards
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
let end = std::cmp::min(start + part_size, total_size);
let part_data = &test_data[start..end];
info!("📤 上传分片 {} ({} bytes)", part_number, part_data.len());
info!("📤 Upload the shard {} ({} bytes)", part_number, part_data.len());
let upload_part_output = s3_client
.upload_part()
@@ -145,15 +149,15 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.build(),
);
debug!("分片 {} 上传完成,ETag: {}", part_number, etag);
debug!("Fragment {} upload complete,ETag: {}", part_number, etag);
}
// 步骤3完成分片上传
// Step 3: Complete the shard upload
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
info!("🔗 完成分片上传");
info!("🔗 Complete the shard upload");
let complete_output = s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
@@ -163,10 +167,10 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
.send()
.await?;
debug!("完成分片上传,ETag: {:?}", complete_output.e_tag());
debug!("Complete the shard upload,ETag: {:?}", complete_output.e_tag());
// 步骤4下载并验证
info!("📥 下载文件并验证数据完整性");
// Step 4: Download and verify
info!("📥 Download the file and verify data integrity");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
let downloaded_data = get_response.body.collect().await?.into_bytes();
@@ -174,16 +178,16 @@ async fn test_step2_basic_multipart_upload_without_encryption() -> Result<(), Bo
assert_eq!(&downloaded_data[..], &test_data[..]);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("步骤2通过不加密的分片上传功能正常");
info!("Step 2: Unencrypted shard upload functions normally");
Ok(())
}
/// 步骤3测试分片上传 + SSE-S3加密重点测试
/// Step 3: Test Shard Upload + SSE-S3 Encryption (Focus Test)
#[tokio::test]
#[serial]
async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 步骤3测试分片上传 + SSE-S3加密");
info!("🧪 Step 3: Test Shard Upload + SSE-S3 Encryption");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -201,12 +205,12 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
let test_data: Vec<u8> = (0..total_size).map(|i| ((i / 1000) % 256) as u8).collect();
info!(
"🔐 开始分片上传SSE-S3加密{} parts,每个 {}MB",
"🔐 Start sharded upload (SSE-S3 encryption): {} parts, {}MB each",
total_parts,
part_size / (1024 * 1024)
);
// 步骤1创建分片上传并启用SSE-S3
// Step 1: Create a shard upload and enable SSE-S3
let create_multipart_output = s3_client
.create_multipart_upload()
.bucket(TEST_BUCKET)
@@ -216,24 +220,24 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
.await?;
let upload_id = create_multipart_output.upload_id().unwrap();
info!("📋 创建加密分片上传,ID: {}", upload_id);
info!("📋 Create an encrypted shard upload with ID: {}", upload_id);
// 验证CreateMultipartUpload响应如果有SSE头的话
// Verify the CreateMultipartUpload response (if there is an SSE header)
if let Some(sse) = create_multipart_output.server_side_encryption() {
debug!("CreateMultipartUpload包含SSE响应: {:?}", sse);
debug!("CreateMultipartUpload Contains SSE responses: {:?}", sse);
assert_eq!(sse, &aws_sdk_s3::types::ServerSideEncryption::Aes256);
} else {
debug!("CreateMultipartUpload不包含SSE响应头某些实现中正常");
debug!("CreateMultipartUpload does not contain SSE response headers (normal in some implementations)");
}
// 步骤2上传各个分片
// Step 2: Upload individual shards
let mut completed_parts = Vec::new();
for part_number in 1..=total_parts {
let start = (part_number - 1) * part_size;
let end = std::cmp::min(start + part_size, total_size);
let part_data = &test_data[start..end];
info!("🔐 上传加密分片 {} ({} bytes)", part_number, part_data.len());
info!("🔐 Upload encrypted shards {} ({} bytes)", part_number, part_data.len());
let upload_part_output = s3_client
.upload_part()
@@ -253,15 +257,15 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
.build(),
);
debug!("加密分片 {} 上传完成,ETag: {}", part_number, etag);
debug!("Encrypted shard {} upload complete,ETag: {}", part_number, etag);
}
// 步骤3完成分片上传
// Step 3: Complete the shard upload
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
info!("🔗 完成加密分片上传");
info!("🔗 Complete the encrypted shard upload");
let complete_output = s3_client
.complete_multipart_upload()
.bucket(TEST_BUCKET)
@@ -273,20 +277,20 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
debug!("完成加密分片上传ETag: {:?}", complete_output.e_tag());
// 步骤4HEAD请求检查元数据
// 步骤 4HEAD 请求检查元数据
info!("📋 检查对象元数据");
let head_response = s3_client.head_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("HEAD响应 SSE: {:?}", head_response.server_side_encryption());
debug!("HEAD响应 元数据: {:?}", head_response.metadata());
debug!("HEAD 响应 SSE: {:?}", head_response.server_side_encryption());
debug!("HEAD 响应 元数据{:?}", head_response.metadata());
// 步骤5GET请求下载并验证
// 步骤 5GET 请求下载并验证
info!("📥 下载加密文件并验证");
let get_response = s3_client.get_object().bucket(TEST_BUCKET).key(object_key).send().await?;
debug!("GET响应 SSE: {:?}", get_response.server_side_encryption());
debug!("GET 响应 SSE: {:?}", get_response.server_side_encryption());
// 🎯 关键验证GET响应必须包含SSE-S3加密头
// 🎯 关键验证GET 响应必须包含 SSE-S3 加密头
assert_eq!(
get_response.server_side_encryption(),
Some(&aws_sdk_s3::types::ServerSideEncryption::Aes256)
@@ -298,16 +302,16 @@ async fn test_step3_multipart_upload_with_sse_s3() -> Result<(), Box<dyn std::er
assert_eq!(&downloaded_data[..], &test_data[..]);
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ 步骤3通过:分片上传 + SSE-S3加密功能正常");
info!("✅ 步骤 3 通过:分片上传 + SSE-S3 加密功能正常");
Ok(())
}
/// 步骤4测试更大的分片上传测试流式加密
/// 步骤 4测试更大的分片上传测试流式加密
#[tokio::test]
#[serial]
async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 步骤4测试大文件分片上传加密");
info!("🧪 步骤 4测试大文件分片上传加密");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -317,8 +321,8 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
kms_env.base_env.create_test_bucket(TEST_BUCKET).await?;
let object_key = "test-large-multipart-encrypted";
let part_size = 6 * 1024 * 1024; // 6MB per part (大于1MB加密块大小)
let total_parts = 3; // 总共18MB
let part_size = 6 * 1024 * 1024; // 6MB per part (大于 1MB 加密块大小)
let total_parts = 3; // 总共 18MB
let total_size = part_size * total_parts;
info!(
@@ -337,7 +341,7 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
})
.collect();
info!("🔐 开始大文件分片上传SSE-S3加密");
info!("🔐 开始大文件分片上传SSE-S3 加密)");
// 创建分片上传
let create_multipart_output = s3_client
@@ -419,21 +423,21 @@ async fn test_step4_large_multipart_upload_with_encryption() -> Result<(), Box<d
// 逐字节验证数据(对于大文件更严格)
for (i, (&actual, &expected)) in downloaded_data.iter().zip(test_data.iter()).enumerate() {
if actual != expected {
panic!("大文件数据在第{i}字节不匹配: 实际={actual}, 期待={expected}");
panic!("大文件数据在第{i}字节不匹配实际={actual}, 期待={expected}");
}
}
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ 步骤4通过:大文件分片上传加密功能正常");
info!("✅ 步骤 4 通过:大文件分片上传加密功能正常");
Ok(())
}
/// 步骤5测试所有加密类型的分片上传
/// 步骤 5测试所有加密类型的分片上传
#[tokio::test]
#[serial]
async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("🧪 步骤5测试所有加密类型的分片上传");
info!("🧪 步骤 5测试所有加密类型的分片上传");
let mut kms_env = LocalKMSTestEnvironment::new().await?;
let _default_key_id = kms_env.start_rustfs_for_local_kms().await?;
@@ -446,7 +450,7 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
let total_parts = 2;
let total_size = part_size * total_parts;
// 测试SSE-KMS
// 测试 SSE-KMS
info!("🔐 测试 SSE-KMS 分片上传");
test_multipart_encryption_type(
&s3_client,
@@ -459,7 +463,7 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
)
.await?;
// 测试SSE-C
// 测试 SSE-C
info!("🔐 测试 SSE-C 分片上传");
test_multipart_encryption_type(
&s3_client,
@@ -473,7 +477,7 @@ async fn test_step5_all_encryption_types_multipart() -> Result<(), Box<dyn std::
.await?;
kms_env.base_env.delete_test_bucket(TEST_BUCKET).await?;
info!("✅ 步骤5通过:所有加密类型的分片上传功能正常");
info!("✅ 步骤 5 通过:所有加密类型的分片上传功能正常");
Ok(())
}
@@ -496,7 +500,7 @@ async fn test_multipart_encryption_type(
// 生成测试数据
let test_data: Vec<u8> = (0..total_size).map(|i| ((i * 7) % 256) as u8).collect();
// 准备SSE-C所需的密钥如果需要
// 准备 SSE-C 所需的密钥(如果需要)
let (sse_c_key, sse_c_md5) = if matches!(encryption_type, EncryptionType::SSEC) {
let key = "01234567890123456789012345678901";
let key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, key);
@@ -537,7 +541,7 @@ async fn test_multipart_encryption_type(
.part_number(part_number as i32)
.body(aws_sdk_s3::primitives::ByteStream::from(part_data.to_vec()));
// SSE-C需要在每个UploadPart请求中包含密钥
// SSE-C 需要在每个 UploadPart 请求中包含密钥
if matches!(encryption_type, EncryptionType::SSEC) {
upload_request = upload_request
.sse_customer_algorithm("AES256")
@@ -574,7 +578,7 @@ async fn test_multipart_encryption_type(
// 下载并验证
let mut get_request = s3_client.get_object().bucket(bucket).key(object_key);
// SSE-C需要在GET请求中包含密钥
// SSE-C 需要在 GET 请求中包含密钥
if matches!(encryption_type, EncryptionType::SSEC) {
get_request = get_request
.sse_customer_algorithm("AES256")

View File

@@ -37,7 +37,7 @@ async-compression = { workspace = true, features = [
] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tokio-tar = { workspace = true }
astral-tokio-tar = { workspace = true }
[lints]

View File

@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZlibDecoder, ZstdDecoder};
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZlibEncoder, ZstdEncoder};
// use async_zip::tokio::read::seek::ZipFileReader;
// use async_zip::tokio::write::ZipFileWriter;
// use async_zip::{Compression, ZipEntryBuilder};
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZlibDecoder, ZstdDecoder};
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZlibEncoder, ZstdEncoder};
use std::path::Path;
use tokio::fs::File;
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};

View File

@@ -37,97 +37,97 @@ default = []
workspace = true
[dependencies]
# RustFS Internal Crates
rustfs-ahm = { workspace = true }
rustfs-zip = { workspace = true }
rustfs-madmin = { workspace = true }
rustfs-s3select-api = { workspace = true }
rustfs-appauth = { workspace = true }
rustfs-audit = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-policy = { workspace = true }
rustfs-common = { workspace = true }
rustfs-iam = { workspace = true }
rustfs-filemeta.workspace = true
rustfs-rio.workspace = true
rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-protos = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-filemeta.workspace = true
rustfs-iam = { workspace = true }
rustfs-kms = { workspace = true }
rustfs-lock.workspace = true
atoi = { workspace = true }
atomic_enum = { workspace = true }
rustfs-madmin = { workspace = true }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-policy = { workspace = true }
rustfs-protos = { workspace = true }
rustfs-rio.workspace = true
rustfs-s3select-api = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-zip = { workspace = true }
# Async Runtime and Networking
async-trait = { workspace = true }
axum.workspace = true
axum-extra = { workspace = true }
axum-server = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
datafusion = { workspace = true }
const-str = { workspace = true }
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
matchit = { workspace = true }
md5.workspace = true
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
pin-project-lite.workspace = true
reqwest = { workspace = true }
socket2 = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] }
tokio-rustls = { workspace = true }
tokio-stream.workspace = true
tokio-util.workspace = true
tonic = { workspace = true }
tower.workspace = true
tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors", "catch-panic", "timeout", "limit"] }
# Serialization and Data Formats
bytes = { workspace = true }
flatbuffers.workspace = true
rmp-serde.workspace = true
rustls = { workspace = true }
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_urlencoded = { workspace = true }
# Cryptography and Security
rustls = { workspace = true }
# Time and Date
chrono = { workspace = true }
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
# Utilities and Tools
astral-tokio-tar = { workspace = true }
atoi = { workspace = true }
atomic_enum = { workspace = true }
base64 = { workspace = true }
base64-simd.workspace = true
clap = { workspace = true }
const-str = { workspace = true }
datafusion = { workspace = true }
hex-simd.workspace = true
matchit = { workspace = true }
md5.workspace = true
mime_guess = { workspace = true }
pin-project-lite.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }
socket2 = { workspace = true }
thiserror = { workspace = true }
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"net",
"signal",
] }
tokio-stream.workspace = true
tokio-rustls = { workspace = true }
tokio-tar = { workspace = true }
tonic = { workspace = true }
tower.workspace = true
tower-http = { workspace = true, features = [
"trace",
"compression-deflate",
"compression-gzip",
"cors",
"catch-panic",
"timeout",
"limit",
] }
url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }
base64-simd.workspace = true
hex-simd.workspace = true
# Observability and Metrics
metrics = { workspace = true }
opentelemetry = { workspace = true }
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
sysctl = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd.workspace = true

View File

@@ -27,7 +27,7 @@ use hyper_util::{
server::graceful::GracefulShutdown,
service::TowerToHyperService,
};
use metrics::counter;
use metrics::{counter, histogram};
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::SystemObserver;
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
@@ -483,12 +483,6 @@ fn process_connection(
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path());
let labels = [
("key_request_method", format!("{}", request.method())),
@@ -501,14 +495,14 @@ fn process_connection(
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
histogram!("request.body.len").record(chunk.len() as f64);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency);
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
counter!("rustfs_api_requests_failure_total").increment(1);
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
@@ -559,11 +553,7 @@ fn process_connection(
"TLS handshake failed: {}", err
);
}
info!(
counter.rustfs_tls_handshake_failures = 1_u64,
key_failure_type = key_failure_type_str,
"TLS handshake failure metric"
);
counter!("rustfs_tls_handshake_failures", &[("key_failure_type", key_failure_type_str)]).increment(1);
// Record detailed diagnostic information
debug!(
peer_addr = %peer_addr,