mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
Compare commits
5 Commits
1.0.0-alph
...
1.0.0-alph
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4595bf7db6 | ||
|
|
f372ccf4a8 | ||
|
|
9ce867f585 | ||
|
|
124c31a68b | ||
|
|
62a01f3801 |
@@ -14,18 +14,27 @@
|
||||
|
||||
services:
|
||||
|
||||
tempo-init:
|
||||
image: busybox:latest
|
||||
command: ["sh", "-c", "chown -R 10001:10001 /var/tempo"]
|
||||
volumes:
|
||||
- ./tempo-data:/var/tempo
|
||||
user: root
|
||||
networks:
|
||||
- otel-network
|
||||
restart: "no"
|
||||
|
||||
tempo:
|
||||
image: grafana/tempo:latest
|
||||
#user: root # The container must be started with root to execute chown in the script
|
||||
#entrypoint: [ "/etc/tempo/entrypoint.sh" ] # Specify a custom entry point
|
||||
user: "10001" # The container must be started with root to execute chown in the script
|
||||
command: [ "-config.file=/etc/tempo.yaml" ] # This is passed as a parameter to the entry point script
|
||||
volumes:
|
||||
- ./tempo-entrypoint.sh:/etc/tempo/entrypoint.sh # Mount entry point script
|
||||
- ./tempo.yaml:/etc/tempo.yaml
|
||||
- ./tempo.yaml:/etc/tempo.yaml:ro
|
||||
- ./tempo-data:/var/tempo
|
||||
ports:
|
||||
- "3200:3200" # tempo
|
||||
- "24317:4317" # otlp grpc
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- otel-network
|
||||
|
||||
@@ -94,4 +103,4 @@ networks:
|
||||
driver: bridge
|
||||
name: "network_otel_config"
|
||||
driver_opts:
|
||||
com.docker.network.enable_ipv6: "true"
|
||||
com.docker.network.enable_ipv6: "true"
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
#!/bin/sh
|
||||
# Run as root to fix directory permissions
|
||||
chown -R 10001:10001 /var/tempo
|
||||
|
||||
# Use su-exec (a lightweight sudo/gosu alternative, commonly used in Alpine-based images)
|
||||
# Switch to user 10001 and execute the original command (CMD) passed to the script
|
||||
# "$@" represents all parameters passed to this script, i.e. command in docker-compose
|
||||
exec su-exec 10001:10001 /tempo "$@"
|
||||
122
CLAUDE.md
Normal file
122
CLAUDE.md
Normal file
@@ -0,0 +1,122 @@
|
||||
# CLAUDE.md
|
||||
|
||||
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
|
||||
|
||||
## Project Overview
|
||||
|
||||
RustFS is a high-performance distributed object storage software built with Rust, providing S3-compatible APIs and advanced features like data lakes, AI, and big data support. It's designed as an alternative to MinIO with better performance and a more business-friendly Apache 2.0 license.
|
||||
|
||||
## Build Commands
|
||||
|
||||
### Primary Build Commands
|
||||
- `cargo build --release` - Build the main RustFS binary
|
||||
- `./build-rustfs.sh` - Recommended build script that handles console resources and cross-platform compilation
|
||||
- `./build-rustfs.sh --dev` - Development build with debug symbols
|
||||
- `make build` or `just build` - Use Make/Just for standardized builds
|
||||
|
||||
### Platform-Specific Builds
|
||||
- `./build-rustfs.sh --platform x86_64-unknown-linux-musl` - Build for musl target
|
||||
- `./build-rustfs.sh --platform aarch64-unknown-linux-gnu` - Build for ARM64
|
||||
- `make build-musl` or `just build-musl` - Build musl variant
|
||||
- `make build-cross-all` - Build all supported architectures
|
||||
|
||||
### Testing Commands
|
||||
- `cargo test --workspace --exclude e2e_test` - Run unit tests (excluding e2e tests)
|
||||
- `cargo nextest run --all --exclude e2e_test` - Use nextest if available (faster)
|
||||
- `cargo test --all --doc` - Run documentation tests
|
||||
- `make test` or `just test` - Run full test suite
|
||||
|
||||
### Code Quality
|
||||
- `cargo fmt --all` - Format code
|
||||
- `cargo clippy --all-targets --all-features -- -D warnings` - Lint code
|
||||
- `make pre-commit` or `just pre-commit` - Run all quality checks (fmt, clippy, check, test)
|
||||
|
||||
### Docker Build Commands
|
||||
- `make docker-buildx` - Build multi-architecture production images
|
||||
- `make docker-dev-local` - Build development image for local use
|
||||
- `./docker-buildx.sh --push` - Build and push production images
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
### Core Components
|
||||
|
||||
**Main Binary (`rustfs/`):**
|
||||
- Entry point at `rustfs/src/main.rs`
|
||||
- Core modules: admin, auth, config, server, storage, license management, profiling
|
||||
- HTTP server with S3-compatible APIs
|
||||
- Service state management and graceful shutdown
|
||||
- Parallel service initialization with DNS resolver, bucket metadata, and IAM
|
||||
|
||||
**Key Crates (`crates/`):**
|
||||
- `ecstore` - Erasure coding storage implementation (core storage layer)
|
||||
- `iam` - Identity and Access Management
|
||||
- `madmin` - Management dashboard and admin API interface
|
||||
- `s3select-api` & `s3select-query` - S3 Select API and query engine
|
||||
- `config` - Configuration management with notify features
|
||||
- `crypto` - Cryptography and security features
|
||||
- `lock` - Distributed locking implementation
|
||||
- `filemeta` - File metadata management
|
||||
- `rio` - Rust I/O utilities and abstractions
|
||||
- `common` - Shared utilities and data structures
|
||||
- `protos` - Protocol buffer definitions
|
||||
- `audit-logger` - Audit logging for file operations
|
||||
- `notify` - Event notification system
|
||||
- `obs` - Observability utilities
|
||||
- `workers` - Worker thread pools and task scheduling
|
||||
- `appauth` - Application authentication and authorization
|
||||
|
||||
### Build System
|
||||
- Cargo workspace with 25+ crates
|
||||
- Custom `build-rustfs.sh` script for advanced build options
|
||||
- Multi-architecture Docker builds via `docker-buildx.sh`
|
||||
- Both Make and Just task runners supported
|
||||
- Cross-compilation support for multiple Linux targets
|
||||
|
||||
### Key Dependencies
|
||||
- `axum` - HTTP framework for S3 API server
|
||||
- `tokio` - Async runtime
|
||||
- `s3s` - S3 protocol implementation library
|
||||
- `datafusion` - For S3 Select query processing
|
||||
- `hyper`/`hyper-util` - HTTP client/server utilities
|
||||
- `rustls` - TLS implementation
|
||||
- `serde`/`serde_json` - Serialization
|
||||
- `tracing` - Structured logging and observability
|
||||
- `pprof` - Performance profiling with flamegraph support
|
||||
- `tikv-jemallocator` - Memory allocator for Linux GNU builds
|
||||
|
||||
### Development Workflow
|
||||
- Console resources are embedded during build via `rust-embed`
|
||||
- Protocol buffers generated via custom `gproto` binary
|
||||
- E2E tests in separate crate (`e2e_test`)
|
||||
- Shadow build for version/metadata embedding
|
||||
- Support for both GNU and musl libc targets
|
||||
|
||||
### Performance & Observability
|
||||
- Performance profiling available with `pprof` integration (disabled on Windows)
|
||||
- Profiling enabled via environment variables in production
|
||||
- Built-in observability with OpenTelemetry integration
|
||||
- Background services (scanner, heal) can be controlled via environment variables:
|
||||
- `RUSTFS_ENABLE_SCANNER` (default: true)
|
||||
- `RUSTFS_ENABLE_HEAL` (default: true)
|
||||
|
||||
### Service Architecture
|
||||
- Service state management with graceful shutdown handling
|
||||
- Parallel initialization of core systems (DNS, bucket metadata, IAM)
|
||||
- Event notification system with MQTT and webhook support
|
||||
- Auto-heal and data scanner for storage integrity
|
||||
- Jemalloc allocator for Linux GNU targets for better performance
|
||||
|
||||
## Environment Variables
|
||||
- `RUSTFS_ENABLE_SCANNER` - Enable/disable background data scanner
|
||||
- `RUSTFS_ENABLE_HEAL` - Enable/disable auto-heal functionality
|
||||
- Various profiling and observability controls
|
||||
|
||||
## Code Style
|
||||
- Communicate with me in Chinese, but only English can be used in code files.
|
||||
- Code that may cause program crashes (such as unwrap/expect) must not be used, except for testing purposes.
|
||||
- Code that may cause performance issues (such as blocking IO) must not be used, except for testing purposes.
|
||||
- Code that may cause memory leaks must not be used, except for testing purposes.
|
||||
- Code that may cause deadlocks must not be used, except for testing purposes.
|
||||
- Code that may cause undefined behavior must not be used, except for testing purposes.
|
||||
- Code that may cause panics must not be used, except for testing purposes.
|
||||
- Code that may cause data races must not be used, except for testing purposes.
|
||||
256
Cargo.lock
generated
256
Cargo.lock
generated
@@ -75,6 +75,15 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b"
|
||||
dependencies = [
|
||||
"equator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "alloc-no-stdlib"
|
||||
version = "2.0.4"
|
||||
@@ -1645,6 +1654,15 @@ version = "0.8.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||
|
||||
[[package]]
|
||||
name = "cpp_demangle"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.17"
|
||||
@@ -2439,6 +2457,15 @@ dependencies = [
|
||||
"sqlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "debugid"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
|
||||
dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deflate64"
|
||||
version = "0.1.9"
|
||||
@@ -2696,6 +2723,26 @@ dependencies = [
|
||||
"syn 2.0.106",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc"
|
||||
dependencies = [
|
||||
"equator-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator-macro"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.106",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.2"
|
||||
@@ -2785,6 +2832,18 @@ dependencies = [
|
||||
"windows-sys 0.60.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "findshlibs"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
@@ -2856,7 +2915,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"spin",
|
||||
"spin 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3620,6 +3679,24 @@ dependencies = [
|
||||
"hashbrown 0.15.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.11.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"indexmap",
|
||||
"is-terminal",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.26.0",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inlinable_string"
|
||||
version = "0.1.15"
|
||||
@@ -3709,6 +3786,17 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "is-terminal"
|
||||
version = "0.4.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "is_debug"
|
||||
version = "1.1.0"
|
||||
@@ -3815,7 +3903,7 @@ version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
dependencies = [
|
||||
"spin",
|
||||
"spin 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4115,6 +4203,15 @@ version = "2.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
|
||||
|
||||
[[package]]
|
||||
name = "memmap2"
|
||||
version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.9.1"
|
||||
@@ -4230,6 +4327,17 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.29.0"
|
||||
@@ -4393,6 +4501,16 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-format"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.46"
|
||||
@@ -5044,6 +5162,30 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "pprof"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
|
||||
dependencies = [
|
||||
"aligned-vec",
|
||||
"backtrace",
|
||||
"cfg-if",
|
||||
"findshlibs",
|
||||
"inferno",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
"once_cell",
|
||||
"protobuf",
|
||||
"protobuf-codegen",
|
||||
"smallvec",
|
||||
"spin 0.10.0",
|
||||
"symbolic-demangle",
|
||||
"tempfile",
|
||||
"thiserror 2.0.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.21"
|
||||
@@ -5181,6 +5323,57 @@ dependencies = [
|
||||
"prost 0.14.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"protobuf-support",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"once_cell",
|
||||
"protobuf",
|
||||
"protobuf-parse",
|
||||
"regex",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-parse"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"indexmap",
|
||||
"log",
|
||||
"protobuf",
|
||||
"protobuf-support",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-support"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
|
||||
dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "psm"
|
||||
version = "0.1.26"
|
||||
@@ -5210,6 +5403,15 @@ dependencies = [
|
||||
"pulldown-cmark",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.37.5"
|
||||
@@ -5592,6 +5794,15 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rgb"
|
||||
version = "0.8.52"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.14"
|
||||
@@ -5786,6 +5997,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"pprof",
|
||||
"reqwest",
|
||||
"rust-embed",
|
||||
"rustfs-ahm",
|
||||
@@ -5981,9 +6193,11 @@ dependencies = [
|
||||
"hyper-util",
|
||||
"lazy_static",
|
||||
"md-5",
|
||||
"moka",
|
||||
"nix 0.30.1",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"path-absolutize",
|
||||
"pin-project-lite",
|
||||
"quick-xml 0.38.3",
|
||||
@@ -7146,6 +7360,15 @@ dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spin"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.6.0"
|
||||
@@ -7223,6 +7446,12 @@ dependencies = [
|
||||
"thiserror 2.0.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "str_stack"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.11.1"
|
||||
@@ -7334,6 +7563,29 @@ dependencies = [
|
||||
"sval_nested",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "12.16.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9da12f8fecbbeaa1ee62c1d50dc656407e007c3ee7b2a41afce4b5089eaef15e"
|
||||
dependencies = [
|
||||
"debugid",
|
||||
"memmap2",
|
||||
"stable_deref_trait",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-demangle"
|
||||
version = "12.16.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fd35afe0ef9d35d3dcd41c67ddf882fc832a387221338153b7cd685a105495c"
|
||||
dependencies = [
|
||||
"cpp_demangle",
|
||||
"rustc-demangle",
|
||||
"symbolic-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
|
||||
@@ -101,6 +101,8 @@ rustfs-signer.workspace = true
|
||||
rustfs-checksums.workspace = true
|
||||
futures-util.workspace = true
|
||||
async-recursion.workspace = true
|
||||
parking_lot = "0.12"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
nix = { workspace = true }
|
||||
|
||||
231
crates/ecstore/src/batch_processor.rs
Normal file
231
crates/ecstore/src/batch_processor.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! High-performance batch processor using JoinSet
|
||||
//!
|
||||
//! This module provides optimized batching utilities to reduce async runtime overhead
|
||||
//! and improve concurrent operation performance.
|
||||
|
||||
use crate::disk::error::{Error, Result};
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
/// Batch processor that executes tasks concurrently with a semaphore
|
||||
pub struct AsyncBatchProcessor {
|
||||
max_concurrent: usize,
|
||||
}
|
||||
|
||||
impl AsyncBatchProcessor {
|
||||
pub fn new(max_concurrent: usize) -> Self {
|
||||
Self { max_concurrent }
|
||||
}
|
||||
|
||||
/// Execute a batch of tasks concurrently with concurrency control
|
||||
pub async fn execute_batch<T, F>(&self, tasks: Vec<F>) -> Vec<Result<T>>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Future<Output = Result<T>> + Send + 'static,
|
||||
{
|
||||
if tasks.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut results = Vec::with_capacity(tasks.len());
|
||||
for _ in 0..tasks.len() {
|
||||
results.push(Err(Error::other("Not completed")));
|
||||
}
|
||||
|
||||
// Spawn all tasks with semaphore control
|
||||
for (i, task) in tasks.into_iter().enumerate() {
|
||||
let sem = semaphore.clone();
|
||||
join_set.spawn(async move {
|
||||
let _permit = sem.acquire().await.map_err(|_| Error::other("Semaphore error"))?;
|
||||
let result = task.await;
|
||||
Ok::<(usize, Result<T>), Error>((i, result))
|
||||
});
|
||||
}
|
||||
|
||||
// Collect results
|
||||
while let Some(join_result) = join_set.join_next().await {
|
||||
match join_result {
|
||||
Ok(Ok((index, task_result))) => {
|
||||
if index < results.len() {
|
||||
results[index] = task_result;
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// Semaphore or other system error - this is rare
|
||||
tracing::warn!("Batch processor system error: {:?}", e);
|
||||
}
|
||||
Err(join_error) => {
|
||||
// Task panicked - log but continue
|
||||
tracing::warn!("Task panicked in batch processor: {:?}", join_error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
/// Execute batch with early termination when sufficient successful results are obtained
|
||||
pub async fn execute_batch_with_quorum<T, F>(&self, tasks: Vec<F>, required_successes: usize) -> Result<Vec<T>>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: Future<Output = Result<T>> + Send + 'static,
|
||||
{
|
||||
let results = self.execute_batch(tasks).await;
|
||||
let mut successes = Vec::new();
|
||||
|
||||
for value in results.into_iter().flatten() {
|
||||
successes.push(value);
|
||||
if successes.len() >= required_successes {
|
||||
return Ok(successes);
|
||||
}
|
||||
}
|
||||
|
||||
if successes.len() >= required_successes {
|
||||
Ok(successes)
|
||||
} else {
|
||||
Err(Error::other(format!(
|
||||
"Insufficient successful results: got {}, needed {}",
|
||||
successes.len(),
|
||||
required_successes
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Global batch processor instances
|
||||
pub struct GlobalBatchProcessors {
|
||||
read_processor: AsyncBatchProcessor,
|
||||
write_processor: AsyncBatchProcessor,
|
||||
metadata_processor: AsyncBatchProcessor,
|
||||
}
|
||||
|
||||
impl GlobalBatchProcessors {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
read_processor: AsyncBatchProcessor::new(16), // Higher concurrency for reads
|
||||
write_processor: AsyncBatchProcessor::new(8), // Lower concurrency for writes
|
||||
metadata_processor: AsyncBatchProcessor::new(12), // Medium concurrency for metadata
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_processor(&self) -> &AsyncBatchProcessor {
|
||||
&self.read_processor
|
||||
}
|
||||
|
||||
pub fn write_processor(&self) -> &AsyncBatchProcessor {
|
||||
&self.write_processor
|
||||
}
|
||||
|
||||
pub fn metadata_processor(&self) -> &AsyncBatchProcessor {
|
||||
&self.metadata_processor
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for GlobalBatchProcessors {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance
|
||||
use std::sync::OnceLock;
|
||||
|
||||
static GLOBAL_PROCESSORS: OnceLock<GlobalBatchProcessors> = OnceLock::new();
|
||||
|
||||
pub fn get_global_processors() -> &'static GlobalBatchProcessors {
|
||||
GLOBAL_PROCESSORS.get_or_init(GlobalBatchProcessors::new)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // All tasks succeed: every slot must hold its own index, in input order.
    #[tokio::test]
    async fn test_batch_processor_basic() {
        let processor = AsyncBatchProcessor::new(4);

        let tasks: Vec<_> = (0..10)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok::<i32, Error>(i)
            })
            .collect();

        let outcomes = processor.execute_batch(tasks).await;
        assert_eq!(outcomes.len(), 10);

        for (idx, outcome) in outcomes.iter().enumerate() {
            assert!(outcome.is_ok());
            assert_eq!(outcome.as_ref().unwrap(), &(idx as i32));
        }
    }

    // Alternating success/failure: order must still match input order.
    #[tokio::test]
    async fn test_batch_processor_with_errors() {
        let processor = AsyncBatchProcessor::new(2);

        let tasks: Vec<_> = (0..5)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                match i % 2 {
                    0 => Ok::<i32, Error>(i),
                    _ => Err(Error::other("Test error")),
                }
            })
            .collect();

        let outcomes = processor.execute_batch(tasks).await;
        assert_eq!(outcomes.len(), 5);

        for (idx, outcome) in outcomes.iter().enumerate() {
            match idx % 2 {
                0 => {
                    assert!(outcome.is_ok());
                    assert_eq!(outcome.as_ref().unwrap(), &(idx as i32));
                }
                _ => assert!(outcome.is_err()),
            }
        }
    }

    // Only 3 of 10 tasks succeed; a quorum of 2 must still be satisfied.
    #[tokio::test]
    async fn test_batch_processor_quorum() {
        let processor = AsyncBatchProcessor::new(4);

        let tasks: Vec<_> = (0..10)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                if i < 3 {
                    Ok::<i32, Error>(i)
                } else {
                    Err(Error::other("Test error"))
                }
            })
            .collect();

        let quorum_result = processor.execute_batch_with_quorum(tasks, 2).await;
        assert!(quorum_result.is_ok());
        let winners = quorum_result.unwrap();
        assert!(winners.len() >= 2);
    }
}
|
||||
@@ -321,7 +321,7 @@ impl ExpiryState {
|
||||
let mut state = GLOBAL_ExpiryState.write().await;
|
||||
|
||||
while state.tasks_tx.len() < n {
|
||||
let (tx, rx) = mpsc::channel(10000);
|
||||
let (tx, rx) = mpsc::channel(1000);
|
||||
let api = api.clone();
|
||||
let rx = Arc::new(tokio::sync::Mutex::new(rx));
|
||||
state.tasks_tx.push(tx);
|
||||
@@ -432,7 +432,7 @@ pub struct TransitionState {
|
||||
impl TransitionState {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new() -> Arc<Self> {
|
||||
let (tx1, rx1) = bounded(100000);
|
||||
let (tx1, rx1) = bounded(1000);
|
||||
let (tx2, rx2) = bounded(1);
|
||||
Arc::new(Self {
|
||||
transition_tx: tx1,
|
||||
@@ -467,8 +467,12 @@ impl TransitionState {
|
||||
}
|
||||
|
||||
pub async fn init(api: Arc<ECStore>) {
|
||||
let mut n = 10; //globalAPIConfig.getTransitionWorkers();
|
||||
let tw = 10; //globalILMConfig.getTransitionWorkers();
|
||||
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<i64>().ok())
|
||||
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
|
||||
let mut n = max_workers;
|
||||
let tw = 8; //globalILMConfig.getTransitionWorkers();
|
||||
if tw > 0 {
|
||||
n = tw;
|
||||
}
|
||||
@@ -561,8 +565,18 @@ impl TransitionState {
|
||||
pub async fn update_workers_inner(api: Arc<ECStore>, n: i64) {
|
||||
let mut n = n;
|
||||
if n == 0 {
|
||||
n = 100;
|
||||
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<i64>().ok())
|
||||
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
|
||||
n = max_workers;
|
||||
}
|
||||
// Allow environment override of maximum workers
|
||||
let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<i64>().ok())
|
||||
.unwrap_or(32);
|
||||
n = std::cmp::min(n, absolute_max);
|
||||
|
||||
let mut num_workers = GLOBAL_TransitionState.num_workers.load(Ordering::SeqCst);
|
||||
while num_workers < n {
|
||||
@@ -585,7 +599,10 @@ impl TransitionState {
|
||||
}
|
||||
|
||||
pub async fn init_background_expiry(api: Arc<ECStore>) {
|
||||
let mut workers = num_cpus::get() / 2;
|
||||
let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<usize>().ok())
|
||||
.unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16));
|
||||
//globalILMConfig.getExpirationWorkers()
|
||||
if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") {
|
||||
if let Ok(num_expirations) = env_expiration_workers.parse::<usize>() {
|
||||
@@ -594,7 +611,10 @@ pub async fn init_background_expiry(api: Arc<ECStore>) {
|
||||
}
|
||||
|
||||
if workers == 0 {
|
||||
workers = 100;
|
||||
workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<usize>().ok())
|
||||
.unwrap_or(8);
|
||||
}
|
||||
|
||||
//let expiry_state = GLOBAL_ExpiryStSate.write().await;
|
||||
|
||||
@@ -12,13 +12,45 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{fs::Metadata, path::Path};
|
||||
use std::{
|
||||
fs::Metadata,
|
||||
path::Path,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
fs::{self, File},
|
||||
io,
|
||||
};
|
||||
|
||||
static READONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
|
||||
static WRITEONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
|
||||
static READWRITE_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
|
||||
|
||||
fn get_readonly_options() -> &'static Arc<fs::OpenOptions> {
|
||||
READONLY_OPTIONS.get_or_init(|| {
|
||||
let mut opts = fs::OpenOptions::new();
|
||||
opts.read(true);
|
||||
Arc::new(opts)
|
||||
})
|
||||
}
|
||||
|
||||
fn get_writeonly_options() -> &'static Arc<fs::OpenOptions> {
|
||||
WRITEONLY_OPTIONS.get_or_init(|| {
|
||||
let mut opts = fs::OpenOptions::new();
|
||||
opts.write(true);
|
||||
Arc::new(opts)
|
||||
})
|
||||
}
|
||||
|
||||
fn get_readwrite_options() -> &'static Arc<fs::OpenOptions> {
|
||||
READWRITE_OPTIONS.get_or_init(|| {
|
||||
let mut opts = fs::OpenOptions::new();
|
||||
opts.read(true).write(true);
|
||||
Arc::new(opts)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
@@ -84,35 +116,28 @@ pub const O_APPEND: FileMode = 0x00400;
|
||||
// create_new: bool,
|
||||
|
||||
pub async fn open_file(path: impl AsRef<Path>, mode: FileMode) -> io::Result<File> {
|
||||
let mut opts = fs::OpenOptions::new();
|
||||
|
||||
match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
|
||||
O_RDONLY => {
|
||||
opts.read(true);
|
||||
}
|
||||
O_WRONLY => {
|
||||
opts.write(true);
|
||||
}
|
||||
O_RDWR => {
|
||||
opts.read(true);
|
||||
opts.write(true);
|
||||
}
|
||||
_ => (),
|
||||
let base_opts = match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
|
||||
O_RDONLY => get_readonly_options(),
|
||||
O_WRONLY => get_writeonly_options(),
|
||||
O_RDWR => get_readwrite_options(),
|
||||
_ => get_readonly_options(),
|
||||
};
|
||||
|
||||
if mode & O_CREATE != 0 {
|
||||
opts.create(true);
|
||||
if (mode & (O_CREATE | O_APPEND | O_TRUNC)) != 0 {
|
||||
let mut opts = (**base_opts).clone();
|
||||
if mode & O_CREATE != 0 {
|
||||
opts.create(true);
|
||||
}
|
||||
if mode & O_APPEND != 0 {
|
||||
opts.append(true);
|
||||
}
|
||||
if mode & O_TRUNC != 0 {
|
||||
opts.truncate(true);
|
||||
}
|
||||
opts.open(path.as_ref()).await
|
||||
} else {
|
||||
base_opts.open(path.as_ref()).await
|
||||
}
|
||||
|
||||
if mode & O_APPEND != 0 {
|
||||
opts.append(true);
|
||||
}
|
||||
|
||||
if mode & O_TRUNC != 0 {
|
||||
opts.truncate(true);
|
||||
}
|
||||
|
||||
opts.open(path.as_ref()).await
|
||||
}
|
||||
|
||||
pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
@@ -121,7 +146,7 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
}
|
||||
|
||||
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
tokio::task::block_in_place(|| std::fs::metadata(path))?;
|
||||
std::fs::metadata(path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -130,7 +155,7 @@ pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
|
||||
}
|
||||
|
||||
pub fn lstat_std(path: impl AsRef<Path>) -> io::Result<Metadata> {
|
||||
tokio::task::block_in_place(|| std::fs::metadata(path))
|
||||
std::fs::metadata(path)
|
||||
}
|
||||
|
||||
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
@@ -159,26 +184,22 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
let path = path.as_ref();
|
||||
tokio::task::block_in_place(|| {
|
||||
let meta = std::fs::metadata(path)?;
|
||||
if meta.is_dir() {
|
||||
std::fs::remove_dir(path)
|
||||
} else {
|
||||
std::fs::remove_file(path)
|
||||
}
|
||||
})
|
||||
let meta = std::fs::metadata(path)?;
|
||||
if meta.is_dir() {
|
||||
std::fs::remove_dir(path)
|
||||
} else {
|
||||
std::fs::remove_file(path)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
let path = path.as_ref();
|
||||
tokio::task::block_in_place(|| {
|
||||
let meta = std::fs::metadata(path)?;
|
||||
if meta.is_dir() {
|
||||
std::fs::remove_dir_all(path)
|
||||
} else {
|
||||
std::fs::remove_file(path)
|
||||
}
|
||||
})
|
||||
let meta = std::fs::metadata(path)?;
|
||||
if meta.is_dir() {
|
||||
std::fs::remove_dir_all(path)
|
||||
} else {
|
||||
std::fs::remove_file(path)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
@@ -190,7 +211,7 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
|
||||
}
|
||||
|
||||
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
|
||||
tokio::task::block_in_place(|| std::fs::rename(from, to))
|
||||
std::fs::rename(from, to)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
|
||||
@@ -41,18 +41,21 @@ use tokio::time::interval;
|
||||
|
||||
use crate::erasure_coding::bitrot_verify;
|
||||
use bytes::Bytes;
|
||||
use path_absolutize::Absolutize;
|
||||
// use path_absolutize::Absolutize; // Replaced with direct path operations for better performance
|
||||
use crate::file_cache::{get_global_file_cache, prefetch_metadata_patterns, read_metadata_cached};
|
||||
use parking_lot::RwLock as ParkingLotRwLock;
|
||||
use rustfs_filemeta::{
|
||||
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
|
||||
get_file_info, read_xl_meta_no_data,
|
||||
};
|
||||
use rustfs_utils::HashAlgorithm;
|
||||
use rustfs_utils::os::get_info;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::io::SeekFrom;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
fs::Metadata,
|
||||
@@ -101,6 +104,9 @@ pub struct LocalDisk {
|
||||
pub major: u64,
|
||||
pub minor: u64,
|
||||
pub nrrequests: u64,
|
||||
// Performance optimization fields
|
||||
path_cache: Arc<ParkingLotRwLock<HashMap<String, PathBuf>>>,
|
||||
current_dir: Arc<OnceLock<PathBuf>>,
|
||||
// pub id: Mutex<Option<Uuid>>,
|
||||
// pub format_data: Mutex<Vec<u8>>,
|
||||
// pub format_file_info: Mutex<Option<Metadata>>,
|
||||
@@ -130,8 +136,9 @@ impl Debug for LocalDisk {
|
||||
impl LocalDisk {
|
||||
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
|
||||
debug!("Creating local disk");
|
||||
let root = match PathBuf::from(ep.get_file_path()).absolutize() {
|
||||
Ok(path) => path.into_owned(),
|
||||
// Use optimized path resolution instead of absolutize() for better performance
|
||||
let root = match std::fs::canonicalize(ep.get_file_path()) {
|
||||
Ok(path) => path,
|
||||
Err(e) => {
|
||||
if e.kind() == ErrorKind::NotFound {
|
||||
return Err(DiskError::VolumeNotFound);
|
||||
@@ -144,10 +151,8 @@ impl LocalDisk {
|
||||
// TODO: 删除 tmp 数据
|
||||
}
|
||||
|
||||
let format_path = Path::new(RUSTFS_META_BUCKET)
|
||||
.join(Path::new(super::FORMAT_CONFIG_FILE))
|
||||
.absolutize_virtually(&root)?
|
||||
.into_owned();
|
||||
// Use optimized path resolution instead of absolutize_virtually
|
||||
let format_path = root.join(RUSTFS_META_BUCKET).join(super::FORMAT_CONFIG_FILE);
|
||||
debug!("format_path: {:?}", format_path);
|
||||
let (format_data, format_meta) = read_file_exists(&format_path).await?;
|
||||
|
||||
@@ -227,6 +232,8 @@ impl LocalDisk {
|
||||
// format_file_info: Mutex::new(format_meta),
|
||||
// format_data: Mutex::new(format_data),
|
||||
// format_last_check: Mutex::new(format_last_check),
|
||||
path_cache: Arc::new(ParkingLotRwLock::new(HashMap::with_capacity(2048))),
|
||||
current_dir: Arc::new(OnceLock::new()),
|
||||
exit_signal: None,
|
||||
};
|
||||
let (info, _root) = get_disk_info(root).await?;
|
||||
@@ -351,19 +358,178 @@ impl LocalDisk {
|
||||
self.make_volumes(defaults).await
|
||||
}
|
||||
|
||||
// Optimized path resolution with caching
|
||||
pub fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
|
||||
Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
|
||||
let path_ref = path.as_ref();
|
||||
let path_str = path_ref.to_string_lossy();
|
||||
|
||||
// Fast cache read
|
||||
{
|
||||
let cache = self.path_cache.read();
|
||||
if let Some(cached_path) = cache.get(path_str.as_ref()) {
|
||||
return Ok(cached_path.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate absolute path without using path_absolutize for better performance
|
||||
let abs_path = if path_ref.is_absolute() {
|
||||
path_ref.to_path_buf()
|
||||
} else {
|
||||
self.root.join(path_ref)
|
||||
};
|
||||
|
||||
// Normalize path components to avoid filesystem calls
|
||||
let normalized = self.normalize_path_components(&abs_path);
|
||||
|
||||
// Cache the result
|
||||
{
|
||||
let mut cache = self.path_cache.write();
|
||||
|
||||
// Simple cache size control
|
||||
if cache.len() >= 4096 {
|
||||
// Clear half the cache - simple eviction strategy
|
||||
let keys_to_remove: Vec<_> = cache.keys().take(cache.len() / 2).cloned().collect();
|
||||
for key in keys_to_remove {
|
||||
cache.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
cache.insert(path_str.into_owned(), normalized.clone());
|
||||
}
|
||||
|
||||
Ok(normalized)
|
||||
}
|
||||
|
||||
// Lightweight path normalization without filesystem calls
|
||||
fn normalize_path_components(&self, path: &Path) -> PathBuf {
|
||||
let mut result = PathBuf::new();
|
||||
|
||||
for component in path.components() {
|
||||
match component {
|
||||
std::path::Component::Normal(name) => {
|
||||
result.push(name);
|
||||
}
|
||||
std::path::Component::ParentDir => {
|
||||
result.pop();
|
||||
}
|
||||
std::path::Component::CurDir => {
|
||||
// Ignore current directory components
|
||||
}
|
||||
std::path::Component::RootDir => {
|
||||
result.push(component);
|
||||
}
|
||||
std::path::Component::Prefix(_prefix) => {
|
||||
result.push(component);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
// Highly optimized object path generation
|
||||
pub fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
|
||||
let dir = Path::new(&bucket);
|
||||
let file_path = Path::new(&key);
|
||||
self.resolve_abs_path(dir.join(file_path))
|
||||
// For high-frequency paths, use faster string concatenation
|
||||
let cache_key = if key.is_empty() {
|
||||
bucket.to_string()
|
||||
} else {
|
||||
// Use with_capacity to pre-allocate, reducing memory reallocations
|
||||
let mut path_str = String::with_capacity(bucket.len() + key.len() + 1);
|
||||
path_str.push_str(bucket);
|
||||
path_str.push('/');
|
||||
path_str.push_str(key);
|
||||
path_str
|
||||
};
|
||||
|
||||
// Fast path: directly calculate based on root, avoiding cache lookup overhead for simple cases
|
||||
Ok(self.root.join(&cache_key))
|
||||
}
|
||||
|
||||
pub fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
|
||||
let dir = Path::new(&bucket);
|
||||
self.resolve_abs_path(dir)
|
||||
Ok(self.root.join(bucket))
|
||||
}
|
||||
|
||||
// Batch path generation with single lock acquisition
|
||||
pub fn get_object_paths_batch(&self, requests: &[(String, String)]) -> Result<Vec<PathBuf>> {
|
||||
let mut results = Vec::with_capacity(requests.len());
|
||||
let mut cache_misses = Vec::new();
|
||||
|
||||
// First attempt to get all paths from cache
|
||||
{
|
||||
let cache = self.path_cache.read();
|
||||
for (i, (bucket, key)) in requests.iter().enumerate() {
|
||||
let cache_key = format!("{}/{}", bucket, key);
|
||||
if let Some(cached_path) = cache.get(&cache_key) {
|
||||
results.push((i, cached_path.clone()));
|
||||
} else {
|
||||
cache_misses.push((i, bucket, key, cache_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle cache misses
|
||||
if !cache_misses.is_empty() {
|
||||
let mut new_entries = Vec::new();
|
||||
for (i, _bucket, _key, cache_key) in cache_misses {
|
||||
let path = self.root.join(&cache_key);
|
||||
results.push((i, path.clone()));
|
||||
new_entries.push((cache_key, path));
|
||||
}
|
||||
|
||||
// Batch update cache
|
||||
{
|
||||
let mut cache = self.path_cache.write();
|
||||
for (key, path) in new_entries {
|
||||
cache.insert(key, path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort results back to original order
|
||||
results.sort_by_key(|(i, _)| *i);
|
||||
Ok(results.into_iter().map(|(_, path)| path).collect())
|
||||
}
|
||||
|
||||
// Optimized metadata reading with caching
|
||||
pub async fn read_metadata_cached(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
|
||||
read_metadata_cached(path).await
|
||||
}
|
||||
|
||||
// Smart prefetching for related files
|
||||
pub async fn read_version_with_prefetch(
|
||||
&self,
|
||||
volume: &str,
|
||||
path: &str,
|
||||
version_id: &str,
|
||||
opts: &ReadOptions,
|
||||
) -> Result<FileInfo> {
|
||||
let file_path = self.get_object_path(volume, path)?;
|
||||
|
||||
// Async prefetch related files, don't block current read
|
||||
if let Some(parent) = file_path.parent() {
|
||||
prefetch_metadata_patterns(parent, &[super::STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await;
|
||||
}
|
||||
|
||||
// Main read logic
|
||||
let file_dir = self.get_bucket_path(volume)?;
|
||||
let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;
|
||||
|
||||
get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
|
||||
.await
|
||||
.map_err(|_e| DiskError::Unexpected)
|
||||
}
|
||||
|
||||
// Batch metadata reading for multiple objects
|
||||
pub async fn read_metadata_batch(&self, requests: Vec<(String, String)>) -> Result<Vec<Option<Arc<FileMeta>>>> {
|
||||
let paths: Vec<PathBuf> = requests
|
||||
.iter()
|
||||
.map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, super::STORAGE_FORMAT_FILE)))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let cache = get_global_file_cache();
|
||||
let results = cache.get_metadata_batch(paths).await;
|
||||
|
||||
Ok(results.into_iter().map(|r| r.ok()).collect())
|
||||
}
|
||||
|
||||
// /// Write to the filesystem atomically.
|
||||
@@ -549,7 +715,15 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
|
||||
// TODO: support timeout
|
||||
// Try to use cached file content reading for better performance, with safe fallback
|
||||
let path = file_path.as_ref().to_path_buf();
|
||||
|
||||
// First, try the cache
|
||||
if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await {
|
||||
return Ok(bytes.to_vec());
|
||||
}
|
||||
|
||||
// Fallback to direct read if cache fails
|
||||
let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
@@ -668,7 +668,7 @@ pub struct VolumeInfo {
|
||||
pub created: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Default)]
|
||||
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
|
||||
pub struct ReadOptions {
|
||||
pub incl_free_versions: bool,
|
||||
pub read_data: bool,
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, get_host_ip_async, is_local_host};
|
||||
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
|
||||
use tracing::{error, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
@@ -248,8 +248,7 @@ impl PoolEndpointList {
|
||||
Ok(ips) => ips,
|
||||
Err(e) => {
|
||||
error!("host {} not found, error:{}", host, e);
|
||||
get_host_ip_async(host.clone())
|
||||
.map_err(|e| Error::other(format!("host '{host}' cannot resolve: {e}")))?
|
||||
return Err(Error::other(format!("host '{host}' cannot resolve: {e}")));
|
||||
}
|
||||
};
|
||||
host_ip_cache.insert(host.clone(), ips);
|
||||
|
||||
332
crates/ecstore/src/file_cache.rs
Normal file
332
crates/ecstore/src/file_cache.rs
Normal file
@@ -0,0 +1,332 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! High-performance file content and metadata caching using moka
|
||||
//!
|
||||
//! This module provides optimized caching for file operations to reduce
|
||||
//! redundant I/O and improve overall system performance.
|
||||
|
||||
use super::disk::error::{Error, Result};
|
||||
use bytes::Bytes;
|
||||
use moka::future::Cache;
|
||||
use rustfs_filemeta::FileMeta;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct OptimizedFileCache {
|
||||
// Use moka as high-performance async cache
|
||||
metadata_cache: Cache<PathBuf, Arc<FileMeta>>,
|
||||
file_content_cache: Cache<PathBuf, Bytes>,
|
||||
// Performance monitoring
|
||||
cache_hits: std::sync::atomic::AtomicU64,
|
||||
cache_misses: std::sync::atomic::AtomicU64,
|
||||
}
|
||||
|
||||
impl OptimizedFileCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
metadata_cache: Cache::builder()
|
||||
.max_capacity(2048)
|
||||
.time_to_live(Duration::from_secs(300)) // 5 minutes TTL
|
||||
.time_to_idle(Duration::from_secs(60)) // 1 minute idle
|
||||
.build(),
|
||||
|
||||
file_content_cache: Cache::builder()
|
||||
.max_capacity(512) // Smaller file content cache
|
||||
.time_to_live(Duration::from_secs(120))
|
||||
.weigher(|_key: &PathBuf, value: &Bytes| value.len() as u32)
|
||||
.build(),
|
||||
|
||||
cache_hits: std::sync::atomic::AtomicU64::new(0),
|
||||
cache_misses: std::sync::atomic::AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_metadata(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
|
||||
if let Some(cached) = self.metadata_cache.get(&path).await {
|
||||
self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
// Cache miss, read file
|
||||
let data = tokio::fs::read(&path)
|
||||
.await
|
||||
.map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?;
|
||||
|
||||
let mut meta = FileMeta::default();
|
||||
meta.unmarshal_msg(&data)?;
|
||||
|
||||
let arc_meta = Arc::new(meta);
|
||||
self.metadata_cache.insert(path, arc_meta.clone()).await;
|
||||
|
||||
Ok(arc_meta)
|
||||
}
|
||||
|
||||
pub async fn get_file_content(&self, path: PathBuf) -> Result<Bytes> {
|
||||
if let Some(cached) = self.file_content_cache.get(&path).await {
|
||||
self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let data = tokio::fs::read(&path)
|
||||
.await
|
||||
.map_err(|e| Error::other(format!("Read file failed: {}", e)))?;
|
||||
|
||||
let bytes = Bytes::from(data);
|
||||
self.file_content_cache.insert(path, bytes.clone()).await;
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
// Prefetch related files
|
||||
pub async fn prefetch_related(&self, base_path: &Path, patterns: &[&str]) {
|
||||
let mut prefetch_tasks = Vec::new();
|
||||
|
||||
for pattern in patterns {
|
||||
let path = base_path.join(pattern);
|
||||
if tokio::fs::metadata(&path).await.is_ok() {
|
||||
let cache = self.clone();
|
||||
let path_clone = path.clone();
|
||||
prefetch_tasks.push(async move {
|
||||
let _ = cache.get_metadata(path_clone).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Parallel prefetch, don't wait for completion
|
||||
if !prefetch_tasks.is_empty() {
|
||||
tokio::spawn(async move {
|
||||
futures::future::join_all(prefetch_tasks).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Batch metadata reading with deduplication
|
||||
pub async fn get_metadata_batch(
|
||||
&self,
|
||||
paths: Vec<PathBuf>,
|
||||
) -> Vec<std::result::Result<Arc<FileMeta>, rustfs_filemeta::Error>> {
|
||||
let mut results = Vec::with_capacity(paths.len());
|
||||
let mut cache_futures = Vec::new();
|
||||
|
||||
// First, attempt to get from cache
|
||||
for (i, path) in paths.iter().enumerate() {
|
||||
if let Some(cached) = self.metadata_cache.get(path).await {
|
||||
results.push((i, Ok(cached)));
|
||||
self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
} else {
|
||||
cache_futures.push((i, path.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
// For cache misses, read from filesystem
|
||||
if !cache_futures.is_empty() {
|
||||
let mut fs_results = Vec::new();
|
||||
|
||||
for (i, path) in cache_futures {
|
||||
self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
match tokio::fs::read(&path).await {
|
||||
Ok(data) => {
|
||||
let mut meta = FileMeta::default();
|
||||
match meta.unmarshal_msg(&data) {
|
||||
Ok(_) => {
|
||||
let arc_meta = Arc::new(meta);
|
||||
self.metadata_cache.insert(path, arc_meta.clone()).await;
|
||||
fs_results.push((i, Ok(arc_meta)));
|
||||
}
|
||||
Err(e) => {
|
||||
fs_results.push((i, Err(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
fs_results.push((i, Err(rustfs_filemeta::Error::Unexpected)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results.extend(fs_results);
|
||||
}
|
||||
|
||||
// Sort results back to original order
|
||||
results.sort_by_key(|(i, _)| *i);
|
||||
results.into_iter().map(|(_, result)| result).collect()
|
||||
}
|
||||
|
||||
// Invalidate cache entries for a path
|
||||
pub async fn invalidate(&self, path: &Path) {
|
||||
self.metadata_cache.remove(path).await;
|
||||
self.file_content_cache.remove(path).await;
|
||||
}
|
||||
|
||||
// Get cache statistics
|
||||
pub fn get_stats(&self) -> FileCacheStats {
|
||||
let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let hit_rate = if hits + misses > 0 {
|
||||
(hits as f64 / (hits + misses) as f64) * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
FileCacheStats {
|
||||
metadata_cache_size: self.metadata_cache.entry_count(),
|
||||
content_cache_size: self.file_content_cache.entry_count(),
|
||||
cache_hits: hits,
|
||||
cache_misses: misses,
|
||||
hit_rate,
|
||||
total_weight: 0, // Simplified for compatibility
|
||||
}
|
||||
}
|
||||
|
||||
// Clear all caches
|
||||
pub async fn clear(&self) {
|
||||
self.metadata_cache.invalidate_all();
|
||||
self.file_content_cache.invalidate_all();
|
||||
|
||||
// Wait for invalidation to complete
|
||||
self.metadata_cache.run_pending_tasks().await;
|
||||
self.file_content_cache.run_pending_tasks().await;
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for OptimizedFileCache {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
metadata_cache: self.metadata_cache.clone(),
|
||||
file_content_cache: self.file_content_cache.clone(),
|
||||
cache_hits: std::sync::atomic::AtomicU64::new(self.cache_hits.load(std::sync::atomic::Ordering::Relaxed)),
|
||||
cache_misses: std::sync::atomic::AtomicU64::new(self.cache_misses.load(std::sync::atomic::Ordering::Relaxed)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FileCacheStats {
|
||||
pub metadata_cache_size: u64,
|
||||
pub content_cache_size: u64,
|
||||
pub cache_hits: u64,
|
||||
pub cache_misses: u64,
|
||||
pub hit_rate: f64,
|
||||
pub total_weight: u64,
|
||||
}
|
||||
|
||||
impl Default for OptimizedFileCache {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Global cache instance
|
||||
use std::sync::OnceLock;
|
||||
|
||||
static GLOBAL_FILE_CACHE: OnceLock<OptimizedFileCache> = OnceLock::new();
|
||||
|
||||
pub fn get_global_file_cache() -> &'static OptimizedFileCache {
|
||||
GLOBAL_FILE_CACHE.get_or_init(OptimizedFileCache::new)
|
||||
}
|
||||
|
||||
// Utility functions for common operations
|
||||
pub async fn read_metadata_cached(path: PathBuf) -> Result<Arc<FileMeta>> {
|
||||
get_global_file_cache().get_metadata(path).await
|
||||
}
|
||||
|
||||
pub async fn read_file_content_cached(path: PathBuf) -> Result<Bytes> {
|
||||
get_global_file_cache().get_file_content(path).await
|
||||
}
|
||||
|
||||
pub async fn prefetch_metadata_patterns(base_path: &Path, patterns: &[&str]) {
|
||||
get_global_file_cache().prefetch_related(base_path, patterns).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::Write;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_cache_basic() {
|
||||
let cache = OptimizedFileCache::new();
|
||||
|
||||
// Create a temporary file
|
||||
let dir = tempdir().unwrap();
|
||||
let file_path = dir.path().join("test.txt");
|
||||
let mut file = std::fs::File::create(&file_path).unwrap();
|
||||
writeln!(file, "test content").unwrap();
|
||||
drop(file);
|
||||
|
||||
// First read should be cache miss
|
||||
let content1 = cache.get_file_content(file_path.clone()).await.unwrap();
|
||||
assert_eq!(content1, Bytes::from("test content\n"));
|
||||
|
||||
// Second read should be cache hit
|
||||
let content2 = cache.get_file_content(file_path.clone()).await.unwrap();
|
||||
assert_eq!(content2, content1);
|
||||
|
||||
let stats = cache.get_stats();
|
||||
assert!(stats.cache_hits > 0);
|
||||
assert!(stats.cache_misses > 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_batch_read() {
|
||||
let cache = OptimizedFileCache::new();
|
||||
|
||||
// Create test files
|
||||
let dir = tempdir().unwrap();
|
||||
let mut paths = Vec::new();
|
||||
|
||||
for i in 0..5 {
|
||||
let file_path = dir.path().join(format!("test_{}.txt", i));
|
||||
let mut file = std::fs::File::create(&file_path).unwrap();
|
||||
writeln!(file, "content {}", i).unwrap();
|
||||
paths.push(file_path);
|
||||
}
|
||||
|
||||
// Note: This test would need actual FileMeta files to work properly
|
||||
// For now, we just test that the function runs without errors
|
||||
let results = cache.get_metadata_batch(paths).await;
|
||||
assert_eq!(results.len(), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cache_invalidation() {
|
||||
let cache = OptimizedFileCache::new();
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let file_path = dir.path().join("test.txt");
|
||||
let mut file = std::fs::File::create(&file_path).unwrap();
|
||||
writeln!(file, "test content").unwrap();
|
||||
drop(file);
|
||||
|
||||
// Read file to populate cache
|
||||
let _ = cache.get_file_content(file_path.clone()).await.unwrap();
|
||||
|
||||
// Invalidate cache
|
||||
cache.invalidate(&file_path).await;
|
||||
|
||||
// Next read should be cache miss again
|
||||
let _ = cache.get_file_content(file_path.clone()).await.unwrap();
|
||||
|
||||
let stats = cache.get_stats();
|
||||
assert!(stats.cache_misses >= 2);
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@
|
||||
extern crate core;
|
||||
|
||||
pub mod admin_server_info;
|
||||
pub mod batch_processor;
|
||||
pub mod bitrot;
|
||||
pub mod bucket;
|
||||
pub mod cache_value;
|
||||
@@ -29,6 +30,7 @@ pub mod disks_layout;
|
||||
pub mod endpoints;
|
||||
pub mod erasure_coding;
|
||||
pub mod error;
|
||||
pub mod file_cache;
|
||||
pub mod global;
|
||||
pub mod lock_utils;
|
||||
pub mod metrics_realtime;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#![allow(unused_imports)]
|
||||
#![allow(unused_variables)]
|
||||
|
||||
use crate::batch_processor::{AsyncBatchProcessor, get_global_processors};
|
||||
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
|
||||
use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE;
|
||||
use crate::bucket::versioning::VersioningApi;
|
||||
@@ -232,7 +233,10 @@ impl SetDisks {
|
||||
});
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
// Use optimized batch processor for disk info retrieval
|
||||
let processor = get_global_processors().metadata_processor();
|
||||
let results = processor.execute_batch(futures).await;
|
||||
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(res) => {
|
||||
@@ -507,21 +511,28 @@ impl SetDisks {
|
||||
|
||||
#[tracing::instrument(skip(disks))]
|
||||
async fn cleanup_multipart_path(disks: &[Option<DiskStore>], paths: &[String]) {
|
||||
let mut futures = Vec::with_capacity(disks.len());
|
||||
|
||||
let mut errs = Vec::with_capacity(disks.len());
|
||||
|
||||
for disk in disks.iter() {
|
||||
futures.push(async move {
|
||||
if let Some(disk) = disk {
|
||||
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await
|
||||
} else {
|
||||
Err(DiskError::DiskNotFound)
|
||||
// Use improved simple batch processor instead of join_all for better performance
|
||||
let processor = get_global_processors().write_processor();
|
||||
|
||||
let tasks: Vec<_> = disks
|
||||
.iter()
|
||||
.map(|disk| {
|
||||
let disk = disk.clone();
|
||||
let paths = paths.to_vec();
|
||||
|
||||
async move {
|
||||
if let Some(disk) = disk {
|
||||
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, &paths).await
|
||||
} else {
|
||||
Err(DiskError::DiskNotFound)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
.collect();
|
||||
|
||||
let results = join_all(futures).await;
|
||||
let results = processor.execute_batch(tasks).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
@@ -545,21 +556,32 @@ impl SetDisks {
|
||||
part_numbers: &[usize],
|
||||
read_quorum: usize,
|
||||
) -> disk::error::Result<Vec<ObjectPartInfo>> {
|
||||
let mut futures = Vec::with_capacity(disks.len());
|
||||
for (i, disk) in disks.iter().enumerate() {
|
||||
futures.push(async move {
|
||||
if let Some(disk) = disk {
|
||||
disk.read_parts(bucket, part_meta_paths).await
|
||||
} else {
|
||||
Err(DiskError::DiskNotFound)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut errs = Vec::with_capacity(disks.len());
|
||||
let mut object_parts = Vec::with_capacity(disks.len());
|
||||
|
||||
let results = join_all(futures).await;
|
||||
// Use batch processor for better performance
|
||||
let processor = get_global_processors().read_processor();
|
||||
let bucket = bucket.to_string();
|
||||
let part_meta_paths = part_meta_paths.to_vec();
|
||||
|
||||
let tasks: Vec<_> = disks
|
||||
.iter()
|
||||
.map(|disk| {
|
||||
let disk = disk.clone();
|
||||
let bucket = bucket.clone();
|
||||
let part_meta_paths = part_meta_paths.clone();
|
||||
|
||||
async move {
|
||||
if let Some(disk) = disk {
|
||||
disk.read_parts(&bucket, &part_meta_paths).await
|
||||
} else {
|
||||
Err(DiskError::DiskNotFound)
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let results = processor.execute_batch(tasks).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(res) => {
|
||||
@@ -1369,22 +1391,71 @@ impl SetDisks {
|
||||
})
|
||||
});
|
||||
|
||||
// Wait for all tasks to complete
|
||||
let results = join_all(futures).await;
|
||||
|
||||
for result in results {
|
||||
match result? {
|
||||
Ok(res) => {
|
||||
ress.push(res);
|
||||
errors.push(None);
|
||||
}
|
||||
Err(e) => {
|
||||
match result {
|
||||
Ok(res) => match res {
|
||||
Ok(file_info) => {
|
||||
ress.push(file_info);
|
||||
errors.push(None);
|
||||
}
|
||||
Err(e) => {
|
||||
ress.push(FileInfo::default());
|
||||
errors.push(Some(e));
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
ress.push(FileInfo::default());
|
||||
errors.push(Some(e));
|
||||
errors.push(Some(DiskError::Unexpected));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((ress, errors))
|
||||
}
|
||||
|
||||
// Optimized version using batch processor with quorum support
|
||||
pub async fn read_version_optimized(
|
||||
&self,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
version_id: &str,
|
||||
opts: &ReadOptions,
|
||||
) -> Result<Vec<rustfs_filemeta::FileInfo>> {
|
||||
// Use existing disk selection logic
|
||||
let disks = self.disks.read().await;
|
||||
let required_reads = self.format.erasure.sets.len();
|
||||
|
||||
// Clone parameters outside the closure to avoid lifetime issues
|
||||
let bucket = bucket.to_string();
|
||||
let object = object.to_string();
|
||||
let version_id = version_id.to_string();
|
||||
let opts = opts.clone();
|
||||
|
||||
let processor = get_global_processors().read_processor();
|
||||
let tasks: Vec<_> = disks
|
||||
.iter()
|
||||
.take(required_reads + 2) // Read a few extra for reliability
|
||||
.filter_map(|disk| {
|
||||
disk.as_ref().map(|d| {
|
||||
let disk = d.clone();
|
||||
let bucket = bucket.clone();
|
||||
let object = object.clone();
|
||||
let version_id = version_id.clone();
|
||||
let opts = opts.clone();
|
||||
|
||||
async move { disk.read_version(&bucket, &bucket, &object, &version_id, &opts).await }
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
match processor.execute_batch_with_quorum(tasks, required_reads).await {
|
||||
Ok(results) => Ok(results),
|
||||
Err(_) => Err(DiskError::FileNotFound.into()), // Use existing error type
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_all_xl(
|
||||
disks: &[Option<DiskStore>],
|
||||
bucket: &str,
|
||||
@@ -1403,10 +1474,11 @@ impl SetDisks {
|
||||
object: &str,
|
||||
read_data: bool,
|
||||
) -> (Vec<Option<RawFileInfo>>, Vec<Option<DiskError>>) {
|
||||
let mut futures = Vec::with_capacity(disks.len());
|
||||
let mut ress = Vec::with_capacity(disks.len());
|
||||
let mut errors = Vec::with_capacity(disks.len());
|
||||
|
||||
let mut futures = Vec::with_capacity(disks.len());
|
||||
|
||||
for disk in disks.iter() {
|
||||
futures.push(async move {
|
||||
if let Some(disk) = disk {
|
||||
|
||||
@@ -302,17 +302,19 @@ impl TierConfigMgr {
|
||||
}
|
||||
|
||||
pub async fn get_driver<'a>(&'a mut self, tier_name: &str) -> std::result::Result<&'a WarmBackendImpl, AdminError> {
|
||||
Ok(match self.driver_cache.entry(tier_name.to_string()) {
|
||||
Entry::Occupied(e) => e.into_mut(),
|
||||
Entry::Vacant(e) => {
|
||||
let t = self.tiers.get(tier_name);
|
||||
if t.is_none() {
|
||||
return Err(ERR_TIER_NOT_FOUND.clone());
|
||||
}
|
||||
let d = new_warm_backend(t.expect("err"), false).await?;
|
||||
e.insert(d)
|
||||
}
|
||||
})
|
||||
// Return cached driver if present
|
||||
if self.driver_cache.contains_key(tier_name) {
|
||||
return Ok(self.driver_cache.get(tier_name).unwrap());
|
||||
}
|
||||
|
||||
// Get tier configuration and create new driver
|
||||
let tier_config = self.tiers.get(tier_name).ok_or_else(|| ERR_TIER_NOT_FOUND.clone())?;
|
||||
|
||||
let driver = new_warm_backend(tier_config, false).await?;
|
||||
|
||||
// Insert and return reference
|
||||
self.driver_cache.insert(tier_name.to_string(), driver);
|
||||
Ok(self.driver_cache.get(tier_name).unwrap())
|
||||
}
|
||||
|
||||
pub async fn reload(&mut self, api: Arc<ECStore>) -> std::result::Result<(), std::io::Error> {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -27,7 +27,7 @@ use crate::fast_lock::{
|
||||
/// High-performance object lock manager
|
||||
#[derive(Debug)]
|
||||
pub struct FastObjectLockManager {
|
||||
shards: Vec<Arc<LockShard>>,
|
||||
pub shards: Vec<Arc<LockShard>>,
|
||||
shard_mask: usize,
|
||||
config: LockConfig,
|
||||
metrics: Arc<GlobalMetrics>,
|
||||
@@ -66,7 +66,12 @@ impl FastObjectLockManager {
|
||||
pub async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
|
||||
let shard = self.get_shard(&request.key);
|
||||
match shard.acquire_lock(&request).await {
|
||||
Ok(()) => Ok(FastLockGuard::new(request.key, request.mode, request.owner, shard.clone())),
|
||||
Ok(()) => {
|
||||
let guard = FastLockGuard::new(request.key, request.mode, request.owner, shard.clone());
|
||||
// Register guard to prevent premature cleanup
|
||||
shard.register_guard(guard.guard_id());
|
||||
Ok(guard)
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
@@ -117,6 +122,54 @@ impl FastObjectLockManager {
|
||||
self.acquire_lock(request).await
|
||||
}
|
||||
|
||||
/// Acquire high-priority read lock - optimized for database queries
|
||||
pub async fn acquire_high_priority_read_lock(
|
||||
&self,
|
||||
bucket: impl Into<Arc<str>>,
|
||||
object: impl Into<Arc<str>>,
|
||||
owner: impl Into<Arc<str>>,
|
||||
) -> Result<FastLockGuard, LockResult> {
|
||||
let request =
|
||||
ObjectLockRequest::new_read(bucket, object, owner).with_priority(crate::fast_lock::types::LockPriority::High);
|
||||
self.acquire_lock(request).await
|
||||
}
|
||||
|
||||
/// Acquire high-priority write lock - optimized for database queries
|
||||
pub async fn acquire_high_priority_write_lock(
|
||||
&self,
|
||||
bucket: impl Into<Arc<str>>,
|
||||
object: impl Into<Arc<str>>,
|
||||
owner: impl Into<Arc<str>>,
|
||||
) -> Result<FastLockGuard, LockResult> {
|
||||
let request =
|
||||
ObjectLockRequest::new_write(bucket, object, owner).with_priority(crate::fast_lock::types::LockPriority::High);
|
||||
self.acquire_lock(request).await
|
||||
}
|
||||
|
||||
/// Acquire critical priority read lock - for system operations
|
||||
pub async fn acquire_critical_read_lock(
|
||||
&self,
|
||||
bucket: impl Into<Arc<str>>,
|
||||
object: impl Into<Arc<str>>,
|
||||
owner: impl Into<Arc<str>>,
|
||||
) -> Result<FastLockGuard, LockResult> {
|
||||
let request =
|
||||
ObjectLockRequest::new_read(bucket, object, owner).with_priority(crate::fast_lock::types::LockPriority::Critical);
|
||||
self.acquire_lock(request).await
|
||||
}
|
||||
|
||||
/// Acquire critical priority write lock - for system operations
|
||||
pub async fn acquire_critical_write_lock(
|
||||
&self,
|
||||
bucket: impl Into<Arc<str>>,
|
||||
object: impl Into<Arc<str>>,
|
||||
owner: impl Into<Arc<str>>,
|
||||
) -> Result<FastLockGuard, LockResult> {
|
||||
let request =
|
||||
ObjectLockRequest::new_write(bucket, object, owner).with_priority(crate::fast_lock::types::LockPriority::Critical);
|
||||
self.acquire_lock(request).await
|
||||
}
|
||||
|
||||
/// Acquire multiple locks atomically - optimized version
|
||||
pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
|
||||
// Pre-sort requests by (shard_id, key) to avoid deadlocks
|
||||
@@ -304,7 +357,7 @@ impl FastObjectLockManager {
|
||||
}
|
||||
|
||||
/// Get shard for object key
|
||||
fn get_shard(&self, key: &crate::fast_lock::types::ObjectKey) -> &Arc<LockShard> {
|
||||
pub fn get_shard(&self, key: &crate::fast_lock::types::ObjectKey) -> &Arc<LockShard> {
|
||||
let index = key.shard_index(self.shard_mask);
|
||||
&self.shards[index]
|
||||
}
|
||||
@@ -362,6 +415,18 @@ impl Drop for FastObjectLockManager {
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for FastObjectLockManager {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
shards: self.shards.clone(),
|
||||
shard_mask: self.shard_mask,
|
||||
config: self.config.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
cleanup_handle: RwLock::new(None), // Don't clone the cleanup task
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl LockManager for FastObjectLockManager {
|
||||
async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
|
||||
|
||||
@@ -53,8 +53,11 @@ pub const DEFAULT_SHARD_COUNT: usize = 1024;
|
||||
/// Default lock timeout
|
||||
pub const DEFAULT_LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
/// Default acquire timeout
|
||||
pub const DEFAULT_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
|
||||
/// Default acquire timeout - increased for database workloads
|
||||
pub const DEFAULT_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
/// Maximum acquire timeout for high-load scenarios
|
||||
pub const MAX_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
|
||||
|
||||
/// Lock cleanup interval
|
||||
pub const CLEANUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
|
||||
|
||||
@@ -18,7 +18,8 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
/// Optimized notification pool to reduce memory overhead and thundering herd effects
|
||||
static NOTIFY_POOL: Lazy<Vec<Arc<Notify>>> = Lazy::new(|| (0..64).map(|_| Arc::new(Notify::new())).collect());
|
||||
/// Increased pool size for better performance under high concurrency
|
||||
static NOTIFY_POOL: Lazy<Vec<Arc<Notify>>> = Lazy::new(|| (0..128).map(|_| Arc::new(Notify::new())).collect());
|
||||
|
||||
/// Optimized notification system for object locks
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::fast_lock::{
|
||||
state::ObjectLockState,
|
||||
types::{LockMode, LockResult, ObjectKey, ObjectLockRequest},
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
|
||||
/// Lock shard to reduce global contention
|
||||
#[derive(Debug)]
|
||||
@@ -36,6 +37,8 @@ pub struct LockShard {
|
||||
metrics: ShardMetrics,
|
||||
/// Shard ID for debugging
|
||||
_shard_id: usize,
|
||||
/// Active guard IDs to prevent cleanup of locks with live guards
|
||||
active_guards: parking_lot::Mutex<HashSet<u64>>,
|
||||
}
|
||||
|
||||
impl LockShard {
|
||||
@@ -45,6 +48,7 @@ impl LockShard {
|
||||
object_pool: ObjectStatePool::new(),
|
||||
metrics: ShardMetrics::new(),
|
||||
_shard_id: shard_id,
|
||||
active_guards: parking_lot::Mutex::new(HashSet::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,7 +127,12 @@ impl LockShard {
|
||||
|
||||
/// Slow path with async waiting
|
||||
async fn acquire_lock_slow_path(&self, request: &ObjectLockRequest, start_time: Instant) -> Result<(), LockResult> {
|
||||
let deadline = start_time + request.acquire_timeout;
|
||||
// Use adaptive timeout based on current load and request priority
|
||||
let adaptive_timeout = self.calculate_adaptive_timeout(request);
|
||||
let deadline = start_time + adaptive_timeout;
|
||||
|
||||
let mut retry_count = 0u32;
|
||||
const MAX_RETRIES: u32 = 10;
|
||||
|
||||
loop {
|
||||
// Get or create object state
|
||||
@@ -157,8 +166,22 @@ impl LockShard {
|
||||
return Err(LockResult::Timeout);
|
||||
}
|
||||
|
||||
// Wait for notification using optimized notify system
|
||||
// Use intelligent wait strategy: mix of notification wait and exponential backoff
|
||||
let remaining = deadline - Instant::now();
|
||||
|
||||
if retry_count < MAX_RETRIES && remaining > Duration::from_millis(10) {
|
||||
// For early retries, use a brief exponential backoff instead of full notification wait
|
||||
let backoff_ms = std::cmp::min(10 << retry_count, 100); // 10ms, 20ms, 40ms, 80ms, 100ms max
|
||||
let backoff_duration = Duration::from_millis(backoff_ms);
|
||||
|
||||
if backoff_duration < remaining {
|
||||
tokio::time::sleep(backoff_duration).await;
|
||||
retry_count += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// If we've exhausted quick retries or have little time left, use notification wait
|
||||
let wait_result = match request.mode {
|
||||
LockMode::Shared => {
|
||||
state.atomic_state.inc_readers_waiting();
|
||||
@@ -179,7 +202,7 @@ impl LockShard {
|
||||
return Err(LockResult::Timeout);
|
||||
}
|
||||
|
||||
// Continue the loop to try acquisition again
|
||||
retry_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,10 +226,30 @@ impl LockShard {
|
||||
should_cleanup = !state.is_locked() && !state.atomic_state.has_waiters();
|
||||
} else {
|
||||
should_cleanup = false;
|
||||
// Additional diagnostics for release failures
|
||||
let current_mode = state.current_mode();
|
||||
let is_locked = state.is_locked();
|
||||
let has_waiters = state.atomic_state.has_waiters();
|
||||
|
||||
tracing::debug!(
|
||||
"Lock release failed in shard: key={}, owner={}, mode={:?}, current_mode={:?}, is_locked={}, has_waiters={}",
|
||||
key,
|
||||
owner,
|
||||
mode,
|
||||
current_mode,
|
||||
is_locked,
|
||||
has_waiters
|
||||
);
|
||||
}
|
||||
} else {
|
||||
result = false;
|
||||
should_cleanup = false;
|
||||
tracing::debug!(
|
||||
"Lock release failed - key not found in shard: key={}, owner={}, mode={:?}",
|
||||
key,
|
||||
owner,
|
||||
mode
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,6 +261,134 @@ impl LockShard {
|
||||
result
|
||||
}
|
||||
|
||||
/// Release lock with guard ID tracking for double-release prevention
|
||||
pub fn release_lock_with_guard(&self, key: &ObjectKey, owner: &Arc<str>, mode: LockMode, guard_id: u64) -> bool {
|
||||
// First, try to remove the guard from active set
|
||||
let guard_was_active = {
|
||||
let mut guards = self.active_guards.lock();
|
||||
guards.remove(&guard_id)
|
||||
};
|
||||
|
||||
// If guard was not active, this is a double-release attempt
|
||||
if !guard_was_active {
|
||||
tracing::debug!(
|
||||
"Double-release attempt blocked: key={}, owner={}, mode={:?}, guard_id={}",
|
||||
key,
|
||||
owner,
|
||||
mode,
|
||||
guard_id
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Proceed with normal release
|
||||
let should_cleanup;
|
||||
let result;
|
||||
|
||||
{
|
||||
let objects = self.objects.read();
|
||||
if let Some(state) = objects.get(key) {
|
||||
result = match mode {
|
||||
LockMode::Shared => state.release_shared(owner),
|
||||
LockMode::Exclusive => state.release_exclusive(owner),
|
||||
};
|
||||
|
||||
if result {
|
||||
self.metrics.record_release();
|
||||
should_cleanup = !state.is_locked() && !state.atomic_state.has_waiters();
|
||||
} else {
|
||||
should_cleanup = false;
|
||||
}
|
||||
} else {
|
||||
result = false;
|
||||
should_cleanup = false;
|
||||
}
|
||||
}
|
||||
|
||||
if should_cleanup {
|
||||
self.schedule_cleanup(key.clone());
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Register a guard to prevent premature cleanup
|
||||
pub fn register_guard(&self, guard_id: u64) {
|
||||
let mut guards = self.active_guards.lock();
|
||||
guards.insert(guard_id);
|
||||
}
|
||||
|
||||
/// Unregister a guard (called when guard is dropped)
|
||||
pub fn unregister_guard(&self, guard_id: u64) {
|
||||
let mut guards = self.active_guards.lock();
|
||||
guards.remove(&guard_id);
|
||||
}
|
||||
|
||||
/// Get count of active guards (for testing)
|
||||
#[cfg(test)]
|
||||
pub fn active_guard_count(&self) -> usize {
|
||||
let guards = self.active_guards.lock();
|
||||
guards.len()
|
||||
}
|
||||
|
||||
/// Check if a guard is active (for testing)
|
||||
#[cfg(test)]
|
||||
pub fn is_guard_active(&self, guard_id: u64) -> bool {
|
||||
let guards = self.active_guards.lock();
|
||||
guards.contains(&guard_id)
|
||||
}
|
||||
|
||||
/// Calculate adaptive timeout based on current system load and request priority
|
||||
fn calculate_adaptive_timeout(&self, request: &ObjectLockRequest) -> Duration {
|
||||
let base_timeout = request.acquire_timeout;
|
||||
|
||||
// Get current shard load metrics
|
||||
let lock_count = {
|
||||
let objects = self.objects.read();
|
||||
objects.len()
|
||||
};
|
||||
|
||||
let active_guard_count = {
|
||||
let guards = self.active_guards.lock();
|
||||
guards.len()
|
||||
};
|
||||
|
||||
// Calculate load factor with more generous thresholds for database workloads
|
||||
let total_load = (lock_count + active_guard_count) as f64;
|
||||
let load_factor = total_load / 500.0; // Lowered threshold for faster scaling
|
||||
|
||||
// More aggressive priority multipliers for database scenarios
|
||||
let priority_multiplier = match request.priority {
|
||||
crate::fast_lock::types::LockPriority::Critical => 3.0, // Increased
|
||||
crate::fast_lock::types::LockPriority::High => 2.0, // Increased
|
||||
crate::fast_lock::types::LockPriority::Normal => 1.2, // Slightly increased base
|
||||
crate::fast_lock::types::LockPriority::Low => 0.9,
|
||||
};
|
||||
|
||||
// More generous load-based scaling
|
||||
let load_multiplier = if load_factor > 2.0 {
|
||||
// Very high load: drastically extend timeout
|
||||
1.0 + (load_factor * 2.0)
|
||||
} else if load_factor > 1.0 {
|
||||
// High load: significantly extend timeout
|
||||
1.0 + (load_factor * 1.8)
|
||||
} else if load_factor > 0.3 {
|
||||
// Medium load: moderately extend timeout
|
||||
1.0 + (load_factor * 1.2)
|
||||
} else {
|
||||
// Low load: still give some buffer
|
||||
1.1
|
||||
};
|
||||
|
||||
let total_multiplier = priority_multiplier * load_multiplier;
|
||||
let adaptive_timeout_secs =
|
||||
(base_timeout.as_secs_f64() * total_multiplier).min(crate::fast_lock::MAX_ACQUIRE_TIMEOUT.as_secs_f64());
|
||||
|
||||
// Ensure minimum reasonable timeout even for low priority
|
||||
let min_timeout_secs = base_timeout.as_secs_f64() * 0.8;
|
||||
Duration::from_secs_f64(adaptive_timeout_secs.max(min_timeout_secs))
|
||||
}
|
||||
|
||||
/// Batch acquire locks with ordering to prevent deadlocks
|
||||
pub async fn acquire_locks_batch(
|
||||
&self,
|
||||
@@ -324,22 +495,44 @@ impl LockShard {
|
||||
pub fn adaptive_cleanup(&self) -> usize {
|
||||
let current_load = self.current_load_factor();
|
||||
let lock_count = self.lock_count();
|
||||
let active_guard_count = self.active_guards.lock().len();
|
||||
|
||||
// Be much more conservative if there are active guards or very high load
|
||||
if active_guard_count > 0 && current_load > 0.8 {
|
||||
tracing::debug!(
|
||||
"Skipping aggressive cleanup due to {} active guards and high load ({:.2})",
|
||||
active_guard_count,
|
||||
current_load
|
||||
);
|
||||
// Only clean very old entries when under high load with active guards
|
||||
return self.cleanup_expired_batch(3, 1_200_000); // 20 minutes, smaller batches
|
||||
}
|
||||
|
||||
// Under extreme load, skip cleanup entirely to reduce contention
|
||||
if current_load > 1.5 && active_guard_count > 10 {
|
||||
tracing::debug!(
|
||||
"Skipping all cleanup due to extreme load ({:.2}) and {} active guards",
|
||||
current_load,
|
||||
active_guard_count
|
||||
);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Dynamically adjust cleanup strategy based on load
|
||||
let cleanup_batch_size = match current_load {
|
||||
load if load > 0.9 => lock_count / 20, // High load: small batch cleanup
|
||||
load if load > 0.7 => lock_count / 10, // Medium load: moderate cleanup
|
||||
_ => lock_count / 5, // Low load: aggressive cleanup
|
||||
load if load > 0.9 => lock_count / 50, // Much smaller batches for high load
|
||||
load if load > 0.7 => lock_count / 20, // Smaller batches for medium load
|
||||
_ => lock_count / 10, // More conservative even for low load
|
||||
};
|
||||
|
||||
// Use longer timeout for high load scenarios
|
||||
// Use much longer timeouts to prevent premature cleanup
|
||||
let cleanup_threshold_millis = match current_load {
|
||||
load if load > 0.8 => 300_000, // 5 minutes for high load
|
||||
load if load > 0.5 => 180_000, // 3 minutes for medium load
|
||||
_ => 60_000, // 1 minute for low load
|
||||
load if load > 0.8 => 600_000, // 10 minutes for high load
|
||||
load if load > 0.5 => 300_000, // 5 minutes for medium load
|
||||
_ => 120_000, // 2 minutes for low load
|
||||
};
|
||||
|
||||
self.cleanup_expired_batch(cleanup_batch_size.max(10), cleanup_threshold_millis)
|
||||
self.cleanup_expired_batch_protected(cleanup_batch_size.max(5), cleanup_threshold_millis)
|
||||
}
|
||||
|
||||
/// Cleanup expired and unused locks
|
||||
@@ -378,6 +571,19 @@ impl LockShard {
|
||||
cleaned
|
||||
}
|
||||
|
||||
/// Protected batch cleanup that respects active guards
|
||||
fn cleanup_expired_batch_protected(&self, max_batch_size: usize, cleanup_threshold_millis: u64) -> usize {
|
||||
let active_guards = self.active_guards.lock();
|
||||
let guard_count = active_guards.len();
|
||||
drop(active_guards); // Release lock early
|
||||
|
||||
if guard_count > 0 {
|
||||
tracing::debug!("Cleanup with {} active guards, being conservative", guard_count);
|
||||
}
|
||||
|
||||
self.cleanup_expired_batch(max_batch_size, cleanup_threshold_millis)
|
||||
}
|
||||
|
||||
/// Batch cleanup with limited processing to avoid blocking
|
||||
fn cleanup_expired_batch(&self, max_batch_size: usize, cleanup_threshold_millis: u64) -> usize {
|
||||
let mut cleaned = 0;
|
||||
|
||||
@@ -373,11 +373,23 @@ impl ObjectLockState {
|
||||
}
|
||||
true
|
||||
} else {
|
||||
// Inconsistency - re-add owner
|
||||
// Inconsistency detected - atomic state shows no shared lock but owner was found
|
||||
tracing::warn!(
|
||||
"Atomic state inconsistency during shared lock release: owner={}, remaining_owners={}",
|
||||
owner,
|
||||
shared.len()
|
||||
);
|
||||
// Re-add owner to maintain consistency
|
||||
shared.push(owner.clone());
|
||||
false
|
||||
}
|
||||
} else {
|
||||
// Owner not found in shared owners list
|
||||
tracing::debug!(
|
||||
"Shared lock release failed - owner not found: owner={}, current_owners={:?}",
|
||||
owner,
|
||||
shared.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
@@ -401,9 +413,21 @@ impl ObjectLockState {
|
||||
}
|
||||
true
|
||||
} else {
|
||||
// Atomic state inconsistency - current owner matches but atomic release failed
|
||||
tracing::warn!(
|
||||
"Atomic state inconsistency during exclusive lock release: owner={}, atomic_state={:b}",
|
||||
owner,
|
||||
self.atomic_state.state.load(Ordering::Acquire)
|
||||
);
|
||||
false
|
||||
}
|
||||
} else {
|
||||
// Owner mismatch
|
||||
tracing::debug!(
|
||||
"Exclusive lock release failed - owner mismatch: expected_owner={}, actual_owner={:?}",
|
||||
owner,
|
||||
current.as_ref().map(|s| s.as_ref())
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{Deserialize, Serialize, de::Error};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use super::Policy;
|
||||
@@ -59,15 +59,17 @@ impl TryFrom<Vec<u8>> for PolicyDoc {
|
||||
type Error = serde_json::Error;
|
||||
|
||||
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
match serde_json::from_slice::<PolicyDoc>(&value) {
|
||||
Ok(res) => Ok(res),
|
||||
Err(err) => match serde_json::from_slice::<Policy>(&value) {
|
||||
Ok(res2) => Ok(Self {
|
||||
policy: res2,
|
||||
..Default::default()
|
||||
}),
|
||||
Err(_) => Err(err),
|
||||
},
|
||||
// Try to parse as PolicyDoc first
|
||||
if let Ok(policy_doc) = serde_json::from_slice::<PolicyDoc>(&value) {
|
||||
return Ok(policy_doc);
|
||||
}
|
||||
|
||||
// Fall back to parsing as Policy and wrap in PolicyDoc
|
||||
serde_json::from_slice::<Policy>(&value)
|
||||
.map(|policy| Self {
|
||||
policy,
|
||||
..Default::default()
|
||||
})
|
||||
.map_err(|_| serde_json::Error::custom("Failed to parse as PolicyDoc or Policy".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,17 +16,40 @@ use bytes::Bytes;
|
||||
use futures::pin_mut;
|
||||
use futures::{Stream, StreamExt};
|
||||
use std::net::Ipv6Addr;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::{LazyLock, Mutex};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
collections::{HashMap, HashSet},
|
||||
fmt::Display,
|
||||
net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use transform_stream::AsyncTryStream;
|
||||
use url::{Host, Url};
|
||||
|
||||
static LOCAL_IPS: LazyLock<Vec<IpAddr>> = LazyLock::new(|| must_get_local_ips().unwrap());
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DnsCacheEntry {
|
||||
ips: HashSet<IpAddr>,
|
||||
cached_at: Instant,
|
||||
}
|
||||
|
||||
impl DnsCacheEntry {
|
||||
fn new(ips: HashSet<IpAddr>) -> Self {
|
||||
Self {
|
||||
ips,
|
||||
cached_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_expired(&self, ttl: Duration) -> bool {
|
||||
self.cached_at.elapsed() > ttl
|
||||
}
|
||||
}
|
||||
|
||||
static DNS_CACHE: LazyLock<Mutex<HashMap<String, DnsCacheEntry>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
const DNS_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
|
||||
|
||||
/// helper for validating if the provided arg is an ip address.
|
||||
pub fn is_socket_addr(addr: &str) -> bool {
|
||||
// TODO IPv6 zone information?
|
||||
@@ -87,19 +110,19 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
|
||||
}
|
||||
|
||||
/// returns IP address of given host using layered DNS resolution.
|
||||
pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
|
||||
///
|
||||
/// This is the async version of `get_host_ip()` that provides enhanced DNS resolution
|
||||
/// with Kubernetes support when the "net" feature is enabled.
|
||||
pub async fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
|
||||
match host {
|
||||
Host::Domain(domain) => {
|
||||
#[cfg(feature = "net")]
|
||||
{
|
||||
use crate::dns_resolver::resolve_domain;
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
handle.block_on(async {
|
||||
match resolve_domain(domain).await {
|
||||
Ok(ips) => Ok(ips.into_iter().collect()),
|
||||
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
|
||||
}
|
||||
})
|
||||
match resolve_domain(domain).await {
|
||||
Ok(ips) => Ok(ips.into_iter().collect()),
|
||||
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
|
||||
}
|
||||
}
|
||||
#[cfg(not(feature = "net"))]
|
||||
{
|
||||
@@ -128,17 +151,41 @@ pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
|
||||
|
||||
/// returns IP address of given host using standard resolution.
|
||||
///
|
||||
/// **Note**: This function uses standard library DNS resolution.
|
||||
/// **Note**: This function uses standard library DNS resolution with caching.
|
||||
/// For enhanced DNS resolution with Kubernetes support, use `get_host_ip_async()`.
|
||||
pub fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
|
||||
match host {
|
||||
Host::Domain(domain) => match (domain, 0)
|
||||
.to_socket_addrs()
|
||||
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
|
||||
{
|
||||
Ok(ips) => Ok(ips),
|
||||
Err(err) => Err(std::io::Error::other(err)),
|
||||
},
|
||||
Host::Domain(domain) => {
|
||||
// Check cache first
|
||||
if let Ok(mut cache) = DNS_CACHE.lock() {
|
||||
if let Some(entry) = cache.get(domain) {
|
||||
if !entry.is_expired(DNS_CACHE_TTL) {
|
||||
return Ok(entry.ips.clone());
|
||||
}
|
||||
// Remove expired entry
|
||||
cache.remove(domain);
|
||||
}
|
||||
}
|
||||
|
||||
// Perform DNS resolution
|
||||
match (domain, 0)
|
||||
.to_socket_addrs()
|
||||
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
|
||||
{
|
||||
Ok(ips) => {
|
||||
// Cache the result
|
||||
if let Ok(mut cache) = DNS_CACHE.lock() {
|
||||
cache.insert(domain.to_string(), DnsCacheEntry::new(ips.clone()));
|
||||
// Limit cache size to prevent memory bloat
|
||||
if cache.len() > 1000 {
|
||||
cache.retain(|_, v| !v.is_expired(DNS_CACHE_TTL));
|
||||
}
|
||||
}
|
||||
Ok(ips)
|
||||
}
|
||||
Err(err) => Err(std::io::Error::other(err)),
|
||||
}
|
||||
}
|
||||
Host::Ipv4(ip) => {
|
||||
let mut set = HashSet::with_capacity(1);
|
||||
set.insert(IpAddr::V4(ip));
|
||||
|
||||
@@ -41,7 +41,7 @@ services:
|
||||
- rustfs_data_1:/data/rustfs1
|
||||
- rustfs_data_2:/data/rustfs2
|
||||
- rustfs_data_3:/data/rustfs3
|
||||
- ./logs:/app/logs
|
||||
- logs_data:/app/logs
|
||||
networks:
|
||||
- rustfs-network
|
||||
restart: unless-stopped
|
||||
@@ -95,7 +95,7 @@ services:
|
||||
command:
|
||||
- --config=/etc/otelcol-contrib/otel-collector.yml
|
||||
volumes:
|
||||
- ./.docker/observability/otel-collector.yml:/etc/otelcol-contrib/otel-collector.yml:ro
|
||||
- ./.docker/observability/otel-collector-config.yaml:/etc/otelcol-contrib/otel-collector.yml:ro
|
||||
ports:
|
||||
- "4317:4317" # OTLP gRPC receiver
|
||||
- "4318:4318" # OTLP HTTP receiver
|
||||
@@ -219,3 +219,5 @@ volumes:
|
||||
driver: local
|
||||
redis_data:
|
||||
driver: local
|
||||
logs_data:
|
||||
driver: local
|
||||
|
||||
326
docs/PERFORMANCE_TESTING.md
Normal file
326
docs/PERFORMANCE_TESTING.md
Normal file
@@ -0,0 +1,326 @@
|
||||
# RustFS 性能测试指南
|
||||
|
||||
本文档提供了对 RustFS 进行性能测试和性能分析的完整方法和工具。
|
||||
|
||||
## 概览
|
||||
|
||||
RustFS 提供了多种性能测试和分析工具:
|
||||
|
||||
1. **性能分析(Profiling)** - 使用内置的 pprof 接口收集 CPU 性能数据
|
||||
2. **负载测试(Load Testing)** - 使用多种客户端工具模拟高并发请求
|
||||
3. **监控和分析** - 查看性能指标和识别性能瓶颈
|
||||
|
||||
## 前置条件
|
||||
|
||||
### 1. 启用性能分析
|
||||
|
||||
在启动 RustFS 时,需要设置环境变量启用性能分析功能:
|
||||
|
||||
```bash
|
||||
export RUSTFS_ENABLE_PROFILING=true
|
||||
./rustfs
|
||||
```
|
||||
|
||||
### 2. 安装依赖工具
|
||||
|
||||
确保系统中安装了以下工具:
|
||||
|
||||
```bash
|
||||
# 基础工具
|
||||
curl # HTTP 请求
|
||||
jq # JSON 处理 (可选)
|
||||
|
||||
# 分析工具
|
||||
go # Go pprof 工具 (可选,用于 protobuf 格式)
|
||||
python3 # Python 负载测试脚本
|
||||
|
||||
# macOS 用户
|
||||
brew install curl jq go python3
|
||||
|
||||
# Ubuntu/Debian 用户
|
||||
sudo apt-get install curl jq golang-go python3
|
||||
```
|
||||
|
||||
## 性能测试方法
|
||||
|
||||
### 方法 1:使用专业脚本(推荐)
|
||||
|
||||
项目提供了完整的性能分析脚本:
|
||||
|
||||
```bash
|
||||
# 查看脚本帮助
|
||||
./scripts/profile_rustfs.sh help
|
||||
|
||||
# 检查性能分析状态
|
||||
./scripts/profile_rustfs.sh status
|
||||
|
||||
# 收集火焰图(30秒)
|
||||
./scripts/profile_rustfs.sh flamegraph
|
||||
|
||||
# 收集 protobuf 格式性能数据
|
||||
./scripts/profile_rustfs.sh protobuf
|
||||
|
||||
# 收集两种格式的性能数据
|
||||
./scripts/profile_rustfs.sh both
|
||||
|
||||
# 自定义参数
|
||||
./scripts/profile_rustfs.sh -d 60 -u http://192.168.1.100:9000 both
|
||||
```
|
||||
|
||||
### 方法 2:使用 Python 综合测试
|
||||
|
||||
Python 脚本提供了负载测试和性能分析的一体化解决方案:
|
||||
|
||||
```bash
|
||||
# 运行综合性能分析
|
||||
python3 test_load.py
|
||||
```
|
||||
|
||||
此脚本会:
|
||||
1. 启动后台负载测试(多线程 S3 操作)
|
||||
2. 并行收集性能分析数据
|
||||
3. 生成火焰图用于分析
|
||||
|
||||
### 方法 3:使用简单负载测试
|
||||
|
||||
对于快速测试,可以使用 bash 脚本:
|
||||
|
||||
```bash
|
||||
# 运行简单负载测试
|
||||
./simple_load_test.sh
|
||||
```
|
||||
|
||||
## 性能分析输出格式
|
||||
|
||||
### 1. 火焰图(SVG 格式)
|
||||
|
||||
- **用途**: 可视化 CPU 使用情况
|
||||
- **文件**: `rustfs_profile_TIMESTAMP.svg`
|
||||
- **查看方式**: 使用浏览器打开 SVG 文件
|
||||
- **分析要点**:
|
||||
- 宽度表示 CPU 使用时间
|
||||
- 高度表示调用栈深度
|
||||
- 点击可以放大特定函数
|
||||
|
||||
```bash
|
||||
# 在浏览器中打开
|
||||
open profiles/rustfs_profile_20240911_143000.svg
|
||||
```
|
||||
|
||||
### 2. Protobuf 格式
|
||||
|
||||
- **用途**: 使用 Go pprof 工具进行详细分析
|
||||
- **文件**: `rustfs_profile_TIMESTAMP.pb`
|
||||
- **分析工具**: `go tool pprof`
|
||||
|
||||
```bash
|
||||
# 使用 Go pprof 分析
|
||||
go tool pprof profiles/rustfs_profile_20240911_143000.pb
|
||||
|
||||
# pprof 常用命令
|
||||
(pprof) top # 显示 CPU 使用率最高的函数
|
||||
(pprof) list func # 显示指定函数的源代码
|
||||
(pprof) web # 生成 web 界面(需要 graphviz)
|
||||
(pprof) png # 生成 PNG 图片
|
||||
(pprof) help # 查看所有命令
|
||||
```
|
||||
|
||||
## API 接口使用
|
||||
|
||||
### 检查性能分析状态
|
||||
|
||||
```bash
|
||||
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/status"
|
||||
```
|
||||
|
||||
返回示例:
|
||||
```json
|
||||
{
|
||||
"enabled": "true",
|
||||
"sampling_rate": "100"
|
||||
}
|
||||
```
|
||||
|
||||
### 收集性能数据
|
||||
|
||||
```bash
|
||||
# 收集 30 秒的火焰图
|
||||
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=flamegraph" \
|
||||
-o profile.svg
|
||||
|
||||
# 收集 protobuf 格式数据
|
||||
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=protobuf" \
|
||||
-o profile.pb
|
||||
```
|
||||
|
||||
**参数说明**:
|
||||
- `seconds`: 收集时长(1-300 秒)
|
||||
- `format`: 输出格式(`flamegraph`/`svg` 或 `protobuf`/`pb`)
|
||||
|
||||
## 负载测试场景
|
||||
|
||||
### 1. S3 API 负载测试
|
||||
|
||||
使用 Python 脚本进行完整的 S3 操作负载测试:
|
||||
|
||||
```python
|
||||
# 基本配置
|
||||
tester = S3LoadTester(
|
||||
endpoint="http://127.0.0.1:9000",
|
||||
access_key="rustfsadmin",
|
||||
secret_key="rustfsadmin"
|
||||
)
|
||||
|
||||
# 运行负载测试
|
||||
# 4 个线程,每个线程执行 10 次操作
|
||||
tester.run_load_test(num_threads=4, operations_per_thread=10)
|
||||
```
|
||||
|
||||
每次操作包括:
|
||||
1. 上传 1MB 对象
|
||||
2. 下载对象
|
||||
3. 删除对象
|
||||
|
||||
### 2. 自定义负载测试
|
||||
|
||||
```bash
|
||||
# 创建测试桶
|
||||
curl -X PUT "http://127.0.0.1:9000/test-bucket"
|
||||
|
||||
# 并发上传测试
|
||||
for i in {1..10}; do
|
||||
echo "test data $i" | curl -X PUT "http://127.0.0.1:9000/test-bucket/object-$i" -d @- &
|
||||
done
|
||||
wait
|
||||
|
||||
# 并发下载测试
|
||||
for i in {1..10}; do
|
||||
curl "http://127.0.0.1:9000/test-bucket/object-$i" > /dev/null &
|
||||
done
|
||||
wait
|
||||
```
|
||||
|
||||
## 性能分析最佳实践
|
||||
|
||||
### 1. 测试环境准备
|
||||
|
||||
- 确保 RustFS 已启用性能分析: `RUSTFS_ENABLE_PROFILING=true`
|
||||
- 使用独立的测试环境,避免其他程序干扰
|
||||
- 确保有足够的磁盘空间存储分析文件
|
||||
|
||||
### 2. 数据收集建议
|
||||
|
||||
- **预热阶段**: 先运行 5-10 分钟的轻量负载
|
||||
- **数据收集**: 在稳定负载下收集 30-60 秒的性能数据
|
||||
- **多次采样**: 收集多个样本进行对比分析
|
||||
|
||||
### 3. 分析重点
|
||||
|
||||
在火焰图中重点关注:
|
||||
|
||||
1. **宽度最大的函数** - CPU 使用时间最长
|
||||
2. **平顶函数** - 可能的性能瓶颈
|
||||
3. **深度调用栈** - 可能的递归或复杂逻辑
|
||||
4. **意外的系统调用** - I/O 或内存分配问题
|
||||
|
||||
### 4. 常见性能问题
|
||||
|
||||
- **锁竞争**: 查找 `std::sync` 相关函数
|
||||
- **内存分配**: 查找 `alloc` 相关函数
|
||||
- **I/O 等待**: 查找文件系统或网络 I/O 函数
|
||||
- **序列化开销**: 查找 JSON/XML 解析函数
|
||||
|
||||
## 故障排除
|
||||
|
||||
### 1. 性能分析未启用
|
||||
|
||||
错误信息:`{"enabled":"false"}`
|
||||
|
||||
解决方案:
|
||||
```bash
|
||||
export RUSTFS_ENABLE_PROFILING=true
|
||||
# 重启 RustFS
|
||||
```
|
||||
|
||||
### 2. 连接被拒绝
|
||||
|
||||
错误信息:`Connection refused`
|
||||
|
||||
检查项:
|
||||
- RustFS 是否正在运行
|
||||
- 端口是否正确(默认 9000)
|
||||
- 防火墙设置
|
||||
|
||||
### 3. 分析文件过大
|
||||
|
||||
如果生成的分析文件过大:
|
||||
- 减少收集时间(如 15-30 秒)
|
||||
- 降低负载测试的并发度
|
||||
- 使用 protobuf 格式而非 SVG
|
||||
|
||||
## 配置参数
|
||||
|
||||
### 环境变量
|
||||
|
||||
| 变量 | 默认值 | 描述 |
|
||||
|------|--------|------|
|
||||
| `RUSTFS_ENABLE_PROFILING` | `false` | 启用性能分析 |
|
||||
| `RUSTFS_URL` | `http://127.0.0.1:9000` | RustFS 服务器地址 |
|
||||
| `PROFILE_DURATION` | `30` | 性能数据收集时长(秒) |
|
||||
| `OUTPUT_DIR` | `./profiles` | 输出文件目录 |
|
||||
|
||||
### 脚本参数
|
||||
|
||||
```bash
|
||||
./scripts/profile_rustfs.sh [OPTIONS] [COMMAND]
|
||||
|
||||
OPTIONS:
|
||||
-u, --url URL RustFS URL
|
||||
-d, --duration SECONDS Profile duration
|
||||
-o, --output DIR Output directory
|
||||
|
||||
COMMANDS:
|
||||
status 检查状态
|
||||
flamegraph 收集火焰图
|
||||
protobuf 收集 protobuf 数据
|
||||
both 收集两种格式(默认)
|
||||
```
|
||||
|
||||
## 输出文件位置
|
||||
|
||||
- **脚本输出**: `./profiles/` 目录
|
||||
- **Python 脚本**: `/tmp/rustfs_profiles/` 目录
|
||||
- **文件命名**: `rustfs_profile_TIMESTAMP.{svg|pb}`
|
||||
|
||||
## 示例工作流程
|
||||
|
||||
1. **启动 RustFS**:
|
||||
```bash
|
||||
RUSTFS_ENABLE_PROFILING=true ./rustfs
|
||||
```
|
||||
|
||||
2. **验证性能分析可用**:
|
||||
```bash
|
||||
./scripts/profile_rustfs.sh status
|
||||
```
|
||||
|
||||
3. **开始负载测试**:
|
||||
```bash
|
||||
python3 test_load.py &
|
||||
```
|
||||
|
||||
4. **收集性能数据**:
|
||||
```bash
|
||||
./scripts/profile_rustfs.sh -d 60 both
|
||||
```
|
||||
|
||||
5. **分析结果**:
|
||||
```bash
|
||||
# 查看火焰图
|
||||
open profiles/rustfs_profile_*.svg
|
||||
|
||||
# 或使用 pprof 分析
|
||||
go tool pprof profiles/rustfs_profile_*.pb
|
||||
```
|
||||
|
||||
通过这个完整的性能测试流程,你可以系统地分析 RustFS 的性能特征,识别瓶颈,并进行有针对性的优化。
|
||||
@@ -56,4 +56,5 @@ if [ "${RUSTFS_ACCESS_KEY}" = "rustfsadmin" ] || [ "${RUSTFS_SECRET_KEY}" = "rus
|
||||
fi
|
||||
|
||||
echo "Starting: $*"
|
||||
set -- "$@" $LOCAL_VOLUMES
|
||||
exec "$@"
|
||||
|
||||
@@ -118,6 +118,9 @@ libsystemd.workspace = true
|
||||
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
|
||||
tikv-jemallocator = "0.6"
|
||||
|
||||
[target.'cfg(not(target_os = "windows"))'.dependencies]
|
||||
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
|
||||
|
||||
[build-dependencies]
|
||||
http.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -81,6 +81,8 @@ pub mod sts;
|
||||
pub mod tier;
|
||||
pub mod trace;
|
||||
pub mod user;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use pprof::protos::Message;
|
||||
use urlencoding::decode;
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -1233,6 +1235,172 @@ async fn count_bucket_objects(
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProfileHandler {}
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for ProfileHandler {
|
||||
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
#[cfg(target_os = "windows")]
|
||||
{
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::NOT_IMPLEMENTED,
|
||||
Body::from("CPU profiling is not supported on Windows platform".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
{
|
||||
use crate::profiling;
|
||||
|
||||
if !profiling::is_profiler_enabled() {
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
Body::from("Profiler not enabled. Set RUSTFS_ENABLE_PROFILING=true to enable profiling".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
let queries = extract_query_params(&req.uri);
|
||||
let seconds = queries.get("seconds").and_then(|s| s.parse::<u64>().ok()).unwrap_or(30);
|
||||
let format = queries.get("format").cloned().unwrap_or_else(|| "protobuf".to_string());
|
||||
|
||||
if seconds > 300 {
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Body::from("Profile duration cannot exceed 300 seconds".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
let guard = match profiling::get_profiler_guard() {
|
||||
Some(guard) => guard,
|
||||
None => {
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
Body::from("Profiler not initialized".to_string()),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Starting CPU profile collection for {} seconds", seconds);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
|
||||
|
||||
let guard_lock = match guard.lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
error!("Failed to acquire profiler guard lock");
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::from("Failed to acquire profiler lock".to_string()),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let report = match guard_lock.report().build() {
|
||||
Ok(report) => report,
|
||||
Err(e) => {
|
||||
error!("Failed to build profiler report: {}", e);
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::from(format!("Failed to build profile report: {}", e)),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
info!("CPU profile collection completed");
|
||||
|
||||
match format.as_str() {
|
||||
"protobuf" | "pb" => {
|
||||
let profile = report.pprof().unwrap();
|
||||
let mut body = Vec::new();
|
||||
if let Err(e) = profile.write_to_vec(&mut body) {
|
||||
error!("Failed to serialize protobuf profile: {}", e);
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::from("Failed to serialize profile".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap());
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), headers))
|
||||
}
|
||||
"flamegraph" | "svg" => {
|
||||
let mut flamegraph_buf = Vec::new();
|
||||
match report.flamegraph(&mut flamegraph_buf) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Failed to generate flamegraph: {}", e);
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::from(format!("Failed to generate flamegraph: {}", e)),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap());
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(flamegraph_buf)), headers))
|
||||
}
|
||||
_ => Ok(S3Response::new((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Body::from("Unsupported format. Use 'protobuf' or 'flamegraph'".to_string()),
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProfileStatusHandler {}
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for ProfileStatusHandler {
|
||||
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
let status = HashMap::from([
|
||||
("enabled", "false"),
|
||||
("status", "not_supported"),
|
||||
("platform", "windows"),
|
||||
("message", "CPU profiling is not supported on Windows platform"),
|
||||
]);
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let status = {
|
||||
use crate::profiling;
|
||||
|
||||
if profiling::is_profiler_enabled() {
|
||||
HashMap::from([
|
||||
("enabled", "true"),
|
||||
("status", "running"),
|
||||
("supported_formats", "protobuf, flamegraph"),
|
||||
("max_duration_seconds", "300"),
|
||||
("endpoint", "/rustfs/admin/debug/pprof/profile"),
|
||||
])
|
||||
} else {
|
||||
HashMap::from([
|
||||
("enabled", "false"),
|
||||
("status", "disabled"),
|
||||
("message", "Set RUSTFS_ENABLE_PROFILING=true to enable profiling"),
|
||||
])
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::to_string(&status) {
|
||||
Ok(json) => {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::from(json)), headers))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to serialize status: {}", e);
|
||||
Ok(S3Response::new((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::from("Failed to serialize status".to_string()),
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -214,6 +214,21 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route>
|
||||
AdminOperation(&RemoveRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
// Performance profiling endpoints (available on all platforms, with platform-specific responses)
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/profile").as_str(),
|
||||
AdminOperation(&handlers::ProfileHandler {}),
|
||||
)?;
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/status").as_str(),
|
||||
AdminOperation(&handlers::ProfileStatusHandler {}),
|
||||
)?;
|
||||
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@ mod config;
|
||||
mod error;
|
||||
// mod grpc;
|
||||
pub mod license;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
mod profiling;
|
||||
mod server;
|
||||
mod storage;
|
||||
mod update;
|
||||
@@ -94,6 +96,10 @@ async fn main() -> Result<()> {
|
||||
// Store in global storage
|
||||
set_global_guard(guard).map_err(Error::other)?;
|
||||
|
||||
// Initialize performance profiling if enabled
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
profiling::start_profiling_if_enabled();
|
||||
|
||||
// Run parameters
|
||||
run(opt).await
|
||||
}
|
||||
@@ -102,10 +108,12 @@ async fn main() -> Result<()> {
|
||||
async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("opt: {:?}", &opt);
|
||||
|
||||
// Initialize global DNS resolver early for enhanced DNS resolution
|
||||
if let Err(e) = init_global_dns_resolver().await {
|
||||
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
|
||||
}
|
||||
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
|
||||
let dns_init = tokio::spawn(async {
|
||||
if let Err(e) = init_global_dns_resolver().await {
|
||||
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(region) = &opt.region {
|
||||
rustfs_ecstore::global::set_global_region(region.clone());
|
||||
@@ -176,6 +184,9 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
// Initialize event notifier
|
||||
init_event_notifier().await;
|
||||
|
||||
// Wait for DNS initialization to complete before network-heavy operations
|
||||
dns_init.await.map_err(Error::other)?;
|
||||
|
||||
let buckets_list = store
|
||||
.list_bucket(&BucketOptions {
|
||||
no_metadata: true,
|
||||
@@ -187,14 +198,31 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
// Collect bucket names into a vector
|
||||
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
|
||||
|
||||
// Initialize the bucket metadata system
|
||||
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
|
||||
// Parallelize initialization tasks for better network performance
|
||||
let bucket_metadata_task = tokio::spawn({
|
||||
let store = store.clone();
|
||||
let buckets = buckets.clone();
|
||||
async move {
|
||||
init_bucket_metadata_sys(store, buckets).await;
|
||||
}
|
||||
});
|
||||
|
||||
// Initialize the IAM system
|
||||
init_iam_sys(store.clone()).await?;
|
||||
let iam_init_task = tokio::spawn({
|
||||
let store = store.clone();
|
||||
async move { init_iam_sys(store).await }
|
||||
});
|
||||
|
||||
// add bucket notification configuration
|
||||
add_bucket_notification_configuration(buckets).await;
|
||||
let notification_config_task = tokio::spawn({
|
||||
let buckets = buckets.clone();
|
||||
async move {
|
||||
add_bucket_notification_configuration(buckets).await;
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for all parallel initialization tasks to complete
|
||||
bucket_metadata_task.await.map_err(Error::other)?;
|
||||
iam_init_task.await.map_err(Error::other)??;
|
||||
notification_config_task.await.map_err(Error::other)?;
|
||||
|
||||
// Initialize the global notification system
|
||||
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
|
||||
@@ -239,12 +267,13 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
// initialize bucket replication pool
|
||||
init_bucket_replication_pool().await;
|
||||
|
||||
// Async update check (optional)
|
||||
// Async update check with timeout (optional)
|
||||
tokio::spawn(async {
|
||||
use crate::update::{UpdateCheckError, check_updates};
|
||||
|
||||
match check_updates().await {
|
||||
Ok(result) => {
|
||||
// Add timeout to prevent hanging network calls
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await {
|
||||
Ok(Ok(result)) => {
|
||||
if result.update_available {
|
||||
if let Some(latest) = &result.latest_version {
|
||||
info!(
|
||||
@@ -262,12 +291,15 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("✅ Version check: Current version is up to date: {}", result.current_version);
|
||||
}
|
||||
}
|
||||
Err(UpdateCheckError::HttpError(e)) => {
|
||||
Ok(Err(UpdateCheckError::HttpError(e))) => {
|
||||
debug!("Version check: network error (this is normal): {}", e);
|
||||
}
|
||||
Err(e) => {
|
||||
Ok(Err(e)) => {
|
||||
debug!("Version check: failed (this is normal): {}", e);
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("Version check: timeout after 30 seconds (this is normal)");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -364,14 +396,12 @@ async fn init_event_notifier() {
|
||||
info!("Event notifier configuration found, proceeding with initialization.");
|
||||
|
||||
// 3. Initialize the notification system asynchronously with a global configuration
|
||||
// Put it into a separate task to avoid blocking the main initialization process
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = rustfs_notify::initialize(server_config).await {
|
||||
error!("Failed to initialize event notifier system: {}", e);
|
||||
} else {
|
||||
info!("Event notifier system initialized successfully.");
|
||||
}
|
||||
});
|
||||
// Use direct await for better error handling and faster initialization
|
||||
if let Err(e) = rustfs_notify::initialize(server_config).await {
|
||||
error!("Failed to initialize event notifier system: {}", e);
|
||||
} else {
|
||||
info!("Event notifier system initialized successfully.");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
|
||||
63
rustfs/src/profiling.rs
Normal file
63
rustfs/src/profiling.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use pprof::ProfilerGuard;
|
||||
use std::sync::{Arc, Mutex, OnceLock};
|
||||
use tracing::info;
|
||||
|
||||
static PROFILER_GUARD: OnceLock<Arc<Mutex<ProfilerGuard<'static>>>> = OnceLock::new();
|
||||
|
||||
pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let guard = pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(1000)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()
|
||||
.map_err(|e| format!("Failed to build profiler guard: {}", e))?;
|
||||
|
||||
PROFILER_GUARD
|
||||
.set(Arc::new(Mutex::new(guard)))
|
||||
.map_err(|_| "Failed to set profiler guard (already initialized)")?;
|
||||
|
||||
info!("Performance profiler initialized");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_profiler_enabled() -> bool {
|
||||
PROFILER_GUARD.get().is_some()
|
||||
}
|
||||
|
||||
pub fn get_profiler_guard() -> Option<Arc<Mutex<ProfilerGuard<'static>>>> {
|
||||
PROFILER_GUARD.get().cloned()
|
||||
}
|
||||
|
||||
pub fn start_profiling_if_enabled() {
|
||||
let enable_profiling = std::env::var("RUSTFS_ENABLE_PROFILING")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.parse::<bool>()
|
||||
.unwrap_or(false);
|
||||
|
||||
if enable_profiling {
|
||||
match init_profiler() {
|
||||
Ok(()) => {
|
||||
info!("Performance profiling enabled via RUSTFS_ENABLE_PROFILING environment variable");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to initialize profiler: {}", e);
|
||||
info!("Performance profiling disabled due to initialization error");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("Performance profiling disabled. Set RUSTFS_ENABLE_PROFILING=true to enable");
|
||||
}
|
||||
}
|
||||
@@ -157,6 +157,8 @@ pub async fn start_http_server(
|
||||
b.build()
|
||||
};
|
||||
|
||||
// Server will be created per connection - this ensures isolation
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Record the PID-related metrics of the current process
|
||||
let meter = opentelemetry::global::meter("system");
|
||||
|
||||
Reference in New Issue
Block a user