Compare commits

...

10 Commits

Author SHA1 Message Date
houseme
7e1a9e2ede 🔒 Upgrade Cryptography Libraries to Latest RC Versions (#837)
* fix

* chore: upgrade cryptography libraries to RC versions

- Upgrade aes-gcm to 0.11.0-rc.2 with rand_core support
- Upgrade chacha20poly1305 to 0.11.0-rc.2
- Upgrade argon2 to 0.6.0-rc.2 with std features
- Upgrade hmac to 0.13.0-rc.3
- Upgrade pbkdf2 to 0.13.0-rc.2
- Upgrade rsa to 0.10.0-rc.10
- Upgrade sha1 and sha2 to 0.11.0-rc.3
- Upgrade md-5 to 0.11.0-rc.3

These upgrades provide enhanced security features and performance
improvements while maintaining backward compatibility with existing
encryption workflows.

* add

* improve code

* fix
2025-11-11 21:10:03 +08:00
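The call-site effect of the `rsa` and `rand` RC bumps shows up in the token-handling diff later in this compare: `OsRng` is replaced by `rand::rng()`. A minimal sketch of that pattern, written as a hypothetical standalone function and assuming the RC signatures used in that diff:

```rust
use rsa::{Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey};

// Hypothetical demo mirroring the token.rs change below; not RustFS code.
fn rsa_encrypt_demo(data: &[u8]) -> Result<Vec<u8>, rsa::Error> {
    // rand 0.10 RC: rand::rng() replaces the old OsRng handle.
    let mut rng = rand::rng();
    let private_key = RsaPrivateKey::new(&mut rng, 2048)?;
    let public_key = RsaPublicKey::from(&private_key);
    // rsa 0.10 RC keeps the (rng, padding, plaintext) encrypt signature.
    public_key.encrypt(&mut rng, Pkcs1v15Encrypt, data)
}
```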
安正超
8a020ec4d9 wip (#830) 2025-11-11 09:34:58 +08:00
weisd
77a3489ed2 fix list object err (#831)
fix list object err (#831)

#827
#815
#635
#752
2025-11-10 23:42:15 +08:00
weisd
5941062909 fix (#828) 2025-11-10 19:22:58 +08:00
houseme
98be7df0f5 feat(storage): refactor audit and notification with OperationHelper (#825)
* improve code for audit

* improve code ecfs.rs

* improve code

* improve code for ecfs.rs

* feat(storage): refactor audit and notification with OperationHelper

This commit introduces a significant refactoring of the audit logging and event notification mechanisms within `ecfs.rs`.

The core of this change is the new `OperationHelper` struct, which encapsulates and simplifies the logic for both concerns. It replaces the previous `AuditHelper` and manual event dispatching.

Key improvements include:

- **Unified Handling**: `OperationHelper` manages both audit and notification builders, providing a single, consistent entry point for S3 operations.
- **RAII for Automation**: By leveraging the `Drop` trait, the helper automatically dispatches logs and notifications when it goes out of scope. This simplifies S3 method implementations and ensures cleanup even on early returns.
- **Fluent API**: A builder-like pattern with methods such as `.object()`, `.version_id()`, and `.suppress_event()` makes the code more readable and expressive.
- **Context-Aware Logic**: The helper's `.complete()` method intelligently populates log details based on the operation's `S3Result` and only triggers notifications on success.
- **Modular Design**: All helper logic is now isolated in `rustfs/src/storage/helper.rs`, improving separation of concerns and making `ecfs.rs` cleaner.

This refactoring significantly enhances code clarity, reduces boilerplate, and improves the robustness of logging and notification handling across the storage layer.

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* improve code for audit and notify

* fix

* fix

* fix
2025-11-10 17:30:50 +08:00
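The RAII-plus-builder pattern the commit message above describes can be pictured roughly as follows. This is a hedged sketch: apart from the method names quoted in the message (`object`, `version_id`, `suppress_event`, `complete`), every type and field here is illustrative, not the actual code in `rustfs/src/storage/helper.rs`.

```rust
// Illustrative stand-in for the real OperationHelper; names are placeholders.
struct OperationHelper {
    api_name: &'static str,
    object: Option<String>,
    version_id: Option<String>,
    suppress_event: bool,
    succeeded: bool,
}

impl OperationHelper {
    fn new(api_name: &'static str) -> Self {
        Self { api_name, object: None, version_id: None, suppress_event: false, succeeded: false }
    }
    fn object(mut self, name: impl Into<String>) -> Self {
        self.object = Some(name.into());
        self
    }
    fn version_id(mut self, id: impl Into<String>) -> Self {
        self.version_id = Some(id.into());
        self
    }
    fn suppress_event(mut self) -> Self {
        self.suppress_event = true;
        self
    }
    // In the real helper this inspects the operation's S3Result; a bool stands in here.
    fn complete(&mut self, succeeded: bool) {
        self.succeeded = succeeded;
    }
}

impl Drop for OperationHelper {
    // RAII: the audit record always goes out, and the notification fires only
    // on success, even if the S3 handler returns early.
    fn drop(&mut self) {
        println!("audit: {} object={:?} version={:?} ok={}",
            self.api_name, self.object, self.version_id, self.succeeded);
        if self.succeeded && !self.suppress_event {
            println!("notify: {}", self.api_name);
        }
    }
}
```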
houseme
b26aad4129 improve code for logger (#822)
* improve code for logger

* fix
2025-11-08 22:36:24 +08:00
Alex Bykov
5989589c3e Update configuration.md (#812)
Escaping Pipe Character in the table "CLI Flags..."

Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-11-08 10:56:14 +08:00
majinghe
4716454faa add non root user support for container deployment (#817) 2025-11-08 10:00:14 +08:00
houseme
29056a767a Refactor Telemetry Initialization and Environment Utilities (#811)
* improve code for metrics

* improve code for metrics

* fix

* fix

* Refactor telemetry initialization and environment functions ordering

- Reorder functions in envs.rs by type size (8-bit to 64-bit, signed before unsigned) and add missing variants like get_env_opt_u16.
- Optimize init_telemetry to support three modes: stdout logging (default error level with span tracing), file rolling logs (size-based with retention), and HTTP-based observability with sub-endpoints (trace, metric, log) falling back to unified endpoint.
- Fix stdout logging issue by retaining WorkerGuard in OtelGuard to prevent premature release of async writer threads.
- Enhance observability mode with HTTP protocol, compression, and proper resource management.
- Update OtelGuard to include tracing_guard for stdout and flexi_logger_handles for file logging.
- Improve error handling and configuration extraction in OtelConfig.

* fix

* up

* fix

* fix

* improve code for obs

* fix

* fix
2025-11-07 20:01:54 +08:00
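The three telemetry modes listed in the commit above resolve roughly as sketched below. The enum and function are assumptions made for illustration, not the actual rustfs-obs API, and the per-signal sub-endpoint fallback (trace/metric/log falling back to the unified endpoint) is collapsed into a single check:

```rust
// Hypothetical sketch of the mode selection described in the commit message.
enum TelemetryMode {
    Stdout,                      // default: error-level logs plus span tracing
    FileRolling { dir: String }, // size-based rolling files with retention
    Http { endpoint: String },   // OTLP over HTTP with compression
}

fn resolve_mode(otlp_endpoint: Option<String>, log_dir: Option<String>) -> TelemetryMode {
    match (otlp_endpoint, log_dir) {
        (Some(endpoint), _) => TelemetryMode::Http { endpoint },
        (None, Some(dir)) => TelemetryMode::FileRolling { dir },
        (None, None) => TelemetryMode::Stdout,
    }
}
```

Whichever mode is selected, the commit stresses that the returned guards (the `WorkerGuard` for stdout, the `flexi_logger` handles for file logging) must be kept inside `OtelGuard`; dropping them early shuts down the async writer threads.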
weisd
e823922654 feat:add api error message (#801)
* feat:add api error message
* fix: check input
* fix: test
2025-11-07 09:53:49 +08:00
99 changed files with 3169 additions and 1966 deletions

View File

@@ -16,7 +16,7 @@ services:
tempo-init:
image: busybox:latest
command: ["sh", "-c", "chown -R 10001:10001 /var/tempo"]
command: [ "sh", "-c", "chown -R 10001:10001 /var/tempo" ]
volumes:
- ./tempo-data:/var/tempo
user: root
@@ -39,7 +39,7 @@ services:
- otel-network
otel-collector:
image: otel/opentelemetry-collector-contrib:0.129.1
image: otel/opentelemetry-collector-contrib:latest
environment:
- TZ=Asia/Shanghai
volumes:
@@ -55,7 +55,7 @@ services:
networks:
- otel-network
jaeger:
image: jaegertracing/jaeger:2.8.0
image: jaegertracing/jaeger:latest
environment:
- TZ=Asia/Shanghai
ports:
@@ -65,17 +65,21 @@ services:
networks:
- otel-network
prometheus:
image: prom/prometheus:v3.4.2
image: prom/prometheus:latest
environment:
- TZ=Asia/Shanghai
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--web.enable-otlp-receiver' # Enable OTLP
- '--enable-feature=promql-experimental-functions' # Enable info()
networks:
- otel-network
loki:
image: grafana/loki:3.5.1
image: grafana/loki:latest
environment:
- TZ=Asia/Shanghai
volumes:
@@ -86,7 +90,7 @@ services:
networks:
- otel-network
grafana:
image: grafana/grafana:12.0.2
image: grafana/grafana:latest
ports:
- "3000:3000" # Web UI
volumes:

View File

@@ -29,4 +29,80 @@ datasources:
serviceMap:
datasourceUid: prometheus
streamingEnabled:
search: true
search: true
tracesToLogsV2:
# Field with an internal link pointing to a logs data source in Grafana.
# datasourceUid value must match the uid value of the logs data source.
datasourceUid: 'loki'
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
filterByTraceID: false
filterBySpanID: false
customQuery: true
query: 'method="$${__span.tags.method}"'
tracesToMetrics:
datasourceUid: 'prom'
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ]
queries:
- name: 'Sample query'
query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))'
tracesToProfiles:
datasourceUid: 'grafana-pyroscope-datasource'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
profileTypeId: 'process_cpu:cpu:nanoseconds:cpu:nanoseconds'
customQuery: true
query: 'method="$${__span.tags.method}"'
serviceMap:
datasourceUid: 'prometheus'
nodeGraph:
enabled: true
search:
hide: false
traceQuery:
timeShiftEnabled: true
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
spanBar:
type: 'Tag'
tag: 'http.path'
streamingEnabled:
search: true
- name: Jaeger
type: jaeger
uid: Jaeger
url: http://jaeger:16686
basicAuth: false
access: proxy
readOnly: false
isDefault: false
jsonData:
tracesToLogsV2:
# Field with an internal link pointing to a logs data source in Grafana.
# datasourceUid value must match the uid value of the logs data source.
datasourceUid: 'loki'
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
filterByTraceID: false
filterBySpanID: false
customQuery: true
query: 'method="$${__span.tags.method}"'
tracesToMetrics:
datasourceUid: 'prom'
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ]
queries:
- name: 'Sample query'
query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))'
nodeGraph:
enabled: true
traceQuery:
timeShiftEnabled: true
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
spanBar:
type: 'None'

View File

@@ -63,6 +63,7 @@ ruler:
frontend:
encoding: protobuf
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#

View File

@@ -43,7 +43,6 @@ exporters:
send_timestamps: true # Send timestamps
# enable_open_metrics: true
otlphttp/loki: # Loki exporter, used for log data
# endpoint: "http://loki:3100/otlp/v1/logs"
endpoint: "http://loki:3100/otlp/v1/logs"
tls:
insecure: true

View File

@@ -13,16 +13,43 @@
# limitations under the License.
global:
scrape_interval: 5s # 刮取间隔
scrape_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
scrape_configs:
- job_name: 'otel-collector'
static_configs:
- targets: [ 'otel-collector:8888' ] # Collector 刮取指标
- targets: [ 'otel-collector:8888' ] # Scrape metrics from Collector
- job_name: 'otel-metrics'
static_configs:
- targets: [ 'otel-collector:8889' ] # 应用指标
- targets: [ 'otel-collector:8889' ] # Application indicators
- job_name: 'tempo'
static_configs:
- targets: [ 'tempo:3200' ]
- targets: [ 'tempo:3200' ] # Scrape metrics from Tempo
otlp:
# Recommended attributes to be promoted to labels.
promote_resource_attributes:
- service.instance.id
- service.name
- service.namespace
- cloud.availability_zone
- cloud.region
- container.name
- deployment.environment.name
- k8s.cluster.name
- k8s.container.name
- k8s.cronjob.name
- k8s.daemonset.name
- k8s.deployment.name
- k8s.job.name
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.statefulset.name
# Ingest OTLP data keeping all characters in metric/label names.
translation_strategy: NoUTF8EscapingWithSuffixes
storage:
# OTLP is a push-based protocol, Out of order samples is a common scenario.
tsdb:
out_of_order_time_window: 30m

657
Cargo.lock generated

File diff suppressed because it is too large.

View File

@@ -123,38 +123,38 @@ tower-http = { version = "0.6.6", features = ["cors"] }
# Serialization and Data Formats
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.1.0"
bytesize = "2.2.0"
byteorder = "1.5.0"
flatbuffers = "25.9.23"
form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.3"
rmcp = { version = "0.8.4" }
rmcp = { version = "0.8.5" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.0.5"
schemars = "1.1.0"
# Cryptography and Security
aes-gcm = { version = "0.10.3", features = ["std"] }
argon2 = { version = "0.5.3", features = ["std"] }
aes-gcm = { version = "0.11.0-rc.2", features = ["rand_core"] }
argon2 = { version = "0.6.0-rc.2", features = ["std"] }
blake3 = { version = "1.8.2" }
chacha20poly1305 = { version = "0.10.1" }
chacha20poly1305 = { version = "0.11.0-rc.2" }
crc-fast = "1.3.0"
crc32c = "0.6.8"
crc32fast = "1.5.0"
crc64fast-nvme = "1.2.0"
hmac = "0.12.1"
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
pbkdf2 = "0.12.2"
rsa = { version = "0.9.8" }
hmac = { version = "0.13.0-rc.3" }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
pbkdf2 = "0.13.0-rc.2"
rsa = { version = "0.10.0-rc.10" }
rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.0"
sha1 = "0.10.6"
sha2 = "0.10.9"
sha1 = "0.11.0-rc.3"
sha2 = "0.11.0-rc.3"
zeroize = { version = "1.8.2", features = ["derive"] }
# Time and Date
@@ -169,8 +169,8 @@ astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.10" }
aws-credential-types = { version = "1.2.8" }
aws-sdk-s3 = { version = "1.110.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-credential-types = { version = "1.2.9" }
aws-sdk-s3 = { version = "1.112.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-smithy-types = { version = "1.3.4" }
base64 = "0.22.1"
base64-simd = "0.8.0"
@@ -178,14 +178,15 @@ brotli = "8.0.2"
cfg-if = "1.0.4"
clap = { version = "4.5.51", features = ["derive", "env"] }
const-str = { version = "0.7.0", features = ["std", "proc"] }
convert_case = "0.8.0"
convert_case = "0.9.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "50.3.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.5"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
glob = "0.3.3"
google-cloud-storage = "1.2.0"
google-cloud-auth = "1.1.0"
@@ -196,11 +197,11 @@ highway = { version = "1.3.0" }
ipnetwork = { version = "0.21.1", features = ["serde"] }
lazy_static = "1.5.0"
libc = "0.2.177"
libsystemd = { version = "0.7.2" }
libsystemd = "0.7.2"
local-ip-address = "0.6.5"
lz4 = "1.28.1"
matchit = "0.9.0"
md-5 = "0.10.6"
md-5 = "0.11.0-rc.3"
md5 = "0.8.0"
metrics = "0.24.2"
metrics-exporter-opentelemetry = "0.1.2"
@@ -218,15 +219,14 @@ path-absolutize = "3.1.1"
path-clean = "1.0.1"
pin-project-lite = "0.2.16"
pretty_assertions = "1.4.1"
rand = "0.9.2"
rand = { version = "0.10.0-rc.5", features = ["serde"] }
rayon = "1.11.0"
reed-solomon-simd = { version = "3.1.0" }
regex = { version = "1.12.2" }
rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.9.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.12.0-rc.3", features = ["minio"] }
scopeguard = "1.2.0"
s3s = { git = "https://github.com/s3s-project/s3s.git", rev = "1ab064b", version = "0.12.0-rc.3", features = ["minio"] }
serial_test = "3.2.0"
shadow-rs = { version = "1.4.0", default-features = false }
siphasher = "1.0.1"
@@ -253,7 +253,7 @@ urlencoding = "2.1.3"
uuid = { version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.5.0", features = ["serde"] }
wildmatch = { version = "2.6.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "6.0.0"
@@ -262,7 +262,7 @@ zstd = "0.13.3"
# Observability and Metrics
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes", "spec_unstable_logs_enabled"] }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry-otlp = { version = "0.31.0", features = ["http-proto", "zstd-http"] }
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }
@@ -280,7 +280,7 @@ mimalloc = "0.1"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp", "tokio-test", "scopeguard"]
ignored = ["rustfs", "rustfs-mcp", "tokio-test"]
[profile.release]
opt-level = 3

View File

@@ -64,8 +64,12 @@ COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=build /build/rustfs /usr/bin/rustfs
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /usr/bin/rustfs /entrypoint.sh && \
RUN chmod +x /usr/bin/rustfs /entrypoint.sh
RUN addgroup -g 1000 -S rustfs && \
adduser -u 1000 -G rustfs -S rustfs -D && \
mkdir -p /data /logs && \
chown -R rustfs:rustfs /data /logs && \
chmod 0750 /data /logs
ENV RUSTFS_ADDRESS=":9000" \
@@ -82,8 +86,11 @@ ENV RUSTFS_ADDRESS=":9000" \
RUSTFS_SINKS_FILE_PATH="/logs"
EXPOSE 9000 9001
VOLUME ["/data", "/logs"]
USER rustfs
ENTRYPOINT ["/entrypoint.sh"]
CMD ["rustfs"]

View File

@@ -139,6 +139,8 @@ observability. If you want to start redis as well as nginx container, you can sp
make help-docker # Show all Docker-related commands
```
> **Heads-up (macOS cross-compilation)**: macOS keeps the default `ulimit -n` at 256, so `cargo zigbuild` or `./build-rustfs.sh --platform ...` may fail with `ProcessFdQuotaExceeded` when targeting Linux. The build script now tries to raise the limit automatically, but if you still see the warning, run `ulimit -n 4096` (or higher) in your shell before building.
4. **Build with helm chart(Option 4) - Cloud Native environment**
Following the instructions on [helm chart README](./helm/README.md) to install RustFS on kubernetes cluster.
@@ -207,4 +209,3 @@ top charts.
[Apache 2.0](https://opensource.org/licenses/Apache-2.0)
**RustFS** is a trademark of RustFS, Inc. All other trademarks are the property of their respective owners.

View File

@@ -113,12 +113,14 @@ RustFS is built using Rust, one of the world's most popular programming languages
You can also use the Makefile targets for added convenience:
```bash
make docker-buildx # Build locally
make docker-buildx-push # Build and push
make docker-buildx-version VERSION=v1.0.0 # Build a specific version
make help-docker # Show all Docker-related commands
```
```bash
make docker-buildx # Build locally
make docker-buildx-push # Build and push
make docker-buildx-version VERSION=v1.0.0 # Build a specific version
make help-docker # Show all Docker-related commands
```
> **Heads-up (macOS cross-compilation)**: macOS defaults `ulimit -n` to 256, so compiling Linux targets with `cargo zigbuild` or `./build-rustfs.sh --platform ...` can easily hit a `ProcessFdQuotaExceeded` linker error. The script will try to raise the limit automatically; if the warning still appears, run `ulimit -n 4096` (or a higher value) manually before building.
4. **Deploy with Helm Chart (Option 4) - Cloud Native environment**

View File

@@ -163,6 +163,35 @@ print_message() {
echo -e "${color}${message}${NC}"
}
# Prevent zig/ld from hitting macOS file descriptor defaults during linking
ensure_file_descriptor_limit() {
local required_limit=4096
local current_limit
current_limit=$(ulimit -Sn 2>/dev/null || echo "")
if [ -z "$current_limit" ] || [ "$current_limit" = "unlimited" ]; then
return
fi
if (( current_limit >= required_limit )); then
return
fi
local hard_limit target_limit
hard_limit=$(ulimit -Hn 2>/dev/null || echo "")
target_limit=$required_limit
if [ -n "$hard_limit" ] && [ "$hard_limit" != "unlimited" ] && (( hard_limit < required_limit )); then
target_limit=$hard_limit
fi
if ulimit -Sn "$target_limit" 2>/dev/null; then
print_message $YELLOW "🔧 Increased open file limit from $current_limit to $target_limit to avoid ProcessFdQuotaExceeded"
else
print_message $YELLOW "⚠️ Unable to raise ulimit -n automatically (current: $current_limit, needed: $required_limit). Please run 'ulimit -n $required_limit' manually before building."
fi
}
# Get version from git
get_version() {
if git describe --abbrev=0 --tags >/dev/null 2>&1; then
@@ -570,10 +599,11 @@ main() {
fi
fi
ensure_file_descriptor_limit
# Start build process
build_rustfs
}
# Run main function
main

View File

@@ -29,6 +29,7 @@ base64-simd = { workspace = true }
rsa = { workspace = true }
serde.workspace = true
serde_json.workspace = true
rand.workspace = true
[lints]
workspace = true

View File

@@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rsa::Pkcs1v15Encrypt;
use rsa::{
RsaPrivateKey, RsaPublicKey,
Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey,
pkcs8::{DecodePrivateKey, DecodePublicKey},
rand_core::OsRng,
};
use serde::{Deserialize, Serialize};
use std::io::{Error, Result};
@@ -33,8 +31,9 @@ pub struct Token {
/// Returns the encrypted string processed by base64
pub fn gencode(token: &Token, key: &str) -> Result<String> {
let data = serde_json::to_vec(token)?;
let mut rng = rand::rng();
let public_key = RsaPublicKey::from_public_key_pem(key).map_err(Error::other)?;
let encrypted_data = public_key.encrypt(&mut OsRng, Pkcs1v15Encrypt, &data).map_err(Error::other)?;
let encrypted_data = public_key.encrypt(&mut rng, Pkcs1v15Encrypt, &data).map_err(Error::other)?;
Ok(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&encrypted_data))
}
@@ -76,9 +75,10 @@ mod tests {
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
};
use std::time::{SystemTime, UNIX_EPOCH};
#[test]
fn test_gencode_and_parse() {
let mut rng = OsRng;
let mut rng = rand::rng();
let bits = 2048;
let private_key = RsaPrivateKey::new(&mut rng, bits).expect("Failed to generate private key");
let public_key = RsaPublicKey::from(&private_key);
@@ -101,7 +101,8 @@ mod tests {
#[test]
fn test_parse_invalid_token() {
let private_key_pem = RsaPrivateKey::new(&mut OsRng, 2048)
let mut rng = rand::rng();
let private_key_pem = RsaPrivateKey::new(&mut rng, 2048)
.expect("Failed to generate private key")
.to_pkcs8_pem(LineEnding::LF)
.unwrap();

View File

@@ -30,7 +30,10 @@ rustfs-targets = { workspace = true }
rustfs-config = { workspace = true, features = ["audit", "constants"] }
rustfs-ecstore = { workspace = true }
chrono = { workspace = true }
const-str = { workspace = true }
futures = { workspace = true }
hashbrown = { workspace = true }
metrics = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -39,5 +42,6 @@ tracing = { workspace = true, features = ["std", "attributes"] }
url = { workspace = true }
rumqttc = { workspace = true }
[lints]
workspace = true

View File

@@ -13,18 +13,10 @@
// limitations under the License.
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Trait for types that can be serialized to JSON and have a timestamp
pub trait LogRecord {
/// Serialize the record to a JSON string
fn to_json(&self) -> String;
/// Get the timestamp of the record
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// ObjectVersion represents an object version with key and versionId
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
@@ -36,19 +28,12 @@ pub struct ObjectVersion {
}
impl ObjectVersion {
/// Set the object name (chainable)
pub fn set_object_name(&mut self, name: String) -> &mut Self {
self.object_name = name;
self
}
/// Set the version ID (chainable)
pub fn set_version_id(&mut self, version_id: Option<String>) -> &mut Self {
self.version_id = version_id;
self
pub fn new(object_name: String, version_id: Option<String>) -> Self {
Self { object_name, version_id }
}
}
/// ApiDetails contains API information for the audit entry
/// `ApiDetails` contains API information for the audit entry.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ApiDetails {
#[serde(skip_serializing_if = "Option::is_none")]
@@ -79,75 +64,86 @@ pub struct ApiDetails {
pub time_to_response_in_ns: Option<String>,
}
impl ApiDetails {
/// Set API name (chainable)
pub fn set_name(&mut self, name: Option<String>) -> &mut Self {
self.name = name;
/// Builder for `ApiDetails`.
#[derive(Default, Clone)]
pub struct ApiDetailsBuilder(pub ApiDetails);
impl ApiDetailsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn name(mut self, name: impl Into<String>) -> Self {
self.0.name = Some(name.into());
self
}
/// Set bucket name (chainable)
pub fn set_bucket(&mut self, bucket: Option<String>) -> &mut Self {
self.bucket = bucket;
pub fn bucket(mut self, bucket: impl Into<String>) -> Self {
self.0.bucket = Some(bucket.into());
self
}
/// Set object name (chainable)
pub fn set_object(&mut self, object: Option<String>) -> &mut Self {
self.object = object;
pub fn object(mut self, object: impl Into<String>) -> Self {
self.0.object = Some(object.into());
self
}
/// Set objects list (chainable)
pub fn set_objects(&mut self, objects: Option<Vec<ObjectVersion>>) -> &mut Self {
self.objects = objects;
pub fn objects(mut self, objects: Vec<ObjectVersion>) -> Self {
self.0.objects = Some(objects);
self
}
/// Set status (chainable)
pub fn set_status(&mut self, status: Option<String>) -> &mut Self {
self.status = status;
pub fn status(mut self, status: impl Into<String>) -> Self {
self.0.status = Some(status.into());
self
}
/// Set status code (chainable)
pub fn set_status_code(&mut self, code: Option<i32>) -> &mut Self {
self.status_code = code;
pub fn status_code(mut self, code: i32) -> Self {
self.0.status_code = Some(code);
self
}
/// Set input bytes (chainable)
pub fn set_input_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.input_bytes = bytes;
pub fn input_bytes(mut self, bytes: i64) -> Self {
self.0.input_bytes = Some(bytes);
self
}
/// Set output bytes (chainable)
pub fn set_output_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.output_bytes = bytes;
pub fn output_bytes(mut self, bytes: i64) -> Self {
self.0.output_bytes = Some(bytes);
self
}
/// Set header bytes (chainable)
pub fn set_header_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.header_bytes = bytes;
pub fn header_bytes(mut self, bytes: i64) -> Self {
self.0.header_bytes = Some(bytes);
self
}
/// Set time to first byte (chainable)
pub fn set_time_to_first_byte(&mut self, t: Option<String>) -> &mut Self {
self.time_to_first_byte = t;
pub fn time_to_first_byte(mut self, t: impl Into<String>) -> Self {
self.0.time_to_first_byte = Some(t.into());
self
}
/// Set time to first byte in nanoseconds (chainable)
pub fn set_time_to_first_byte_in_ns(&mut self, t: Option<String>) -> &mut Self {
self.time_to_first_byte_in_ns = t;
pub fn time_to_first_byte_in_ns(mut self, t: impl Into<String>) -> Self {
self.0.time_to_first_byte_in_ns = Some(t.into());
self
}
/// Set time to response (chainable)
pub fn set_time_to_response(&mut self, t: Option<String>) -> &mut Self {
self.time_to_response = t;
pub fn time_to_response(mut self, t: impl Into<String>) -> Self {
self.0.time_to_response = Some(t.into());
self
}
/// Set time to response in nanoseconds (chainable)
pub fn set_time_to_response_in_ns(&mut self, t: Option<String>) -> &mut Self {
self.time_to_response_in_ns = t;
pub fn time_to_response_in_ns(mut self, t: impl Into<String>) -> Self {
self.0.time_to_response_in_ns = Some(t.into());
self
}
pub fn build(self) -> ApiDetails {
self.0
}
}
/// AuditEntry represents an audit log entry
/// `AuditEntry` represents an audit log entry.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AuditEntry {
pub version: String,
@@ -155,6 +151,7 @@ pub struct AuditEntry {
pub deployment_id: Option<String>,
#[serde(rename = "siteName", skip_serializing_if = "Option::is_none")]
pub site_name: Option<String>,
#[serde(with = "chrono::serde::ts_milliseconds")]
pub time: DateTime<Utc>,
pub event: EventName,
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
@@ -191,200 +188,130 @@ pub struct AuditEntry {
pub error: Option<String>,
}
impl AuditEntry {
/// Create a new AuditEntry with required fields
#[allow(clippy::too_many_arguments)]
pub fn new(
version: String,
deployment_id: Option<String>,
site_name: Option<String>,
time: DateTime<Utc>,
event: EventName,
entry_type: Option<String>,
trigger: String,
api: ApiDetails,
) -> Self {
AuditEntry {
version,
deployment_id,
site_name,
time,
/// Constructor for `AuditEntry`.
pub struct AuditEntryBuilder(AuditEntry);
impl AuditEntryBuilder {
/// Create a new builder with all required fields.
pub fn new(version: impl Into<String>, event: EventName, trigger: impl Into<String>, api: ApiDetails) -> Self {
Self(AuditEntry {
version: version.into(),
time: Utc::now(),
event,
entry_type,
trigger,
trigger: trigger.into(),
api,
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
req_node: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
tags: None,
access_key: None,
parent_user: None,
error: None,
}
..Default::default()
})
}
/// Set version (chainable)
pub fn set_version(&mut self, version: String) -> &mut Self {
self.version = version;
self
}
/// Set deployment ID (chainable)
pub fn set_deployment_id(&mut self, id: Option<String>) -> &mut Self {
self.deployment_id = id;
self
}
/// Set site name (chainable)
pub fn set_site_name(&mut self, name: Option<String>) -> &mut Self {
self.site_name = name;
self
}
/// Set time (chainable)
pub fn set_time(&mut self, time: DateTime<Utc>) -> &mut Self {
self.time = time;
self
}
/// Set event (chainable)
pub fn set_event(&mut self, event: EventName) -> &mut Self {
self.event = event;
self
}
/// Set entry type (chainable)
pub fn set_entry_type(&mut self, entry_type: Option<String>) -> &mut Self {
self.entry_type = entry_type;
self
}
/// Set trigger (chainable)
pub fn set_trigger(&mut self, trigger: String) -> &mut Self {
self.trigger = trigger;
self
}
/// Set API details (chainable)
pub fn set_api(&mut self, api: ApiDetails) -> &mut Self {
self.api = api;
self
}
/// Set remote host (chainable)
pub fn set_remote_host(&mut self, host: Option<String>) -> &mut Self {
self.remote_host = host;
self
}
/// Set request ID (chainable)
pub fn set_request_id(&mut self, id: Option<String>) -> &mut Self {
self.request_id = id;
self
}
/// Set user agent (chainable)
pub fn set_user_agent(&mut self, agent: Option<String>) -> &mut Self {
self.user_agent = agent;
self
}
/// Set request path (chainable)
pub fn set_req_path(&mut self, path: Option<String>) -> &mut Self {
self.req_path = path;
self
}
/// Set request host (chainable)
pub fn set_req_host(&mut self, host: Option<String>) -> &mut Self {
self.req_host = host;
self
}
/// Set request node (chainable)
pub fn set_req_node(&mut self, node: Option<String>) -> &mut Self {
self.req_node = node;
self
}
/// Set request claims (chainable)
pub fn set_req_claims(&mut self, claims: Option<HashMap<String, Value>>) -> &mut Self {
self.req_claims = claims;
self
}
/// Set request query (chainable)
pub fn set_req_query(&mut self, query: Option<HashMap<String, String>>) -> &mut Self {
self.req_query = query;
self
}
/// Set request header (chainable)
pub fn set_req_header(&mut self, header: Option<HashMap<String, String>>) -> &mut Self {
self.req_header = header;
self
}
/// Set response header (chainable)
pub fn set_resp_header(&mut self, header: Option<HashMap<String, String>>) -> &mut Self {
self.resp_header = header;
self
}
/// Set tags (chainable)
pub fn set_tags(&mut self, tags: Option<HashMap<String, Value>>) -> &mut Self {
self.tags = tags;
self
}
/// Set access key (chainable)
pub fn set_access_key(&mut self, key: Option<String>) -> &mut Self {
self.access_key = key;
self
}
/// Set parent user (chainable)
pub fn set_parent_user(&mut self, user: Option<String>) -> &mut Self {
self.parent_user = user;
self
}
/// Set error message (chainable)
pub fn set_error(&mut self, error: Option<String>) -> &mut Self {
self.error = error;
// event
pub fn version(mut self, version: impl Into<String>) -> Self {
self.0.version = version.into();
self
}
/// Build AuditEntry from context or parameters (example, can be extended)
pub fn from_context(
version: String,
deployment_id: Option<String>,
time: DateTime<Utc>,
event: EventName,
trigger: String,
api: ApiDetails,
tags: Option<HashMap<String, Value>>,
) -> Self {
AuditEntry {
version,
deployment_id,
site_name: None,
time,
event,
entry_type: None,
trigger,
api,
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
req_node: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
tags,
access_key: None,
parent_user: None,
error: None,
}
}
}
impl LogRecord for AuditEntry {
/// Serialize AuditEntry to JSON string
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
/// Get the timestamp of the audit entry
fn get_timestamp(&self) -> DateTime<Utc> {
self.time
pub fn event(mut self, event: EventName) -> Self {
self.0.event = event;
self
}
pub fn api(mut self, api_details: ApiDetails) -> Self {
self.0.api = api_details;
self
}
pub fn deployment_id(mut self, id: impl Into<String>) -> Self {
self.0.deployment_id = Some(id.into());
self
}
pub fn site_name(mut self, name: impl Into<String>) -> Self {
self.0.site_name = Some(name.into());
self
}
pub fn time(mut self, time: DateTime<Utc>) -> Self {
self.0.time = time;
self
}
pub fn entry_type(mut self, entry_type: impl Into<String>) -> Self {
self.0.entry_type = Some(entry_type.into());
self
}
pub fn remote_host(mut self, host: impl Into<String>) -> Self {
self.0.remote_host = Some(host.into());
self
}
pub fn request_id(mut self, id: impl Into<String>) -> Self {
self.0.request_id = Some(id.into());
self
}
pub fn user_agent(mut self, agent: impl Into<String>) -> Self {
self.0.user_agent = Some(agent.into());
self
}
pub fn req_path(mut self, path: impl Into<String>) -> Self {
self.0.req_path = Some(path.into());
self
}
pub fn req_host(mut self, host: impl Into<String>) -> Self {
self.0.req_host = Some(host.into());
self
}
pub fn req_node(mut self, node: impl Into<String>) -> Self {
self.0.req_node = Some(node.into());
self
}
pub fn req_claims(mut self, claims: HashMap<String, Value>) -> Self {
self.0.req_claims = Some(claims);
self
}
pub fn req_query(mut self, query: HashMap<String, String>) -> Self {
self.0.req_query = Some(query);
self
}
pub fn req_header(mut self, header: HashMap<String, String>) -> Self {
self.0.req_header = Some(header);
self
}
pub fn resp_header(mut self, header: HashMap<String, String>) -> Self {
self.0.resp_header = Some(header);
self
}
pub fn tags(mut self, tags: HashMap<String, Value>) -> Self {
self.0.tags = Some(tags);
self
}
pub fn access_key(mut self, key: impl Into<String>) -> Self {
self.0.access_key = Some(key.into());
self
}
pub fn parent_user(mut self, user: impl Into<String>) -> Self {
self.0.parent_user = Some(user.into());
self
}
pub fn error(mut self, error: impl Into<String>) -> Self {
self.0.error = Some(error.into());
self
}
/// Construct the final `AuditEntry`.
pub fn build(self) -> AuditEntry {
self.0
}
}
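For reference, a hedged usage sketch of the two builders introduced above; only the builder method names come from this diff, while the `EventName` variant and all field values are placeholders:

```rust
// Assumes ApiDetailsBuilder, AuditEntryBuilder, AuditEntry and EventName are in scope.
fn sample_entry() -> AuditEntry {
    let api = ApiDetailsBuilder::new()
        .name("PutObject")
        .bucket("demo-bucket")
        .object("photos/cat.png")
        .status("OK")
        .status_code(200)
        .build();

    AuditEntryBuilder::new("1", EventName::ObjectCreatedPut, "incoming", api)
        .remote_host("203.0.113.7")
        .request_id("req-42")
        .build()
}
```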

View File

@@ -21,7 +21,7 @@ pub type AuditResult<T> = Result<T, AuditError>;
#[derive(Error, Debug)]
pub enum AuditError {
#[error("Configuration error: {0}")]
Configuration(String),
Configuration(String, #[source] Option<Box<dyn std::error::Error + Send + Sync>>),
#[error("config not loaded")]
ConfigNotLoaded,
@@ -35,11 +35,14 @@ pub enum AuditError {
#[error("System already initialized")]
AlreadyInitialized,
#[error("Storage not available: {0}")]
StorageNotAvailable(String),
#[error("Failed to save configuration: {0}")]
SaveConfig(String),
SaveConfig(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("Failed to load configuration: {0}")]
LoadConfig(String),
LoadConfig(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
@@ -49,7 +52,4 @@ pub enum AuditError {
#[error("Join error: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("Server storage not initialized: {0}")]
ServerNotInitialized(String),
}
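With the `#[source]` variants above, the underlying error stays on the error chain instead of being flattened into a string. A small hedged example, where the `io::Error` is only a stand-in for whatever storage error gets boxed in practice and `AuditError` is assumed to be in scope:

```rust
use std::error::Error as _;

fn demo() {
    let inner = std::io::Error::other("disk offline"); // stand-in source error
    let err = AuditError::SaveConfig(Box::new(inner));

    println!("{err}");               // "Failed to save configuration: disk offline"
    assert!(err.source().is_some()); // the boxed error is still reachable via source()
}
```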

View File

@@ -15,7 +15,7 @@
use crate::{AuditEntry, AuditResult, AuditSystem};
use rustfs_ecstore::config::Config;
use std::sync::{Arc, OnceLock};
use tracing::{error, warn};
use tracing::{error, trace, warn};
/// Global audit system instance
static AUDIT_SYSTEM: OnceLock<Arc<AuditSystem>> = OnceLock::new();
@@ -30,6 +30,19 @@ pub fn audit_system() -> Option<Arc<AuditSystem>> {
AUDIT_SYSTEM.get().cloned()
}
/// A helper macro for executing closures if the global audit system is initialized.
/// If not initialized, log a warning and return `Ok(())`.
macro_rules! with_audit_system {
($async_closure:expr) => {
if let Some(system) = audit_system() {
(async move { $async_closure(system).await }).await
} else {
warn!("Audit system not initialized, operation skipped.");
Ok(())
}
};
}
/// Start the global audit system with configuration
pub async fn start_audit_system(config: Config) -> AuditResult<()> {
let system = init_audit_system();
@@ -38,32 +51,17 @@ pub async fn start_audit_system(config: Config) -> AuditResult<()> {
/// Stop the global audit system
pub async fn stop_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.close().await
} else {
warn!("Audit system not initialized, cannot stop");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.close().await })
}
/// Pause the global audit system
pub async fn pause_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.pause().await
} else {
warn!("Audit system not initialized, cannot pause");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.pause().await })
}
/// Resume the global audit system
pub async fn resume_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.resume().await
} else {
warn!("Audit system not initialized, cannot resume");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.resume().await })
}
/// Dispatch an audit log entry to all targets
@@ -72,23 +70,23 @@ pub async fn dispatch_audit_log(entry: Arc<AuditEntry>) -> AuditResult<()> {
if system.is_running().await {
system.dispatch(entry).await
} else {
// System not running, just drop the log entry without error
// The system is initialized but not running (for example, it is suspended). Silently discard log entries based on original logic.
// For debugging purposes, it can be useful to add a trace log here.
trace!("Audit system is not running, dropping audit entry.");
Ok(())
}
} else {
// System not initialized, just drop the log entry without error
// The system is not initialized at all. This is a more important state.
// It might be better to return an error or log a warning.
warn!("Audit system not initialized, dropping audit entry.");
// If this should be a hard failure, you can return Err(AuditError::NotInitialized("..."))
Ok(())
}
}
/// Reload the global audit system configuration
pub async fn reload_audit_config(config: Config) -> AuditResult<()> {
if let Some(system) = audit_system() {
system.reload_config(config).await
} else {
warn!("Audit system not initialized, cannot reload config");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.reload_config(config).await })
}
/// Check if the global audit system is running

View File

@@ -25,7 +25,7 @@ pub mod observability;
pub mod registry;
pub mod system;
pub use entity::{ApiDetails, AuditEntry, LogRecord, ObjectVersion};
pub use entity::{ApiDetails, AuditEntry, ObjectVersion};
pub use error::{AuditError, AuditResult};
pub use global::*;
pub use observability::{AuditMetrics, AuditMetricsReport, PerformanceValidation};

View File

@@ -21,12 +21,47 @@
//! - Error rate monitoring
//! - Queue depth monitoring
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::info;
const RUSTFS_AUDIT_METRICS_NAMESPACE: &str = "rustfs.audit.";
const M_AUDIT_EVENTS_TOTAL: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.total");
const M_AUDIT_EVENTS_FAILED: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.failed");
const M_AUDIT_DISPATCH_NS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "dispatch.ns");
const M_AUDIT_EPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "eps");
const M_AUDIT_TARGET_OPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "target.ops");
const M_AUDIT_CONFIG_RELOADS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "config.reloads");
const M_AUDIT_SYSTEM_STARTS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "system.starts");
const L_RESULT: &str = "result";
const L_STATUS: &str = "status";
const V_SUCCESS: &str = "success";
const V_FAILURE: &str = "failure";
/// One-time registration of indicator meta information
/// This function ensures that metric descriptors are registered only once.
pub fn init_observability_metrics() {
static METRICS_DESC_INIT: OnceLock<()> = OnceLock::new();
METRICS_DESC_INIT.get_or_init(|| {
// Event/Time-consuming
describe_counter!(M_AUDIT_EVENTS_TOTAL, "Total audit events (labeled by result).");
describe_counter!(M_AUDIT_EVENTS_FAILED, "Total failed audit events.");
describe_histogram!(M_AUDIT_DISPATCH_NS, "Dispatch time per event (ns).");
describe_gauge!(M_AUDIT_EPS, "Events per second since last reset.");
// Target operation/system event
describe_counter!(M_AUDIT_TARGET_OPS, "Total target operations (labeled by status).");
describe_counter!(M_AUDIT_CONFIG_RELOADS, "Total configuration reloads.");
describe_counter!(M_AUDIT_SYSTEM_STARTS, "Total system starts.");
});
}
/// Metrics collector for audit system observability
#[derive(Debug)]
pub struct AuditMetrics {
@@ -56,6 +91,7 @@ impl Default for AuditMetrics {
impl AuditMetrics {
/// Creates a new metrics collector
pub fn new() -> Self {
init_observability_metrics();
Self {
total_events_processed: AtomicU64::new(0),
total_events_failed: AtomicU64::new(0),
@@ -68,11 +104,28 @@ impl AuditMetrics {
}
}
// Suggestion: Call this auxiliary function in the existing "Successful Event Recording" method body to complete the instrumentation
#[inline]
fn emit_event_success_metrics(&self, dispatch_time: Duration) {
// count + histogram
counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_SUCCESS).increment(1);
histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64);
}
// Suggestion: Call this auxiliary function in the existing "Failure Event Recording" method body to complete the instrumentation
#[inline]
fn emit_event_failure_metrics(&self, dispatch_time: Duration) {
counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_FAILURE).increment(1);
counter!(M_AUDIT_EVENTS_FAILED).increment(1);
histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64);
}
/// Records a successful event dispatch
pub fn record_event_success(&self, dispatch_time: Duration) {
self.total_events_processed.fetch_add(1, Ordering::Relaxed);
self.total_dispatch_time_ns
.fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed);
self.emit_event_success_metrics(dispatch_time);
}
/// Records a failed event dispatch
@@ -80,27 +133,32 @@ impl AuditMetrics {
self.total_events_failed.fetch_add(1, Ordering::Relaxed);
self.total_dispatch_time_ns
.fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed);
self.emit_event_failure_metrics(dispatch_time);
}
/// Records a successful target operation
pub fn record_target_success(&self) {
self.target_success_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_SUCCESS).increment(1);
}
/// Records a failed target operation
pub fn record_target_failure(&self) {
self.target_failure_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_FAILURE).increment(1);
}
/// Records a configuration reload
pub fn record_config_reload(&self) {
self.config_reload_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_CONFIG_RELOADS).increment(1);
info!("Audit configuration reloaded");
}
/// Records a system start
pub fn record_system_start(&self) {
self.system_start_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_SYSTEM_STARTS).increment(1);
info!("Audit system started");
}
@@ -110,11 +168,14 @@ impl AuditMetrics {
let elapsed = reset_time.elapsed();
let total_events = self.total_events_processed.load(Ordering::Relaxed) + self.total_events_failed.load(Ordering::Relaxed);
if elapsed.as_secs_f64() > 0.0 {
let eps = if elapsed.as_secs_f64() > 0.0 {
total_events as f64 / elapsed.as_secs_f64()
} else {
0.0
}
};
// EPS is reported in gauge
gauge!(M_AUDIT_EPS).set(eps);
eps
}
/// Gets the average dispatch latency in milliseconds
@@ -166,6 +227,8 @@ impl AuditMetrics {
let mut reset_time = self.last_reset_time.write().await;
*reset_time = Instant::now();
// Reset EPS to zero after reset
gauge!(M_AUDIT_EPS).set(0.0);
info!("Audit metrics reset");
}

View File

@@ -14,6 +14,7 @@
use crate::{AuditEntry, AuditError, AuditResult};
use futures::{StreamExt, stream::FuturesUnordered};
use hashbrown::{HashMap, HashSet};
use rustfs_config::{
DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR,
MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE,
@@ -25,7 +26,6 @@ use rustfs_targets::{
Target, TargetError,
target::{ChannelTargetType, TargetType, mqtt::MQTTArgs, webhook::WebhookArgs},
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, warn};
@@ -251,7 +251,7 @@ impl AuditRegistry {
sections.extend(successes_by_section.keys().cloned());
for section_name in sections {
let mut section_map: HashMap<String, KVS> = HashMap::new();
let mut section_map: std::collections::HashMap<String, KVS> = std::collections::HashMap::new();
// The default entry (if present) is written back to `_`
if let Some(default_cfg) = section_defaults.get(&section_name) {
@@ -277,7 +277,7 @@ impl AuditRegistry {
// 7. Save the new configuration to the system
let Some(store) = rustfs_ecstore::new_object_layer_fn() else {
return Err(AuditError::ServerNotInitialized(
return Err(AuditError::StorageNotAvailable(
"Failed to save target configuration: server storage not initialized".to_string(),
));
};
@@ -286,7 +286,7 @@ impl AuditRegistry {
Ok(_) => info!("New audit configuration saved to system successfully"),
Err(e) => {
error!(error = %e, "Failed to save new audit configuration");
return Err(AuditError::SaveConfig(e.to_string()));
return Err(AuditError::SaveConfig(Box::new(e)));
}
}
}

View File

@@ -146,7 +146,7 @@ impl AuditSystem {
warn!("Audit system is already paused");
Ok(())
}
_ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string())),
_ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string(), None)),
}
}
@@ -164,7 +164,7 @@ impl AuditSystem {
warn!("Audit system is already running");
Ok(())
}
_ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string())),
_ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string(), None)),
}
}
@@ -460,7 +460,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target enabled");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}
@@ -473,7 +473,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target disabled");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}
@@ -487,7 +487,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target removed");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}

View File

@@ -52,7 +52,7 @@ async fn test_config_parsing_webhook() {
// We expect this to fail due to server storage not being initialized
// but the parsing should work correctly
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// This is expected in test environment
}
Err(e) => {

View File

@@ -73,7 +73,7 @@ async fn test_concurrent_target_creation() {
// Verify it fails with expected error (server not initialized)
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected in test environment
}
Err(e) => {
@@ -103,17 +103,17 @@ async fn test_audit_log_dispatch_performance() {
use std::collections::HashMap;
let id = 1;
let mut req_header = HashMap::new();
let mut req_header = hashbrown::HashMap::new();
req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}"));
req_header.insert("content-type".to_string(), "application/octet-stream".to_string());
let mut resp_header = HashMap::new();
let mut resp_header = hashbrown::HashMap::new();
resp_header.insert("x-response".to_string(), "ok".to_string());
let mut tags = HashMap::new();
let mut tags = hashbrown::HashMap::new();
tags.insert(format!("tag-{id}"), json!("sample"));
let mut req_query = HashMap::new();
let mut req_query = hashbrown::HashMap::new();
req_query.insert("id".to_string(), id.to_string());
let api_details = ApiDetails {

View File

@@ -35,7 +35,7 @@ async fn test_complete_audit_system_lifecycle() {
// Should fail in test environment but state handling should work
match start_result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected in test environment
assert_eq!(system.get_state().await, system::AuditSystemState::Stopped);
}
@@ -168,7 +168,7 @@ async fn test_config_parsing_with_multiple_instances() {
// Should fail due to server storage not initialized, but parsing should work
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected - parsing worked but save failed
}
Err(e) => {
@@ -182,48 +182,6 @@ async fn test_config_parsing_with_multiple_instances() {
}
}
// #[tokio::test]
// async fn test_environment_variable_precedence() {
// // Test that environment variables override config file settings
// // This test validates the ENV > file instance > file default precedence
// // Set some test environment variables
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST", "on");
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST", "http://env.example.com/audit");
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST", "env-token");
// let mut registry = AuditRegistry::new();
//
// // Create config that should be overridden by env vars
// let mut config = Config(HashMap::new());
// let mut webhook_section = HashMap::new();
//
// let mut test_kvs = KVS::new();
// test_kvs.insert("enable".to_string(), "off".to_string()); // Should be overridden
// test_kvs.insert("endpoint".to_string(), "http://file.example.com/audit".to_string()); // Should be overridden
// test_kvs.insert("batch_size".to_string(), "10".to_string()); // Should remain from file
// webhook_section.insert("test".to_string(), test_kvs);
//
// config.0.insert("audit_webhook".to_string(), webhook_section);
//
// // Try to create targets - should use env vars for endpoint/enable, file for batch_size
// let result = registry.create_targets_from_config(&config).await;
// // Clean up env vars
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST");
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST");
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST");
// // Should fail due to server storage, but precedence logic should work
// match result {
// Err(AuditError::ServerNotInitialized(_)) => {
// // Expected - precedence parsing worked but save failed
// }
// Err(e) => {
// println!("Environment precedence test error: {}", e);
// }
// Ok(_) => {
// println!("Unexpected success in environment precedence test");
// }
// }
// }
#[test]
fn test_target_type_validation() {
use rustfs_targets::target::TargetType;
@@ -315,19 +273,18 @@ fn create_sample_audit_entry_with_id(id: u32) -> AuditEntry {
use chrono::Utc;
use rustfs_targets::EventName;
use serde_json::json;
use std::collections::HashMap;
let mut req_header = HashMap::new();
let mut req_header = hashbrown::HashMap::new();
req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}"));
req_header.insert("content-type".to_string(), "application/octet-stream".to_string());
let mut resp_header = HashMap::new();
let mut resp_header = hashbrown::HashMap::new();
resp_header.insert("x-response".to_string(), "ok".to_string());
let mut tags = HashMap::new();
let mut tags = hashbrown::HashMap::new();
tags.insert(format!("tag-{id}"), json!("sample"));
let mut req_query = HashMap::new();
let mut req_query = hashbrown::HashMap::new();
req_query.insert("id".to_string(), id.to_string());
let api_details = ApiDetails {

View File

@@ -145,7 +145,7 @@ pub const DEFAULT_LOG_ROTATION_TIME: &str = "hour";
/// It is used to keep the logs of the application.
/// Default value: 30
/// Environment variable: RUSTFS_OBS_LOG_KEEP_FILES
pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
pub const DEFAULT_LOG_KEEP_FILES: usize = 30;
/// Default log local logging enabled for rustfs
/// This is the default log local logging enabled for rustfs.

View File

@@ -12,30 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/// Profiler related environment variable names and default values
pub const ENV_ENABLE_PROFILING: &str = "RUSTFS_ENABLE_PROFILING";
// CPU profiling
pub const ENV_CPU_MODE: &str = "RUSTFS_PROF_CPU_MODE"; // off|continuous|periodic
/// Frequency of CPU profiling samples
pub const ENV_CPU_FREQ: &str = "RUSTFS_PROF_CPU_FREQ";
/// Interval between CPU profiling sessions (for periodic mode)
pub const ENV_CPU_INTERVAL_SECS: &str = "RUSTFS_PROF_CPU_INTERVAL_SECS";
/// Duration of each CPU profiling session (for periodic mode)
pub const ENV_CPU_DURATION_SECS: &str = "RUSTFS_PROF_CPU_DURATION_SECS";
// Memory profiling (jemalloc)
/// Memory profiling (jemalloc)
pub const ENV_MEM_PERIODIC: &str = "RUSTFS_PROF_MEM_PERIODIC";
/// Interval between memory profiling snapshots (for periodic mode)
pub const ENV_MEM_INTERVAL_SECS: &str = "RUSTFS_PROF_MEM_INTERVAL_SECS";
// Output directory
/// Output directory
pub const ENV_OUTPUT_DIR: &str = "RUSTFS_PROF_OUTPUT_DIR";
// Defaults
/// Defaults for profiler settings
pub const DEFAULT_ENABLE_PROFILING: bool = false;
/// CPU profiling
pub const DEFAULT_CPU_MODE: &str = "off";
/// Frequency of CPU profiling samples
pub const DEFAULT_CPU_FREQ: usize = 100;
/// Interval between CPU profiling sessions (for periodic mode)
pub const DEFAULT_CPU_INTERVAL_SECS: u64 = 300;
/// Duration of each CPU profiling session (for periodic mode)
pub const DEFAULT_CPU_DURATION_SECS: u64 = 60;
/// Memory profiling (jemalloc)
pub const DEFAULT_MEM_PERIODIC: bool = false;
/// Interval between memory profiling snapshots (for periodic mode)
pub const DEFAULT_MEM_INTERVAL_SECS: u64 = 300;
/// Output directory
pub const DEFAULT_OUTPUT_DIR: &str = ".";

View File

@@ -0,0 +1,19 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Metrics collection interval in milliseconds for system metrics (CPU, memory, disk, network).
pub const DEFAULT_METRICS_SYSTEM_INTERVAL_MS: u64 = 30000;
/// Environment variable for setting the metrics collection interval for system metrics.
pub const ENV_OBS_METRICS_SYSTEM_INTERVAL_MS: &str = "RUSTFS_OBS_METRICS_SYSTEM_INTERVAL_MS";

View File

@@ -14,7 +14,13 @@
// Observability Keys
mod metrics;
pub use metrics::*;
pub const ENV_OBS_ENDPOINT: &str = "RUSTFS_OBS_ENDPOINT";
pub const ENV_OBS_TRACE_ENDPOINT: &str = "RUSTFS_OBS_TRACE_ENDPOINT";
pub const ENV_OBS_METRIC_ENDPOINT: &str = "RUSTFS_OBS_METRIC_ENDPOINT";
pub const ENV_OBS_LOG_ENDPOINT: &str = "RUSTFS_OBS_LOG_ENDPOINT";
pub const ENV_OBS_USE_STDOUT: &str = "RUSTFS_OBS_USE_STDOUT";
pub const ENV_OBS_SAMPLE_RATIO: &str = "RUSTFS_OBS_SAMPLE_RATIO";
pub const ENV_OBS_METER_INTERVAL: &str = "RUSTFS_OBS_METER_INTERVAL";
@@ -65,6 +71,9 @@ mod tests {
#[test]
fn test_env_keys() {
assert_eq!(ENV_OBS_ENDPOINT, "RUSTFS_OBS_ENDPOINT");
assert_eq!(ENV_OBS_TRACE_ENDPOINT, "RUSTFS_OBS_TRACE_ENDPOINT");
assert_eq!(ENV_OBS_METRIC_ENDPOINT, "RUSTFS_OBS_METRIC_ENDPOINT");
assert_eq!(ENV_OBS_LOG_ENDPOINT, "RUSTFS_OBS_LOG_ENDPOINT");
assert_eq!(ENV_OBS_USE_STDOUT, "RUSTFS_OBS_USE_STDOUT");
assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO");
assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL");

View File

@@ -29,7 +29,7 @@ documentation = "https://docs.rs/rustfs-crypto/latest/rustfs_crypto/"
workspace = true
[dependencies]
aes-gcm = { workspace = true, features = ["std"], optional = true }
aes-gcm = { workspace = true, optional = true }
argon2 = { workspace = true, features = ["std"], optional = true }
cfg-if = { workspace = true }
chacha20poly1305 = { workspace = true, optional = true }

View File

@@ -19,127 +19,37 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
use aes_gcm::{Aes256Gcm, KeyInit as _};
use chacha20poly1305::ChaCha20Poly1305;
// 32: salt
// 1: id
// 12: nonce
const HEADER_LENGTH: usize = 45;
if data.len() < HEADER_LENGTH {
return Err(Error::ErrUnexpectedHeader);
}
let (salt, id, nonce) = (&data[..32], ID::try_from(data[32])?, &data[33..45]);
let data = &data[HEADER_LENGTH..];
let (salt, id, nonce_slice) = (&data[..32], ID::try_from(data[32])?, &data[33..45]);
let body = &data[HEADER_LENGTH..];
match id {
ID::Argon2idChaCHa20Poly1305 => {
let key = id.get_key(password, salt)?;
decrypt(ChaCha20Poly1305::new_from_slice(&key)?, nonce, data)
decrypt(ChaCha20Poly1305::new_from_slice(&key)?, nonce_slice, body)
}
_ => {
let key = id.get_key(password, salt)?;
decrypt(Aes256Gcm::new_from_slice(&key)?, nonce, data)
decrypt(Aes256Gcm::new_from_slice(&key)?, nonce_slice, body)
}
}
}
// use argon2::{Argon2, PasswordHasher};
// use argon2::password_hash::{SaltString};
// use aes_gcm::{Aes256Gcm, Key, Nonce}; // For AES-GCM
// use chacha20poly1305::{ChaCha20Poly1305, Key as ChaChaKey, Nonce as ChaChaNonce}; // For ChaCha20
// use pbkdf2::pbkdf2;
// use sha2::Sha256;
// use std::io::{self, Read};
// use thiserror::Error;
// #[derive(Debug, Error)]
// pub enum DecryptError {
// #[error("unexpected header")]
// UnexpectedHeader,
// #[error("invalid encryption algorithm ID")]
// InvalidAlgorithmId,
// #[error("IO error")]
// Io(#[from] io::Error),
// #[error("decryption error")]
// DecryptionError,
// }
// pub fn decrypt_data2<R: Read>(password: &str, mut data: R) -> Result<Vec<u8>, DecryptError> {
// // Parse the stream header
// let mut hdr = [0u8; 32 + 1 + 8];
// if data.read_exact(&mut hdr).is_err() {
// return Err(DecryptError::UnexpectedHeader);
// }
// let salt = &hdr[0..32];
// let id = hdr[32];
// let nonce = &hdr[33..41];
// let key = match id {
// // Argon2id + AES-GCM
// 0x01 => {
// let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
// let argon2 = Argon2::default();
// let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
// .map_err(|_| DecryptError::DecryptionError)?;
// hashed_key.hash.unwrap().as_bytes().to_vec()
// }
// // Argon2id + ChaCha20Poly1305
// 0x02 => {
// let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
// let argon2 = Argon2::default();
// let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
// .map_err(|_| DecryptError::DecryptionError)?;
// hashed_key.hash.unwrap().as_bytes().to_vec()
// }
// // PBKDF2 + AES-GCM
// // 0x03 => {
// // let mut key = [0u8; 32];
// // pbkdf2::<Sha256>(password.as_bytes(), salt, 10000, &mut key);
// // key.to_vec()
// // }
// _ => return Err(DecryptError::InvalidAlgorithmId),
// };
// // Decrypt data using the corresponding cipher
// let mut encrypted_data = Vec::new();
// data.read_to_end(&mut encrypted_data)?;
// let plaintext = match id {
// 0x01 => {
// let cipher = Aes256Gcm::new(Key::from_slice(&key));
// let nonce = Nonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// 0x02 => {
// let cipher = ChaCha20Poly1305::new(ChaChaKey::from_slice(&key));
// let nonce = ChaChaNonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// 0x03 => {
// let cipher = Aes256Gcm::new(Key::from_slice(&key));
// let nonce = Nonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// _ => return Err(DecryptError::InvalidAlgorithmId),
// };
// Ok(plaintext)
// }
#[cfg(any(test, feature = "crypto"))]
#[inline]
fn decrypt<T: aes_gcm::aead::Aead>(stream: T, nonce: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error> {
use crate::error::Error;
stream
.decrypt(aes_gcm::Nonce::from_slice(nonce), data)
.map_err(Error::ErrDecryptFailed)
use aes_gcm::AeadCore;
use aes_gcm::aead::array::Array;
use core::convert::TryFrom;
let nonce_arr: Array<u8, <T as AeadCore>::NonceSize> =
Array::try_from(nonce).map_err(|_| Error::ErrDecryptFailed(aes_gcm::aead::Error))?;
stream.decrypt(&nonce_arr, data).map_err(Error::ErrDecryptFailed)
}
#[cfg(not(any(test, feature = "crypto")))]
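For reference, a minimal sketch of the 45-byte sealed-blob layout that decrypt_data slices in the hunk above (32-byte salt, 1-byte algorithm id, 12-byte nonce, then the AEAD ciphertext). SealedHeader and split_sealed are illustrative names, not items in the crate:

/// Illustrative only: mirrors the slicing done in decrypt_data above.
struct SealedHeader<'a> {
    salt: &'a [u8],  // 32 bytes: KDF salt
    id: u8,          // 1 byte: algorithm identifier
    nonce: &'a [u8], // 12 bytes: AEAD nonce
    body: &'a [u8],  // remainder: ciphertext + tag
}

fn split_sealed(data: &[u8]) -> Option<SealedHeader<'_>> {
    const HEADER_LENGTH: usize = 32 + 1 + 12; // 45, as in the hunk above
    if data.len() < HEADER_LENGTH {
        return None;
    }
    Some(SealedHeader {
        salt: &data[..32],
        id: data[32],
        nonce: &data[33..45],
        body: &data[HEADER_LENGTH..],
    })
}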

View File

@@ -43,7 +43,7 @@ pub fn encrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
if native_aes() {
encrypt(Aes256Gcm::new_from_slice(&key)?, &salt, id, data)
} else {
encrypt(ChaCha20Poly1305::new_from_slice(&key)?, &salt, id, data)
encrypt(chacha20poly1305::ChaCha20Poly1305::new_from_slice(&key)?, &salt, id, data)
}
}
}
@@ -56,16 +56,19 @@ fn encrypt<T: aes_gcm::aead::Aead>(
data: &[u8],
) -> Result<Vec<u8>, crate::Error> {
use crate::error::Error;
use aes_gcm::aead::rand_core::OsRng;
use aes_gcm::AeadCore;
use aes_gcm::aead::array::Array;
use rand::RngCore;
let nonce = T::generate_nonce(&mut OsRng);
let mut nonce: Array<u8, <T as AeadCore>::NonceSize> = Array::default();
rand::rng().fill_bytes(&mut nonce);
let encryptor = stream.encrypt(&nonce, data).map_err(Error::ErrEncryptFailed)?;
let mut ciphertext = Vec::with_capacity(salt.len() + 1 + nonce.len() + encryptor.len());
ciphertext.extend_from_slice(salt);
ciphertext.push(id as u8);
ciphertext.extend_from_slice(nonce.as_slice());
ciphertext.extend_from_slice(&nonce);
ciphertext.extend_from_slice(&encryptor);
Ok(ciphertext)

View File

@@ -106,6 +106,7 @@ serde_urlencoded.workspace = true
google-cloud-storage = { workspace = true }
google-cloud-auth = { workspace = true }
aws-config = { workspace = true }
faster-hex = { workspace = true }
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -34,9 +34,10 @@ use rustfs_protos::{
};
use std::{
collections::{HashMap, HashSet},
time::SystemTime,
time::{Duration, SystemTime},
};
use time::OffsetDateTime;
use tokio::time::timeout;
use tonic::Request;
use tracing::warn;
@@ -44,6 +45,8 @@ use shadow_rs::shadow;
shadow!(build);
const SERVER_PING_TIMEOUT: Duration = Duration::from_secs(1);
// pub const ITEM_OFFLINE: &str = "offline";
// pub const ITEM_INITIALIZING: &str = "initializing";
// pub const ITEM_ONLINE: &str = "online";
@@ -83,42 +86,45 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
endpoint.url.host_str().unwrap(),
endpoint.url.port().unwrap()
);
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let ping_task = async {
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
let finished_data = fbb.finished_data();
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
let finished_data = fbb.finished_data();
// Create the client
let mut client = node_service_time_out_client(&addr)
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
let mut client = node_service_time_out_client(&addr)
.await
.map_err(|err| Error::other(err.to_string()))?;
let request = Request::new(PingRequest {
version: 1,
body: bytes::Bytes::copy_from_slice(finished_data),
});
let response: PingResponse = client.ping(request).await?.into_inner();
let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
if let Err(e) = ping_response_body {
eprintln!("{e}");
} else {
println!("ping_resp:body(flatbuffer): {ping_response_body:?}");
}
Ok(())
};
timeout(SERVER_PING_TIMEOUT, ping_task)
.await
.map_err(|err| Error::other(err.to_string()))?;
// Build the PingRequest
let request = Request::new(PingRequest {
version: 1,
body: bytes::Bytes::copy_from_slice(finished_data),
});
// Send the request and obtain the response
let response: PingResponse = client.ping(request).await?.into_inner();
// Print the response
let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
if let Err(e) = ping_response_body {
eprintln!("{e}");
} else {
println!("ping_resp:body(flatbuffer): {ping_response_body:?}");
}
Ok(())
.map_err(|_| Error::other("server ping timeout"))?
}
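The interleaved hunk above is easier to read as the shape it converges on: build the ping as an async block, then bound it with tokio::time::timeout. A minimal sketch under that assumption; the inner body and error type are simplified placeholders, not the crate's actual request code:

use std::time::Duration;
use tokio::time::timeout;

const SERVER_PING_TIMEOUT: Duration = Duration::from_secs(1);

async fn ping_with_deadline() -> std::io::Result<()> {
    let ping_task = async {
        // build the FlatBuffers PingRequest, call client.ping(), inspect the response...
        Ok::<(), std::io::Error>(())
    };
    // An elapsed timeout is mapped to a plain io::Error, as in the hunk above.
    timeout(SERVER_PING_TIMEOUT, ping_task)
        .await
        .map_err(|_| std::io::Error::other("server ping timeout"))?
}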
pub async fn get_local_server_property() -> ServerProperties {

View File

@@ -115,10 +115,9 @@ struct ExpiryTask {
impl ExpiryOp for ExpiryTask {
fn op_hash(&self) -> u64 {
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}", self.obj_info.bucket).as_bytes());
let _ = hasher.write(format!("{}", self.obj_info.name).as_bytes());
hasher.flush();
xxh64::xxh64(hasher.clone().finalize().as_slice(), XXHASH_SEED)
hasher.update(format!("{}", self.obj_info.bucket).as_bytes());
hasher.update(format!("{}", self.obj_info.name).as_bytes());
xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}
fn as_any(&self) -> &dyn Any {
@@ -171,10 +170,9 @@ struct FreeVersionTask(ObjectInfo);
impl ExpiryOp for FreeVersionTask {
fn op_hash(&self) -> u64 {
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}", self.0.transitioned_object.tier).as_bytes());
let _ = hasher.write(format!("{}", self.0.transitioned_object.name).as_bytes());
hasher.flush();
xxh64::xxh64(hasher.clone().finalize().as_slice(), XXHASH_SEED)
hasher.update(format!("{}", self.0.transitioned_object.tier).as_bytes());
hasher.update(format!("{}", self.0.transitioned_object.name).as_bytes());
xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}
fn as_any(&self) -> &dyn Any {
@@ -191,10 +189,9 @@ struct NewerNoncurrentTask {
impl ExpiryOp for NewerNoncurrentTask {
fn op_hash(&self) -> u64 {
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}", self.bucket).as_bytes());
let _ = hasher.write(format!("{}", self.versions[0].object_name).as_bytes());
hasher.flush();
xxh64::xxh64(hasher.clone().finalize().as_slice(), XXHASH_SEED)
hasher.update(format!("{}", self.bucket).as_bytes());
hasher.update(format!("{}", self.versions[0].object_name).as_bytes());
xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}
fn as_any(&self) -> &dyn Any {
@@ -415,10 +412,9 @@ struct TransitionTask {
impl ExpiryOp for TransitionTask {
fn op_hash(&self) -> u64 {
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}", self.obj_info.bucket).as_bytes());
//let _ = hasher.write(format!("{}", self.obj_info.versions[0].object_name).as_bytes());
hasher.flush();
xxh64::xxh64(hasher.clone().finalize().as_slice(), XXHASH_SEED)
hasher.update(format!("{}", self.obj_info.bucket).as_bytes());
// hasher.update(format!("{}", self.obj_info.versions[0].object_name).as_bytes());
xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}
fn as_any(&self) -> &dyn Any {
@@ -480,7 +476,7 @@ impl TransitionState {
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
let mut n = max_workers;
let tw = 8; //globalILMConfig.getTransitionWorkers();
let tw = 8; //globalILMConfig.getTransitionWorkers();
if tw > 0 {
n = tw;
}
@@ -760,9 +756,8 @@ pub async fn expire_transitioned_object(
pub fn gen_transition_objname(bucket: &str) -> Result<String, Error> {
let us = Uuid::new_v4().to_string();
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}/{}", get_global_deployment_id().unwrap_or_default(), bucket).as_bytes());
hasher.flush();
let hash = rustfs_utils::crypto::hex(hasher.clone().finalize().as_slice());
hasher.update(format!("{}/{}", get_global_deployment_id().unwrap_or_default(), bucket).as_bytes());
let hash = rustfs_utils::crypto::hex(hasher.finalize().as_slice());
let obj = format!("{}/{}/{}/{}", &hash[0..16], &us[0..2], &us[2..4], &us);
Ok(obj)
}
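The Sha256 changes in this file all follow one pattern: feed bytes with Digest::update and consume the hasher with finalize, instead of going through io::Write, flush, and a clone. A minimal sketch of that pattern; the function name and seed value are illustrative:

use sha2::{Digest, Sha256};
use xxhash_rust::xxh64;

fn op_hash_example(bucket: &str, object: &str) -> u64 {
    const XXHASH_SEED: u64 = 0; // illustrative; the real seed is defined in the crate
    let mut hasher = Sha256::new();
    hasher.update(bucket.as_bytes());
    hasher.update(object.as_bytes());
    // finalize() consumes the hasher, so no flush/clone/reset dance is needed
    xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}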

View File

@@ -20,7 +20,7 @@
use sha2::{Digest, Sha256};
use std::any::Any;
use std::io::{Cursor, Write};
use std::io::Write;
use xxhash_rust::xxh64;
use super::bucket_lifecycle_ops::{ExpiryOp, GLOBAL_ExpiryState, TransitionedObject};
@@ -128,10 +128,9 @@ pub struct Jentry {
impl ExpiryOp for Jentry {
fn op_hash(&self) -> u64 {
let mut hasher = Sha256::new();
let _ = hasher.write(format!("{}", self.tier_name).as_bytes());
let _ = hasher.write(format!("{}", self.obj_name).as_bytes());
hasher.flush();
xxh64::xxh64(hasher.clone().finalize().as_slice(), XXHASH_SEED)
hasher.update(format!("{}", self.tier_name).as_bytes());
hasher.update(format!("{}", self.obj_name).as_bytes());
xxh64::xxh64(hasher.finalize().as_slice(), XXHASH_SEED)
}
fn as_any(&self) -> &dyn Any {

View File

@@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{error::BucketMetadataError, metadata_sys::get_bucket_metadata_sys};
use crate::error::Result;
use super::metadata_sys::get_bucket_metadata_sys;
use crate::error::{Result, StorageError};
use rustfs_policy::policy::{BucketPolicy, BucketPolicyArgs};
use tracing::warn;
use tracing::info;
pub struct PolicySys {}
@@ -24,9 +24,8 @@ impl PolicySys {
match Self::get(args.bucket).await {
Ok(cfg) => return cfg.is_allowed(args),
Err(err) => {
let berr: BucketMetadataError = err.into();
if berr != BucketMetadataError::BucketPolicyNotFound {
warn!("config get err {:?}", berr);
if err != StorageError::ConfigNotFound {
info!("config get err {:?}", err);
}
}
}

View File

@@ -1136,23 +1136,21 @@ impl LocalDisk {
let name = path_join_buf(&[current.as_str(), entry.as_str()]);
if !dir_stack.is_empty() {
if let Some(pop) = dir_stack.last().cloned() {
if pop < name {
out.write_obj(&MetaCacheEntry {
name: pop.clone(),
..Default::default()
})
.await?;
while let Some(pop) = dir_stack.last().cloned()
&& pop < name
{
out.write_obj(&MetaCacheEntry {
name: pop.clone(),
..Default::default()
})
.await?;
if opts.recursive {
if let Err(er) = Box::pin(self.scan_dir(pop, prefix.clone(), opts, out, objs_returned)).await {
error!("scan_dir err {:?}", er);
}
}
dir_stack.pop();
if opts.recursive {
if let Err(er) = Box::pin(self.scan_dir(pop, prefix.clone(), opts, out, objs_returned)).await {
error!("scan_dir err {:?}", er);
}
}
dir_stack.pop();
}
let mut meta = MetaCacheEntry {

View File

@@ -26,9 +26,11 @@ use rustfs_madmin::metrics::RealtimeMetrics;
use rustfs_madmin::net::NetInfo;
use rustfs_madmin::{ItemState, ServerProperties};
use std::collections::hash_map::DefaultHasher;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::sync::OnceLock;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use tokio::time::timeout;
use tracing::{error, warn};
lazy_static! {
@@ -220,24 +222,21 @@ impl NotificationSys {
pub async fn server_info(&self) -> Vec<ServerProperties> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
let endpoints = get_global_endpoints();
let peer_timeout = Duration::from_secs(2);
for client in self.peer_clients.iter() {
let endpoints = endpoints.clone();
futures.push(async move {
if let Some(client) = client {
match client.server_info().await {
Ok(info) => info,
Err(_) => ServerProperties {
uptime: SystemTime::now()
.duration_since(*GLOBAL_BOOT_TIME.get().unwrap())
.unwrap_or_default()
.as_secs(),
version: get_commit_id(),
endpoint: client.host.to_string(),
state: ItemState::Offline.to_string().to_owned(),
disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),
..Default::default()
},
}
let host = client.host.to_string();
call_peer_with_timeout(
peer_timeout,
&host,
|| client.server_info(),
|| offline_server_properties(&host, &endpoints),
)
.await
} else {
ServerProperties::default()
}
@@ -694,6 +693,43 @@ impl NotificationSys {
}
}
async fn call_peer_with_timeout<F, Fut>(
timeout_dur: Duration,
host_label: &str,
op: F,
fallback: impl FnOnce() -> ServerProperties,
) -> ServerProperties
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ServerProperties>> + Send,
{
match timeout(timeout_dur, op()).await {
Ok(Ok(info)) => info,
Ok(Err(err)) => {
warn!("peer {host_label} server_info failed: {err}");
fallback()
}
Err(_) => {
warn!("peer {host_label} server_info timed out after {:?}", timeout_dur);
fallback()
}
}
}
fn offline_server_properties(host: &str, endpoints: &EndpointServerPools) -> ServerProperties {
ServerProperties {
uptime: SystemTime::now()
.duration_since(*GLOBAL_BOOT_TIME.get().unwrap())
.unwrap_or_default()
.as_secs(),
version: get_commit_id(),
endpoint: host.to_string(),
state: ItemState::Offline.to_string().to_owned(),
disks: get_offline_disks(host, endpoints),
..Default::default()
}
}
fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<rustfs_madmin::Disk> {
let mut offline_disks = Vec::new();
@@ -714,3 +750,57 @@ fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec
offline_disks
}
#[cfg(test)]
mod tests {
use super::*;
fn build_props(endpoint: &str) -> ServerProperties {
ServerProperties {
endpoint: endpoint.to_string(),
..Default::default()
}
}
#[tokio::test]
async fn call_peer_with_timeout_returns_value_when_fast() {
let result = call_peer_with_timeout(
Duration::from_millis(50),
"peer-1",
|| async { Ok::<_, Error>(build_props("fast")) },
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fast");
}
#[tokio::test]
async fn call_peer_with_timeout_uses_fallback_on_error() {
let result = call_peer_with_timeout(
Duration::from_millis(50),
"peer-2",
|| async { Err::<ServerProperties, _>(Error::other("boom")) },
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fallback");
}
#[tokio::test]
async fn call_peer_with_timeout_uses_fallback_on_timeout() {
let result = call_peer_with_timeout(
Duration::from_millis(5),
"peer-3",
|| async {
tokio::time::sleep(Duration::from_millis(25)).await;
Ok::<_, Error>(build_props("slow"))
},
|| build_props("fallback"),
)
.await;
assert_eq!(result.endpoint, "fallback");
}
}

View File

@@ -15,7 +15,7 @@
use crate::global::get_global_action_cred;
use base64::Engine as _;
use base64::engine::general_purpose;
use hmac::{Hmac, Mac};
use hmac::{Hmac, KeyInit, Mac};
use http::HeaderMap;
use http::HeaderValue;
use http::Method;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};
use bytes::Bytes;
use futures::lock::Mutex;
@@ -40,7 +40,7 @@ use crate::{
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_rio::{HttpReader, HttpWriter};
use tokio::io::AsyncWrite;
use tokio::{io::AsyncWrite, net::TcpStream, time::timeout};
use tonic::Request;
use tracing::info;
use uuid::Uuid;
@@ -54,6 +54,8 @@ pub struct RemoteDisk {
endpoint: Endpoint,
}
const REMOTE_DISK_ONLINE_PROBE_TIMEOUT: Duration = Duration::from_millis(750);
impl RemoteDisk {
pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result<Self> {
// let root = fs::canonicalize(ep.url.path()).await?;
@@ -83,11 +85,19 @@ impl DiskAPI for RemoteDisk {
#[tracing::instrument(skip(self))]
async fn is_online(&self) -> bool {
// TODO: connection status tracking
if node_service_time_out_client(&self.addr).await.is_ok() {
return true;
let Some(host) = self.endpoint.url.host_str().map(|host| host.to_string()) else {
return false;
};
let port = self.endpoint.url.port_or_known_default().unwrap_or(80);
match timeout(REMOTE_DISK_ONLINE_PROBE_TIMEOUT, TcpStream::connect((host, port))).await {
Ok(Ok(stream)) => {
drop(stream);
true
}
_ => false,
}
false
}
#[tracing::instrument(skip(self))]
@@ -957,6 +967,7 @@ impl DiskAPI for RemoteDisk {
#[cfg(test)]
mod tests {
use super::*;
use tokio::net::TcpListener;
use uuid::Uuid;
#[tokio::test]
@@ -1040,6 +1051,58 @@ mod tests {
assert!(path.to_string_lossy().contains("storage"));
}
#[tokio::test]
async fn test_remote_disk_is_online_detects_active_listener() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = url::Url::parse(&format!("http://{}:{}/data/rustfs0", addr.ip(), addr.port())).unwrap();
let endpoint = Endpoint {
url,
is_local: false,
pool_idx: 0,
set_idx: 0,
disk_idx: 0,
};
let disk_option = DiskOption {
cleanup: false,
health_check: false,
};
let remote_disk = RemoteDisk::new(&endpoint, &disk_option).await.unwrap();
assert!(remote_disk.is_online().await);
drop(listener);
}
#[tokio::test]
async fn test_remote_disk_is_online_detects_missing_listener() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let ip = addr.ip();
let port = addr.port();
drop(listener);
let url = url::Url::parse(&format!("http://{}:{}/data/rustfs0", ip, port)).unwrap();
let endpoint = Endpoint {
url,
is_local: false,
pool_idx: 0,
set_idx: 0,
disk_idx: 0,
};
let disk_option = DiskOption {
cleanup: false,
health_check: false,
};
let remote_disk = RemoteDisk::new(&endpoint, &disk_option).await.unwrap();
assert!(!remote_disk.is_online().await);
}
#[tokio::test]
async fn test_remote_disk_disk_id() {
let url = url::Url::parse("http://remote-server:9000").unwrap();

View File

@@ -88,7 +88,7 @@ use s3s::header::X_AMZ_RESTORE;
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::mem::{self};
use std::time::SystemTime;
use std::time::{Instant, SystemTime};
use std::{
collections::{HashMap, HashSet},
io::{Cursor, Write},
@@ -104,7 +104,7 @@ use tokio::{
use tokio::{
select,
sync::mpsc::{self, Sender},
time::interval,
time::{interval, timeout},
};
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -113,6 +113,8 @@ use uuid::Uuid;
pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
pub const MAX_PARTS_COUNT: usize = 10000;
const DISK_ONLINE_TIMEOUT: Duration = Duration::from_secs(1);
const DISK_HEALTH_CACHE_TTL: Duration = Duration::from_millis(750);
#[derive(Clone, Debug)]
pub struct SetDisks {
@@ -125,6 +127,23 @@ pub struct SetDisks {
pub set_index: usize,
pub pool_index: usize,
pub format: FormatV3,
disk_health_cache: Arc<RwLock<Vec<Option<DiskHealthEntry>>>>,
}
#[derive(Clone, Debug)]
struct DiskHealthEntry {
last_check: Instant,
online: bool,
}
impl DiskHealthEntry {
fn cached_value(&self) -> Option<bool> {
if self.last_check.elapsed() <= DISK_HEALTH_CACHE_TTL {
Some(self.online)
} else {
None
}
}
}
impl SetDisks {
@@ -150,8 +169,60 @@ impl SetDisks {
pool_index,
format,
set_endpoints,
disk_health_cache: Arc::new(RwLock::new(Vec::new())),
})
}
async fn cached_disk_health(&self, index: usize) -> Option<bool> {
let cache = self.disk_health_cache.read().await;
cache
.get(index)
.and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
}
async fn update_disk_health(&self, index: usize, online: bool) {
let mut cache = self.disk_health_cache.write().await;
if cache.len() <= index {
cache.resize(index + 1, None);
}
cache[index] = Some(DiskHealthEntry {
last_check: Instant::now(),
online,
});
}
async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
if let Some(online) = self.cached_disk_health(index).await {
return online;
}
let disk_clone = disk.clone();
let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
.await
.unwrap_or(false);
self.update_disk_health(index, online).await;
online
}
async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
let mut filtered = Vec::with_capacity(disks.len());
let mut online_count = 0;
for (idx, disk) in disks.into_iter().enumerate() {
if let Some(disk_store) = disk {
if self.is_disk_online_cached(idx, &disk_store).await {
filtered.push(Some(disk_store));
online_count += 1;
} else {
filtered.push(None);
}
} else {
filtered.push(None);
}
}
(filtered, online_count)
}
fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
match err {
LockResult::Timeout => {
@@ -187,25 +258,9 @@ impl SetDisks {
}
async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
let mut disks = self.get_disks_internal().await;
// TODO: diskinfo filter online
let mut new_disk = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if let Some(d) = disk {
if d.is_online().await {
new_disk.push(disk.clone());
}
}
}
let mut rng = rand::rng();
disks.shuffle(&mut rng);
new_disk
let disks = self.get_disks_internal().await;
let (filtered, _) = self.filter_online_disks(disks).await;
filtered.into_iter().filter(|disk| disk.is_some()).collect()
}
async fn get_online_local_disks(&self) -> Vec<Option<DiskStore>> {
let mut disks = self.get_online_disks().await;
@@ -1268,13 +1323,13 @@ impl SetDisks {
if etag_only || mod_valid {
for part in meta.parts.iter() {
let _ = hasher.write(format!("part.{}", part.number).as_bytes())?;
let _ = hasher.write(format!("part.{}", part.size).as_bytes())?;
hasher.update(format!("part.{}", part.number).as_bytes());
hasher.update(format!("part.{}", part.size).as_bytes());
}
if !meta.deleted && meta.size != 0 {
let _ = hasher.write(format!("{}+{}", meta.erasure.data_blocks, meta.erasure.parity_blocks).as_bytes())?;
let _ = hasher.write(format!("{:?}", meta.erasure.distribution).as_bytes())?;
hasher.update(format!("{}+{}", meta.erasure.data_blocks, meta.erasure.parity_blocks).as_bytes());
hasher.update(format!("{:?}", meta.erasure.distribution).as_bytes());
}
if meta.is_remote() {
@@ -1285,8 +1340,6 @@ impl SetDisks {
// TODO: IsCompressed
hasher.flush()?;
meta_hashes[i] = Some(hex(hasher.clone().finalize().as_slice()));
hasher.reset();
@@ -3581,7 +3634,8 @@ impl ObjectIO for SetDisks {
#[tracing::instrument(level = "debug", skip(self, data,))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks = self.disks.read().await;
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let _object_lock_guard = if !opts.no_lock {
@@ -3622,6 +3676,14 @@ impl ObjectIO for SetDisks {
write_quorum += 1
}
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
fi.version_id = {
@@ -4901,7 +4963,16 @@ impl StorageAPI for SetDisks {
return Err(Error::other(format!("checksum mismatch: {checksum}")));
}
let disks = self.disks.read().await.clone();
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
let shuffle_disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
@@ -6480,9 +6551,11 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
}
let mut hasher = Md5::new();
let _ = hasher.write(&buf);
hasher.update(&buf);
format!("{:x}-{}", hasher.finalize(), parts.len())
let digest = hasher.finalize();
let etag_hex = faster_hex::hex_string(digest.as_slice());
format!("{}-{}", etag_hex, parts.len())
}
pub fn canonicalize_etag(etag: &str) -> String {
@@ -6562,6 +6635,26 @@ mod tests {
use std::collections::HashMap;
use time::OffsetDateTime;
#[test]
fn disk_health_entry_returns_cached_value_within_ttl() {
let entry = DiskHealthEntry {
last_check: Instant::now(),
online: true,
};
assert_eq!(entry.cached_value(), Some(true));
}
#[test]
fn disk_health_entry_expires_after_ttl() {
let entry = DiskHealthEntry {
last_check: Instant::now() - (DISK_HEALTH_CACHE_TTL + Duration::from_millis(100)),
online: true,
};
assert!(entry.cached_value().is_none());
}
#[test]
fn test_check_part_constants() {
// Test that all CHECK_PART constants have expected values

View File

@@ -19,12 +19,12 @@ use crate::config::KmsConfig;
use crate::config::LocalConfig;
use crate::error::{KmsError, Result};
use crate::types::*;
use aes_gcm::aead::rand_core::RngCore;
use aes_gcm::{
Aes256Gcm, Key, Nonce,
aead::{Aead, AeadCore, KeyInit, OsRng},
aead::{Aead, KeyInit},
};
use async_trait::async_trait;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
@@ -105,8 +105,9 @@ impl LocalKmsClient {
hasher.update(master_key.as_bytes());
hasher.update(b"rustfs-kms-local"); // Salt to prevent rainbow tables
let hash = hasher.finalize();
Ok(*Key::<Aes256Gcm>::from_slice(&hash))
let key = Key::<Aes256Gcm>::try_from(hash.as_slice())
.map_err(|_| KmsError::cryptographic_error("key", "Invalid key length"))?;
Ok(key)
}
/// Get the file path for a master key
@@ -117,7 +118,6 @@ impl LocalKmsClient {
/// Load a master key from disk
async fn load_master_key(&self, key_id: &str) -> Result<MasterKey> {
let key_path = self.master_key_path(key_id);
if !key_path.exists() {
return Err(KmsError::key_not_found(key_id));
}
@@ -127,9 +127,16 @@ impl LocalKmsClient {
// Decrypt key material if master cipher is available
let _key_material = if let Some(ref cipher) = self.master_cipher {
let nonce = Nonce::from_slice(&stored_key.nonce);
if stored_key.nonce.len() != 12 {
return Err(KmsError::cryptographic_error("nonce", "Invalid nonce length"));
}
let mut nonce_array = [0u8; 12];
nonce_array.copy_from_slice(&stored_key.nonce);
let nonce = Nonce::from(nonce_array);
cipher
.decrypt(nonce, stored_key.encrypted_key_material.as_ref())
.decrypt(&nonce, stored_key.encrypted_key_material.as_ref())
.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))?
} else {
stored_key.encrypted_key_material
@@ -155,7 +162,10 @@ impl LocalKmsClient {
// Encrypt key material if master cipher is available
let (encrypted_key_material, nonce) = if let Some(ref cipher) = self.master_cipher {
let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
let mut nonce_bytes = [0u8; 12];
rand::rng().fill(&mut nonce_bytes[..]);
let nonce = Nonce::from(nonce_bytes);
let encrypted = cipher
.encrypt(&nonce, key_material)
.map_err(|e| KmsError::cryptographic_error("encrypt", e.to_string()))?;
@@ -202,7 +212,7 @@ impl LocalKmsClient {
/// Generate a random 256-bit key
fn generate_key_material() -> Vec<u8> {
let mut key_material = vec![0u8; 32]; // 256 bits
OsRng.fill_bytes(&mut key_material);
rand::rng().fill(&mut key_material[..]);
key_material
}
@@ -219,9 +229,14 @@ impl LocalKmsClient {
// Decrypt key material if master cipher is available
let key_material = if let Some(ref cipher) = self.master_cipher {
let nonce = Nonce::from_slice(&stored_key.nonce);
if stored_key.nonce.len() != 12 {
return Err(KmsError::cryptographic_error("nonce", "Invalid nonce length"));
}
let mut nonce_array = [0u8; 12];
nonce_array.copy_from_slice(&stored_key.nonce);
let nonce = Nonce::from(nonce_array);
cipher
.decrypt(nonce, stored_key.encrypted_key_material.as_ref())
.decrypt(&nonce, stored_key.encrypted_key_material.as_ref())
.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))?
} else {
stored_key.encrypted_key_material
@@ -234,25 +249,39 @@ impl LocalKmsClient {
async fn encrypt_with_master_key(&self, key_id: &str, plaintext: &[u8]) -> Result<(Vec<u8>, Vec<u8>)> {
// Load the actual master key material
let key_material = self.get_key_material(key_id).await?;
let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(&key_material));
let key = Key::<Aes256Gcm>::try_from(key_material.as_slice())
.map_err(|_| KmsError::cryptographic_error("key", "Invalid key length"))?;
let cipher = Aes256Gcm::new(&key);
let mut nonce_bytes = [0u8; 12];
rand::rng().fill(&mut nonce_bytes[..]);
let nonce = Nonce::from(nonce_bytes);
let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
let ciphertext = cipher
.encrypt(&nonce, plaintext)
.map_err(|e| KmsError::cryptographic_error("encrypt", e.to_string()))?;
Ok((ciphertext, nonce.to_vec()))
Ok((ciphertext, nonce_bytes.to_vec()))
}
/// Decrypt data using a master key
async fn decrypt_with_master_key(&self, key_id: &str, ciphertext: &[u8], nonce: &[u8]) -> Result<Vec<u8>> {
if nonce.len() != 12 {
return Err(KmsError::cryptographic_error("nonce", "Invalid nonce length"));
}
// Load the actual master key material
let key_material = self.get_key_material(key_id).await?;
let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(&key_material));
let key = Key::<Aes256Gcm>::try_from(key_material.as_slice())
.map_err(|_| KmsError::cryptographic_error("key", "Invalid key length"))?;
let cipher = Aes256Gcm::new(&key);
let mut nonce_array = [0u8; 12];
nonce_array.copy_from_slice(nonce);
let nonce_ref = Nonce::from(nonce_array);
let nonce = Nonce::from_slice(nonce);
let plaintext = cipher
.decrypt(nonce, ciphertext)
.decrypt(&nonce_ref, ciphertext)
.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))?;
Ok(plaintext)
@@ -275,7 +304,7 @@ impl KmsClient for LocalKmsClient {
};
let mut plaintext_key = vec![0u8; key_length];
OsRng.fill_bytes(&mut plaintext_key);
rand::rng().fill(&mut plaintext_key[..]);
// Encrypt the data key with the master key
let (encrypted_key, nonce) = self.encrypt_with_master_key(&request.master_key_id, &plaintext_key).await?;
@@ -776,9 +805,14 @@ impl KmsBackend for LocalKmsBackend {
// Decrypt the existing key material to preserve it
let existing_key_material = if let Some(ref cipher) = self.client.master_cipher {
let nonce = Nonce::from_slice(&stored_key.nonce);
if stored_key.nonce.len() != 12 {
return Err(KmsError::cryptographic_error("nonce", "Invalid nonce length"));
}
let mut nonce_array = [0u8; 12];
nonce_array.copy_from_slice(&stored_key.nonce);
let nonce = Nonce::from(nonce_array);
cipher
.decrypt(nonce, stored_key.encrypted_key_material.as_ref())
.decrypt(&nonce, stored_key.encrypted_key_material.as_ref())
.map_err(|e| KmsError::cryptographic_error("decrypt", e.to_string()))?
} else {
stored_key.encrypted_key_material

View File

@@ -20,7 +20,6 @@ use async_trait::async_trait;
use std::collections::HashMap;
pub mod local;
pub mod vault;
/// Abstract KMS client interface that all backends must implement

View File

@@ -16,12 +16,12 @@
use crate::error::{KmsError, Result};
use crate::types::EncryptionAlgorithm;
use aes_gcm::aead::rand_core::RngCore;
use aes_gcm::{
Aes256Gcm, Key, Nonce,
aead::{Aead, KeyInit, OsRng},
aead::{Aead, KeyInit},
};
use chacha20poly1305::ChaCha20Poly1305;
use rand::Rng;
/// Trait for object encryption ciphers
#[cfg_attr(not(test), allow(dead_code))]
@@ -57,8 +57,8 @@ impl AesCipher {
return Err(KmsError::invalid_key_size(32, key.len()));
}
let key = Key::<Aes256Gcm>::from_slice(key);
let cipher = Aes256Gcm::new(key);
let key = Key::<Aes256Gcm>::try_from(key).map_err(|_| KmsError::cryptographic_error("key", "Invalid key length"))?;
let cipher = Aes256Gcm::new(&key);
Ok(Self { cipher })
}
@@ -70,12 +70,12 @@ impl ObjectCipher for AesCipher {
return Err(KmsError::invalid_key_size(12, iv.len()));
}
let nonce = Nonce::from_slice(iv);
let nonce = Nonce::try_from(iv).map_err(|_| KmsError::cryptographic_error("nonce", "Invalid nonce length"))?;
// AES-GCM includes the tag in the ciphertext
let ciphertext_with_tag = self
.cipher
.encrypt(nonce, aes_gcm::aead::Payload { msg: plaintext, aad })
.encrypt(&nonce, aes_gcm::aead::Payload { msg: plaintext, aad })
.map_err(KmsError::from_aes_gcm_error)?;
// Split ciphertext and tag
@@ -98,7 +98,7 @@ impl ObjectCipher for AesCipher {
return Err(KmsError::invalid_key_size(self.tag_size(), tag.len()));
}
let nonce = Nonce::from_slice(iv);
let nonce = Nonce::try_from(iv).map_err(|_| KmsError::cryptographic_error("nonce", "Invalid nonce length"))?;
// Combine ciphertext and tag for AES-GCM
let mut ciphertext_with_tag = ciphertext.to_vec();
@@ -107,7 +107,7 @@ impl ObjectCipher for AesCipher {
let plaintext = self
.cipher
.decrypt(
nonce,
&nonce,
aes_gcm::aead::Payload {
msg: &ciphertext_with_tag,
aad,
@@ -147,8 +147,8 @@ impl ChaCha20Cipher {
return Err(KmsError::invalid_key_size(32, key.len()));
}
let key = chacha20poly1305::Key::from_slice(key);
let cipher = ChaCha20Poly1305::new(key);
let key = chacha20poly1305::Key::try_from(key).map_err(|_| KmsError::cryptographic_error("key", "Invalid key length"))?;
let cipher = ChaCha20Poly1305::new(&key);
Ok(Self { cipher })
}
@@ -160,12 +160,13 @@ impl ObjectCipher for ChaCha20Cipher {
return Err(KmsError::invalid_key_size(12, iv.len()));
}
let nonce = chacha20poly1305::Nonce::from_slice(iv);
let nonce =
chacha20poly1305::Nonce::try_from(iv).map_err(|_| KmsError::cryptographic_error("nonce", "Invalid nonce length"))?;
// ChaCha20-Poly1305 includes the tag in the ciphertext
let ciphertext_with_tag = self
.cipher
.encrypt(nonce, chacha20poly1305::aead::Payload { msg: plaintext, aad })
.encrypt(&nonce, chacha20poly1305::aead::Payload { msg: plaintext, aad })
.map_err(KmsError::from_chacha20_error)?;
// Split ciphertext and tag
@@ -188,7 +189,8 @@ impl ObjectCipher for ChaCha20Cipher {
return Err(KmsError::invalid_key_size(self.tag_size(), tag.len()));
}
let nonce = chacha20poly1305::Nonce::from_slice(iv);
let nonce =
chacha20poly1305::Nonce::try_from(iv).map_err(|_| KmsError::cryptographic_error("nonce", "Invalid nonce length"))?;
// Combine ciphertext and tag for ChaCha20-Poly1305
let mut ciphertext_with_tag = ciphertext.to_vec();
@@ -197,7 +199,7 @@ impl ObjectCipher for ChaCha20Cipher {
let plaintext = self
.cipher
.decrypt(
nonce,
&nonce,
chacha20poly1305::aead::Payload {
msg: &ciphertext_with_tag,
aad,
@@ -241,7 +243,7 @@ pub fn generate_iv(algorithm: &EncryptionAlgorithm) -> Vec<u8> {
};
let mut iv = vec![0u8; iv_size];
OsRng.fill_bytes(&mut iv);
rand::rng().fill(&mut iv[..]);
iv
}
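The KMS hunks above share one conversion pattern for the upgraded aes-gcm RC API: keys and nonces are built by value via try_from/From instead of from_slice, and nonce bytes come from rand::rng(). A minimal sketch under that assumption; seal is a hypothetical helper, not a function in the crate:

use aes_gcm::{
    Aes256Gcm, Key, Nonce,
    aead::{Aead, KeyInit},
};
use rand::Rng;

fn seal(key_bytes: &[u8], plaintext: &[u8]) -> Result<(Vec<u8>, [u8; 12]), String> {
    // Fallible slice -> Key conversion replaces the old Key::from_slice.
    let key = Key::<Aes256Gcm>::try_from(key_bytes).map_err(|_| "key must be 32 bytes".to_string())?;
    let cipher = Aes256Gcm::new(&key);

    // Nonce bytes are filled with rand::rng() and converted by value, as in the hunks above.
    let mut nonce_bytes = [0u8; 12];
    rand::rng().fill(&mut nonce_bytes[..]);
    let nonce = Nonce::from(nonce_bytes);

    let ciphertext = cipher.encrypt(&nonce, plaintext).map_err(|e| e.to_string())?;
    Ok((ciphertext, nonce_bytes))
}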

View File

@@ -35,7 +35,6 @@ chrono = { workspace = true, features = ["serde"] }
futures = { workspace = true }
form_urlencoded = { workspace = true }
hashbrown = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
rayon = { workspace = true }
rumqttc = { workspace = true }

View File

@@ -12,11 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_targets::TargetError;
use rustfs_targets::arn::TargetID;
use rustfs_targets::{TargetError, arn::TargetID};
use std::io;
use thiserror::Error;
/// Errors related to the notification system's lifecycle.
#[derive(Debug, Error)]
pub enum LifecycleError {
/// Error indicating the system has already been initialized.
#[error("System has already been initialized")]
AlreadyInitialized,
/// Error indicating the system has not been initialized yet.
#[error("System has not been initialized")]
NotInitialized,
}
/// Error types for the notification system
#[derive(Debug, Error)]
pub enum NotificationError {
@@ -38,11 +49,8 @@ pub enum NotificationError {
#[error("Rule configuration error: {0}")]
RuleConfiguration(String),
#[error("System initialization error: {0}")]
Initialization(String),
#[error("Notification system has already been initialized")]
AlreadyInitialized,
#[error("System lifecycle error: {0}")]
Lifecycle(#[from] LifecycleError),
#[error("I/O error: {0}")]
Io(io::Error),
@@ -56,6 +64,9 @@ pub enum NotificationError {
#[error("Target '{0}' not found")]
TargetNotFound(TargetID),
#[error("Server not initialized")]
ServerNotInitialized,
#[error("System initialization error: {0}")]
Initialization(String),
#[error("Storage not available: {0}")]
StorageNotAvailable(String),
}

View File

@@ -276,3 +276,120 @@ impl EventArgs {
self.req_params.contains_key("x-rustfs-source-replication-request")
}
}
/// Builder for [`EventArgs`].
///
/// This builder provides a fluent API to construct an `EventArgs` instance,
/// ensuring that all required fields are provided.
///
/// # Example
///
/// ```ignore
/// let args = EventArgsBuilder::new(
/// EventName::ObjectCreatedPut,
/// "my-bucket",
/// object_info,
/// )
/// .host("localhost:9000")
/// .user_agent("my-app/1.0")
/// .build();
/// ```
#[derive(Debug, Clone, Default)]
pub struct EventArgsBuilder {
event_name: EventName,
bucket_name: String,
object: rustfs_ecstore::store_api::ObjectInfo,
req_params: HashMap<String, String>,
resp_elements: HashMap<String, String>,
version_id: String,
host: String,
user_agent: String,
}
impl EventArgsBuilder {
/// Creates a new builder with the required fields.
pub fn new(event_name: EventName, bucket_name: impl Into<String>, object: rustfs_ecstore::store_api::ObjectInfo) -> Self {
Self {
event_name,
bucket_name: bucket_name.into(),
object,
..Default::default()
}
}
/// Sets the event name.
pub fn event_name(mut self, event_name: EventName) -> Self {
self.event_name = event_name;
self
}
/// Sets the bucket name.
pub fn bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
self.bucket_name = bucket_name.into();
self
}
/// Sets the object information.
pub fn object(mut self, object: rustfs_ecstore::store_api::ObjectInfo) -> Self {
self.object = object;
self
}
/// Sets the request parameters.
pub fn req_params(mut self, req_params: HashMap<String, String>) -> Self {
self.req_params = req_params;
self
}
/// Adds a single request parameter.
pub fn req_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.req_params.insert(key.into(), value.into());
self
}
/// Sets the response elements.
pub fn resp_elements(mut self, resp_elements: HashMap<String, String>) -> Self {
self.resp_elements = resp_elements;
self
}
/// Adds a single response element.
pub fn resp_element(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.resp_elements.insert(key.into(), value.into());
self
}
/// Sets the version ID.
pub fn version_id(mut self, version_id: impl Into<String>) -> Self {
self.version_id = version_id.into();
self
}
/// Sets the host.
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = host.into();
self
}
/// Sets the user agent.
pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = user_agent.into();
self
}
/// Builds the final `EventArgs` instance.
///
/// This method consumes the builder and returns the constructed `EventArgs`.
pub fn build(self) -> EventArgs {
EventArgs {
event_name: self.event_name,
bucket_name: self.bucket_name,
object: self.object,
req_params: self.req_params,
resp_elements: self.resp_elements,
version_id: self.version_id,
host: self.host,
user_agent: self.user_agent,
}
}
}

View File

@@ -12,17 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{BucketNotificationConfig, Event, EventArgs, NotificationError, NotificationSystem};
use once_cell::sync::Lazy;
use crate::{BucketNotificationConfig, Event, EventArgs, LifecycleError, NotificationError, NotificationSystem};
use rustfs_ecstore::config::Config;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::{EventName, arn::TargetID};
use std::sync::{Arc, OnceLock};
use tracing::{error, instrument};
use tracing::error;
static NOTIFICATION_SYSTEM: OnceLock<Arc<NotificationSystem>> = OnceLock::new();
// Create a globally unique Notifier instance
static GLOBAL_NOTIFIER: Lazy<Notifier> = Lazy::new(|| Notifier {});
/// Initialize the global notification system with the given configuration.
/// This function should only be called once throughout the application life cycle.
@@ -34,7 +30,7 @@ pub async fn initialize(config: Config) -> Result<(), NotificationError> {
match NOTIFICATION_SYSTEM.set(Arc::new(system)) {
Ok(_) => Ok(()),
Err(_) => Err(NotificationError::AlreadyInitialized),
Err(_) => Err(NotificationError::Lifecycle(LifecycleError::AlreadyInitialized)),
}
}
@@ -49,14 +45,11 @@ pub fn is_notification_system_initialized() -> bool {
NOTIFICATION_SYSTEM.get().is_some()
}
/// Returns a reference to the global Notifier instance.
pub fn notifier_instance() -> &'static Notifier {
&GLOBAL_NOTIFIER
}
/// A module providing the public API for event notification.
pub mod notifier_global {
use super::*;
use tracing::instrument;
pub struct Notifier {}
impl Notifier {
/// Notify an event asynchronously.
/// This is the only entry point for all event notifications in the system.
/// # Parameter
@@ -67,8 +60,8 @@ impl Notifier {
///
/// # Using
/// This function is used to notify events in the system, such as object creation, deletion, or updates.
#[instrument(skip(self, args))]
pub async fn notify(&self, args: EventArgs) {
#[instrument(skip(args))]
pub async fn notify(args: EventArgs) {
// Obtain the NotificationSystem instance via dependency injection or the service-locator pattern
let notification_sys = match notification_system() {
// If the notification system cannot be retrieved, return early without dispatching the event
@@ -110,7 +103,6 @@ impl Notifier {
/// # Using
/// This function allows you to dynamically add notification rules for a specific bucket.
pub async fn add_bucket_notification_rule(
&self,
bucket_name: &str,
region: &str,
event_names: &[EventName],
@@ -137,7 +129,7 @@ impl Notifier {
// Get global NotificationSystem
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
None => return Err(NotificationError::Lifecycle(LifecycleError::NotInitialized)),
};
// Loading configuration
@@ -159,7 +151,6 @@ impl Notifier {
/// # Using
/// Supports adding notification rules for multiple event types, prefixes, suffixes, and targets to the same bucket in a single batch.
pub async fn add_event_specific_rules(
&self,
bucket_name: &str,
region: &str,
event_rules: &[(Vec<EventName>, String, String, Vec<TargetID>)],
@@ -176,10 +167,7 @@ impl Notifier {
}
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?;
// Loading configuration
notification_sys
@@ -196,12 +184,9 @@ impl Notifier {
/// This function allows you to clear all notification rules for a specific bucket.
/// This is useful when you want to reset the notification configuration for a bucket.
///
pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> {
pub async fn clear_bucket_notification_rules(bucket_name: &str) -> Result<(), NotificationError> {
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?;
// Clear configuration
notification_sys.remove_bucket_notification_config(bucket_name).await;

View File

@@ -199,7 +199,9 @@ impl NotificationSystem {
F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed
{
let Some(store) = rustfs_ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::ServerNotInitialized);
return Err(NotificationError::StorageNotAvailable(
"Failed to save target configuration: server storage not initialized".to_string(),
));
};
let mut new_config = rustfs_ecstore::config::com::read_config_without_migrate(store.clone())

View File

@@ -18,18 +18,18 @@
//! It supports sending events to various targets
//! (like Webhook and MQTT) and includes features like event persistence and retry on failure.
pub mod error;
pub mod event;
mod error;
mod event;
pub mod factory;
pub mod global;
mod global;
pub mod integration;
pub mod notifier;
pub mod registry;
pub mod rules;
pub mod stream;
// Re-exports
pub use error::NotificationError;
pub use event::{Event, EventArgs};
pub use global::{initialize, is_notification_system_initialized, notification_system};
pub use error::{LifecycleError, NotificationError};
pub use event::{Event, EventArgs, EventArgsBuilder};
pub use global::{initialize, is_notification_system_initialized, notification_system, notifier_global};
pub use integration::NotificationSystem;
pub use rules::BucketNotificationConfig;
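With notifier_instance() removed, callers reach the notifier through the re-exported notifier_global module and its associated functions. A hypothetical call site under that assumption, reusing the crate's own doc example values; it assumes the crate is imported as rustfs_notify, and notify_put is an illustrative wrapper:

use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_targets::EventName;

async fn notify_put(object: rustfs_ecstore::store_api::ObjectInfo) {
    let args = EventArgsBuilder::new(EventName::ObjectCreatedPut, "my-bucket", object)
        .host("localhost:9000")
        .user_agent("example/1.0")
        .build();

    // Previously: notifier_instance().notify(args).await;
    notifier_global::Notifier::notify(args).await;
}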

View File

@@ -45,7 +45,7 @@ opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use rustfs_config::observability::{
ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL,
ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT,
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_ENDPOINT,
ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME,
ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_SAMPLE_RATIO,
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_USE_STDOUT,
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
@@ -23,6 +24,7 @@ use rustfs_config::{
USE_STDOUT,
};
use rustfs_utils::dirs::get_log_directory_to_string;
use rustfs_utils::{get_env_bool, get_env_f64, get_env_opt_str, get_env_str, get_env_u64, get_env_usize};
use serde::{Deserialize, Serialize};
use std::env;
@@ -55,6 +57,9 @@ use std::env;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
pub endpoint: String, // Default observability endpoint (used when the specific endpoints below are unset)
pub trace_endpoint: Option<String>, // Endpoint for trace collection
pub metric_endpoint: Option<String>, // Endpoint for metric collection
pub log_endpoint: Option<String>, // Endpoint for log collection
pub use_stdout: Option<bool>, // Output to stdout
pub sample_ratio: Option<f64>, // Trace sampling ratio
pub meter_interval: Option<u64>, // Metric collection interval
@@ -68,7 +73,7 @@ pub struct OtelConfig {
pub log_filename: Option<String>, // The name of the log file
pub log_rotation_size_mb: Option<u64>, // Log file size cut threshold (MB)
pub log_rotation_time: Option<String>, // Logs are cut by time (Hour DayMinute Second)
pub log_keep_files: Option<u16>, // Number of log files to be retained
pub log_keep_files: Option<usize>, // Number of log files to be retained
}
impl OtelConfig {
@@ -83,62 +88,29 @@ impl OtelConfig {
} else {
env::var(ENV_OBS_ENDPOINT).unwrap_or_else(|_| "".to_string())
};
let mut use_stdout = env::var(ENV_OBS_USE_STDOUT)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT));
let mut use_stdout = get_env_bool(ENV_OBS_USE_STDOUT, USE_STDOUT);
if endpoint.is_empty() {
use_stdout = Some(true);
use_stdout = true;
}
OtelConfig {
endpoint,
use_stdout,
sample_ratio: env::var(ENV_OBS_SAMPLE_RATIO)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var(ENV_OBS_METER_INTERVAL)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var(ENV_OBS_SERVICE_NAME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(APP_NAME.to_string())),
service_version: env::var(ENV_OBS_SERVICE_VERSION)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var(ENV_OBS_ENVIRONMENT)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var(ENV_OBS_LOGGER_LEVEL)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
log_stdout_enabled: env::var(ENV_OBS_LOG_STDOUT_ENABLED)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_OBS_LOG_STDOUT_ENABLED)),
trace_endpoint: get_env_opt_str(ENV_OBS_TRACE_ENDPOINT),
metric_endpoint: get_env_opt_str(ENV_OBS_METRIC_ENDPOINT),
log_endpoint: get_env_opt_str(ENV_OBS_LOG_ENDPOINT),
use_stdout: Some(use_stdout),
sample_ratio: Some(get_env_f64(ENV_OBS_SAMPLE_RATIO, SAMPLE_RATIO)),
meter_interval: Some(get_env_u64(ENV_OBS_METER_INTERVAL, METER_INTERVAL)),
service_name: Some(get_env_str(ENV_OBS_SERVICE_NAME, APP_NAME)),
service_version: Some(get_env_str(ENV_OBS_SERVICE_VERSION, SERVICE_VERSION)),
environment: Some(get_env_str(ENV_OBS_ENVIRONMENT, ENVIRONMENT)),
logger_level: Some(get_env_str(ENV_OBS_LOGGER_LEVEL, DEFAULT_LOG_LEVEL)),
log_stdout_enabled: Some(get_env_bool(ENV_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOG_STDOUT_ENABLED)),
log_directory: Some(get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY)),
log_filename: env::var(ENV_OBS_LOG_FILENAME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_OBS_LOG_FILENAME.to_string())),
log_rotation_size_mb: env::var(ENV_OBS_LOG_ROTATION_SIZE_MB)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
log_rotation_time: env::var(ENV_OBS_LOG_ROTATION_TIME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day"
log_keep_files: env::var(ENV_OBS_LOG_KEEP_FILES)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
log_filename: Some(get_env_str(ENV_OBS_LOG_FILENAME, DEFAULT_OBS_LOG_FILENAME)),
log_rotation_size_mb: Some(get_env_u64(ENV_OBS_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
log_rotation_time: Some(get_env_str(ENV_OBS_LOG_ROTATION_TIME, DEFAULT_LOG_ROTATION_TIME)), // Default to "Hour"
log_keep_files: Some(get_env_usize(ENV_OBS_LOG_KEEP_FILES, DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
}
}
@@ -237,3 +209,12 @@ impl Default for AppConfig {
Self::new()
}
}
/// Check if the current environment is production
///
/// # Returns
/// true if production, false otherwise
///
pub fn is_production_environment() -> bool {
get_env_str(ENV_OBS_ENVIRONMENT, ENVIRONMENT).eq_ignore_ascii_case(DEFAULT_OBS_ENVIRONMENT_PRODUCTION)
}
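The rewritten from-env block above leans on the get_env_* helpers from rustfs_utils. Their real implementations are not shown in this diff; the sketch below only illustrates the fallback semantics the call sites rely on (parse the variable if present and valid, otherwise use the supplied default), with hypothetical generic names:

use std::env;
use std::str::FromStr;

// Illustrative only: the real helpers live in rustfs_utils and are typed per width.
fn get_env_parsed<T: FromStr>(key: &str, default: T) -> T {
    env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn get_env_opt_str(key: &str) -> Option<String> {
    env::var(key).ok().filter(|v| !v.is_empty())
}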

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::telemetry::{OtelGuard, init_telemetry};
use crate::{AppConfig, SystemObserver};
use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry};
use std::sync::{Arc, Mutex};
use tokio::sync::{OnceCell, SetError};
use tracing::{error, info};
@@ -52,6 +51,8 @@ pub enum GlobalError {
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
#[error("Telemetry initialization failed: {0}")]
TelemetryError(#[from] TelemetryError),
}
/// Initialize the observability module
@@ -68,14 +69,17 @@ pub enum GlobalError {
///
/// # #[tokio::main]
/// # async fn main() {
/// # let guard = init_obs(None).await;
/// # match init_obs(None).await {
/// # Ok(guard) => {}
/// # Err(e) => { eprintln!("Failed to initialize observability: {}", e); }
/// # }
/// # }
/// ```
pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
pub async fn init_obs(endpoint: Option<String>) -> Result<OtelGuard, GlobalError> {
// Load the configuration file
let config = AppConfig::new_with_endpoint(endpoint);
let otel_guard = init_telemetry(&config.observability);
let otel_guard = init_telemetry(&config.observability)?;
// Server will be created per connection - this ensures isolation
tokio::spawn(async move {
// Record the PID-related metrics of the current process
@@ -90,7 +94,7 @@ pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
}
});
otel_guard
Ok(otel_guard)
}
/// Set the global guard for OpenTelemetry
@@ -107,7 +111,10 @@ pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
/// # use rustfs_obs::{ init_obs, set_global_guard};
///
/// # async fn init() -> Result<(), Box<dyn std::error::Error>> {
/// # let guard = init_obs(None).await;
/// # let guard = match init_obs(None).await{
/// # Ok(g) => g,
/// # Err(e) => { return Err(Box::new(e)); }
/// # };
/// # set_global_guard(guard)?;
/// # Ok(())
/// # }

View File

@@ -38,7 +38,19 @@
///
/// # #[tokio::main]
/// # async fn main() {
/// # let guard = init_obs(None).await;
/// # let _guard = match init_obs(None).await {
/// # Ok(g) => g,
/// # Err(e) => {
/// # panic!("Failed to initialize observability: {:?}", e);
/// # }
/// # };
/// # // Application logic here
/// # {
/// # // Simulate some work
/// # tokio::time::sleep(std::time::Duration::from_secs(2)).await;
/// # println!("Application is running...");
/// # }
/// # // Guard will be dropped here, flushing telemetry data
/// # }
/// ```
mod config;
@@ -47,6 +59,8 @@ mod metrics;
mod system;
mod telemetry;
pub use config::{AppConfig, OtelConfig};
pub use config::*;
pub use global::*;
pub use metrics::*;
pub use system::SystemObserver;
pub use telemetry::{OtelGuard, TelemetryError};

View File

@@ -17,10 +17,15 @@
/// audit related metric descriptors
///
/// This module contains the metric descriptors for the audit subsystem.
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
const TARGET_ID: &str = "target_id";
pub const RESULT: &str = "result"; // success / failure
pub const STATUS: &str = "status"; // success / failure
pub const SUCCESS: &str = "success";
pub const FAILURE: &str = "failure";
pub static AUDIT_FAILED_MESSAGES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {
new_counter_md(

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// bucket level s3 metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
use std::sync::LazyLock;
pub static BUCKET_API_TRAFFIC_SENT_BYTES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Bucket copy metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Bucket level replication metric descriptor

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Metric descriptors related to cluster configuration
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Erasure code set related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// The label for the pool ID

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Cluster health-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static HEALTH_DRIVES_OFFLINE_COUNT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// IAM related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use std::sync::LazyLock;
pub static LAST_SYNC_DURATION_MILLIS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Notify the relevant metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use std::sync::LazyLock;
pub static NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Descriptors of metrics related to cluster object and bucket usage
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Bucket labels

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::{MetricName, MetricNamespace, MetricSubsystem, MetricType};
use crate::{MetricName, MetricNamespace, MetricSubsystem, MetricType};
use std::collections::HashSet;
/// MetricDescriptor - Metric descriptors

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType};
use crate::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType};
pub(crate) mod descriptor;
pub(crate) mod metric_name;
@@ -76,7 +76,7 @@ pub fn new_histogram_md(
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::subsystems;
use crate::subsystems;
#[test]
fn test_new_histogram_md() {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::entry::path_utils::format_path_to_metric_name;
use crate::entry::path_utils::format_path_to_metric_name;
/// The metrics subsystem is a subgroup of metrics within a namespace
/// The metrics subsystem, which represents a subgroup of metrics within a namespace
@@ -204,8 +204,8 @@ pub mod subsystems {
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::MetricType;
use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace};
use crate::MetricType;
use crate::{MetricDescriptor, MetricName, MetricNamespace};
#[test]
fn test_metric_subsystem_formatting() {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// ILM-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static ILM_EXPIRY_PENDING_TASKS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// A descriptor for metrics related to webhook logs
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Define label constants for webhook metrics

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Metrics for replication subsystem
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -14,7 +14,7 @@
#![allow(dead_code)]
use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static API_REJECTED_AUTH_TOTAL_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Scanner-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static SCANNER_BUCKET_SCANS_FINISHED_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -14,7 +14,7 @@
#![allow(dead_code)]
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// CPU system-related metric descriptors
use std::sync::LazyLock;

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Drive-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// drive related labels

View File

@@ -20,7 +20,7 @@
/// These descriptors are initialized lazily using `std::sync::LazyLock` to ensure
/// they are only created when actually needed, improving performance and reducing
/// startup overhead.
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Total memory available on the node

View File

@@ -20,7 +20,7 @@
/// - Error counts for connection and general internode calls
/// - Network dial performance metrics
/// - Data transfer volume in both directions
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Total number of failed internode calls counter

View File

@@ -19,7 +19,7 @@
/// This module defines various system process metrics used for monitoring
/// the RustFS process performance, resource usage, and system integration.
/// Metrics are implemented using std::sync::LazyLock for thread-safe lazy initialization.
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Number of current READ locks on this peer

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use crate::{GlobalError, is_observability_enabled};
use opentelemetry::global::meter;
use opentelemetry::{global::meter, metrics::Meter};
use sysinfo::Pid;
mod attributes;
mod collector;
@@ -30,7 +31,8 @@ impl SystemObserver {
pub async fn init_process_observer() -> Result<(), GlobalError> {
if is_observability_enabled() {
let meter = meter("system");
return SystemObserver::init_process_observer_for_pid(meter, 30000).await;
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
return SystemObserver::init_process_observer_for_pid(meter, pid).await;
}
Ok(())
}
@@ -38,9 +40,12 @@ impl SystemObserver {
/// Initialize the metric collector for the specified PID process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
let pid = sysinfo::Pid::from_u32(pid);
let mut collector = collector::Collector::new(pid, meter, 30000)?;
pub async fn init_process_observer_for_pid(meter: Meter, pid: Pid) -> Result<(), GlobalError> {
let interval_ms = rustfs_utils::get_env_u64(
rustfs_config::observability::ENV_OBS_METRICS_SYSTEM_INTERVAL_MS,
rustfs_config::observability::DEFAULT_METRICS_SYSTEM_INTERVAL_MS,
);
let mut collector = collector::Collector::new(pid, meter, interval_ms)?;
collector.run().await
}
}
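A hedged sketch of how the refactored observer might be started from application code; the spawning pattern mirrors what init_obs does above, and the wrapper function name is illustrative.
async fn start_process_metrics() {
    // init_process_observer() now resolves the current PID itself and reads the
    // collection interval from ENV_OBS_METRICS_SYSTEM_INTERVAL_MS (falling back to
    // DEFAULT_METRICS_SYSTEM_INTERVAL_MS) instead of the former hard-coded 30000 ms.
    if let Err(e) = SystemObserver::init_process_observer().await {
        tracing::error!("failed to start process observer: {e}");
    }
}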

View File

@@ -14,17 +14,13 @@
use crate::config::OtelConfig;
use crate::global::IS_OBSERVABILITY_ENABLED;
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode,
WriteMode::{AsyncWith, BufferAndFlush},
style,
};
use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, style};
use metrics::counter;
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
@@ -35,22 +31,22 @@ use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL,
SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
SAMPLE_RATIO, SERVICE_VERSION,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
ENV_OBS_LOG_DIRECTORY,
},
};
use rustfs_utils::get_local_ip_with_default;
use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use tracing::info;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
@@ -74,20 +70,19 @@ pub struct OtelGuard {
meter_provider: Option<SdkMeterProvider>,
logger_provider: Option<SdkLoggerProvider>,
// Add a flexi_logger handle to keep the logging alive
_flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
// WorkerGuard for writing tracing files
_tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}
// Implement debug manually and avoid relying on all fields to implement debug
impl std::fmt::Debug for OtelGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OtelGuard")
.field("tracer_provider", &self.tracer_provider.is_some())
.field("meter_provider", &self.meter_provider.is_some())
.field("logger_provider", &self.logger_provider.is_some())
.field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
.field("_tracing_guard", &self._tracing_guard.is_some())
.field("flexi_logger_handles", &self.flexi_logger_handles.is_some())
.field("tracing_guard", &self.tracing_guard.is_some())
.finish()
}
}
@@ -110,9 +105,42 @@ impl Drop for OtelGuard {
eprintln!("Logger shutdown error: {err:?}");
}
}
if let Some(handle) = self.flexi_logger_handles.take() {
handle.shutdown();
println!("flexi_logger shutdown completed");
}
if let Some(guard) = self.tracing_guard.take() {
// The guard will be dropped here, flushing any remaining logs
drop(guard);
println!("Tracing guard dropped, flushing logs.");
}
}
}
#[derive(Debug)]
pub enum TelemetryError {
BuildSpanExporter(String),
BuildMetricExporter(String),
BuildLogExporter(String),
InstallMetricsRecorder(String),
SubscriberInit(String),
}
impl std::fmt::Display for TelemetryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"),
TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"),
TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"),
TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"),
TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"),
}
}
}
impl std::error::Error for TelemetryError {}
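A small illustrative sketch (assumed helper, not in the diff) of how TelemetryError surfaces to callers: it prints via the Display impl above and converts into GlobalError through the new #[from] variant.
fn report_telemetry_failure(err: TelemetryError) {
    eprintln!("telemetry init failed: {err}"); // Display impl above
    let global: crate::GlobalError = err.into(); // via GlobalError::TelemetryError(#[from] ...)
    eprintln!("propagated as: {global}");
}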
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
Resource::builder()
@@ -141,456 +169,17 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
.build()
}
/// Initialize Telemetry
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Environment-aware stdout configuration
// Check for explicit environment control via RUSTFS_OBS_ENVIRONMENT
let is_production = environment.to_lowercase() == DEFAULT_OBS_ENVIRONMENT_PRODUCTION;
// Default stdout behavior based on environment
let default_use_stdout = if is_production {
false // Disable stdout in production for security and log aggregation
} else {
USE_STDOUT // Use configured default for dev/test environments
};
let use_stdout = config.use_stdout.unwrap_or(default_use_stdout);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
// Configure flexi_logger to cut by time and size
let mut flexi_logger_handle = None;
if !endpoint.is_empty() {
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
};
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
match metrics_exporter_opentelemetry::Recorder::builder("order-service").install_global() {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to set global metrics recorder: {e:?}");
}
}
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
}
builder.build()
};
// configuring tracing
{
// configure the formatting layer
let fmt_layer = {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.with_current_span(true)
.with_span_list(true);
let span_event = if is_production {
FmtSpan::CLOSE
} else {
// Only add full span events tracking in the development environment
FmtSpan::FULL
};
layer = layer.with_span_events(span_event);
layer.with_filter(build_env_filter(logger_level, None))
};
let filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure registry to avoid repeated calls to filter methods
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
Some(fmt_layer)
} else {
None
})
.with(OpenTelemetryLayer::new(tracer))
.with(otel_layer)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
IS_OBSERVABILITY_ENABLED.set(true).ok();
}
}
counter!("rustfs.start.total").increment(1);
return OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
};
}
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
// Enhanced error handling for directory creation
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: Failed to create log directory '{log_directory}': {e}");
eprintln!("Ensure the parent directory exists and you have write permissions.");
eprintln!("Attempting to continue with logging, but file logging may fail.");
} else {
eprintln!("Log directory ready: {log_directory}");
}
#[cfg(unix)]
{
// Linux/macOS Setting Permissions with better error handling
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => {
eprintln!("WARNING: Failed to set log directory permissions for '{log_directory}': {e}");
eprintln!("This may affect log file access. Consider checking directory ownership and permissions.");
}
}
}
if endpoint.is_empty() && !is_production {
// Create a file appender (rolling by day), add the -tracing suffix to the file name to avoid conflicts
// let file_appender = tracing_appender::rolling::hourly(log_directory, format!("{log_filename}-tracing.log"));
let file_appender = RollingFileAppender::builder()
.rotation(Rotation::HOURLY) // rotate log files once every hour
.filename_prefix(format!("{log_filename}-tracing")) // log file names will be prefixed with `myapp.`
.filename_suffix("log") // log file names will be suffixed with `.log`
.build(log_directory) // try to build an appender that stores log files in `/var/log`
.expect("initializing rolling file appender failed");
let (nb_writer, guard) = tracing_appender::non_blocking(file_appender);
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(nb_writer) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id
let env_filter = build_env_filter(logger_level, None);
// Use registry() to register fmt_layer directly to ensure trace_id is output to the log
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
let stdout_fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(std::io::stdout) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id;
Some(stdout_fmt_layer)
} else {
None
})
.init();
info!("Tracing telemetry initialized for non-production with trace_id logging.");
IS_OBSERVABILITY_ENABLED.set(false).ok();
return OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: None,
_tracing_guard: Some(guard),
};
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger level '{logger_level}': {e}. Using default 'info' level.");
LogSpecification::info()
});
// Environment-aware stdout configuration
// In production: disable stdout completely (Duplicate::None)
// In development/test: use level-based filtering
let level_filter = if is_production {
flexi_logger::Duplicate::None // No stdout output in production
} else {
// Convert the logger_level string to the corresponding LevelFilter for dev/test
match logger_level.to_lowercase().as_str() {
"trace" => flexi_logger::Duplicate::Trace,
"debug" => flexi_logger::Duplicate::Debug,
"info" => flexi_logger::Duplicate::Info,
"warn" | "warning" => flexi_logger::Duplicate::Warn,
"error" => flexi_logger::Duplicate::Error,
"off" => flexi_logger::Duplicate::None,
_ => flexi_logger::Duplicate::Info, // the default is info
}
};
// Choose write mode based on environment
let write_mode = if is_production {
get_env_async_with().unwrap_or_else(|| {
eprintln!(
"Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables."
);
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
})
} else {
BufferAndFlush
};
// Configure the flexi_logger with enhanced error handling
let mut flexi_logger_builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.write_mode(write_mode)
.append(); // Avoid clearing existing logs at startup
// Environment-aware stdout configuration
flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter);
// Only add stdout formatting and startup messages in non-production environments
if !is_production {
flexi_logger_builder = flexi_logger_builder
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.print_message(); // Startup information output to console
}
let flexi_logger_result = flexi_logger_builder.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
// Environment-aware success messages
if is_production {
eprintln!("Production logging initialized: file-only mode to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging disabled in production environment for security and log aggregation.");
} else {
eprintln!("Development/Test logging initialized with file logging to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging enabled for debugging. Environment: {environment}");
}
// Log rotation configuration details
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => {
eprintln!("Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files")
}
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
}
} else {
eprintln!("CRITICAL: Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Possible causes:");
eprintln!(" 1. Insufficient permissions to write to log directory: {log_directory}");
eprintln!(" 2. Log directory does not exist or is not accessible");
eprintln!(" 3. Invalid log configuration parameters");
eprintln!(" 4. Disk space issues");
eprintln!("Application will continue but logging to files will not work properly.");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
}
}
// Read the AsyncWith parameter from the environment variable
fn get_env_async_with() -> Option<WriteMode> {
let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::<u64>().ok());
let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA);
let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA);
let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS);
match (pool_capa, message_capa, flush_ms) {
(Some(pool), Some(msg), Some(flush)) => Some(AsyncWith {
pool_capa: pool,
message_capa: msg,
flush_interval: Duration::from_millis(flush),
}),
_ => None,
}
Some(AsyncWith {
pool_capa,
message_capa,
flush_interval: Duration::from_millis(flush_ms),
})
}
fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
@@ -655,9 +244,344 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
)
}
/// Stdout logging with span information (fix: retain the WorkerGuard so the non-blocking writer is not dropped right after initialization)
fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
let env_filter = build_env_filter(logger_level, None);
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(nb)
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL });
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.init();
IS_OBSERVABILITY_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!("Init stdout logging (level: {})", logger_level);
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
flexi_logger_handles: None,
tracing_guard: Some(guard),
}
}
/// Rolling file logging (rotation by time and/or size, with a capped number of retained files)
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
use flexi_logger::{
Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming,
WriteMode::{AsyncWith, BufferAndFlush},
};
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: create log dir '{}': {e}", log_directory);
}
#[cfg(unix)]
{
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755));
}
// parsing level
let log_spec = LogSpecification::parse(logger_level)
.unwrap_or_else(|_| LogSpecification::parse(DEFAULT_LOG_LEVEL).unwrap_or(LogSpecification::error()));
// Build the rotation criterion: by time, by size (MB), or both
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// write mode
let write_mode = get_env_async_with().unwrap_or(if is_production {
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
} else {
BufferAndFlush
});
// Build
let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
if !is_production {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
}
flexi_logger::Logger::with(log_spec.clone())
})
.format_for_stderr(format_with_color)
.format_for_stdout(format_with_color)
.format_for_files(format_for_file)
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files))
.write_mode(write_mode)
.append()
.use_utc();
// Optional copy to stdout (for local observation)
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All);
} else {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::None);
}
let handle = match builder.start() {
Ok(h) => Some(h),
Err(e) => {
eprintln!("ERROR: start flexi_logger failed: {e}");
None
}
};
IS_OBSERVABILITY_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init file logging at '{}', roll size {:?}MB, keep {}",
log_directory, config.log_rotation_size_mb, keep_files
);
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
flexi_logger_handles: handle,
tracing_guard: None,
}
}
/// Observability via OTLP/HTTP export; supports separate trace/metric/log sub-endpoints, falling back to the unified endpoint when a sub-endpoint is unset
fn init_observability_http(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result<OtelGuard, TelemetryError> {
// Resources and sampling
let res = resource(config);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned();
let use_stdout = config.use_stdout.unwrap_or(!is_production);
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
// Endpoint
let root_ep = config.endpoint.as_str();
let trace_ep = config.trace_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
let metric_ep = config.metric_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
let log_ep = config.log_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
// Tracer (OTLP over HTTP)
let tracer_provider = {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(trace_ep)
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?;
let mut builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone())
.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default());
}
let provider = builder.build();
global::set_tracer_provider(provider.clone());
provider
};
// Meter (OTLP over HTTP)
let meter_provider = {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(metric_ep)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
let provider = builder.build();
global::set_meter_provider(provider.clone());
provider
};
// metrics crate -> OTel
let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global();
// Logger (OTLP over HTTP)
let logger_provider = {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_endpoint(log_ep)
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?;
let mut builder = SdkLoggerProvider::builder().with_resource(res);
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
builder.build()
};
// Tracing layer
let fmt_layer_opt = {
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.with_current_span(true)
.with_span_list(true);
let span_event = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
layer = layer.with_span_events(span_event);
Some(layer.with_filter(build_env_filter(logger_level, None)))
} else {
None
}
};
let filter = build_env_filter(logger_level, None);
let otel_bridge = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(service_name.to_string());
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(fmt_layer_opt)
.with(OpenTelemetryLayer::new(tracer))
.with(otel_bridge)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
IS_OBSERVABILITY_ENABLED.set(true).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init observability (HTTP): trace='{}', metric='{}', log='{}'",
trace_ep, metric_ep, log_ep
);
Ok(OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
flexi_logger_handles: None,
tracing_guard: None,
})
}
/// Initialize telemetry. Entry point that selects one of three modes (rules below).
pub(crate) fn init_telemetry(config: &OtelConfig) -> Result<OtelGuard, TelemetryError> {
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
let is_production = environment.eq_ignore_ascii_case(DEFAULT_OBS_ENVIRONMENT_PRODUCTION);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
// Rule 3: Observability (any endpoint is enabled if it is not empty)
let has_obs = !config.endpoint.is_empty()
|| config.trace_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false)
|| config.metric_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false)
|| config.log_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false);
if has_obs {
return init_observability_http(config, logger_level, is_production);
}
// Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set)
let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok();
if user_set_log_dir {
return Ok(init_file_logging(config, logger_level, is_production));
}
// Rule 1: Default stdout (error level)
Ok(init_stdout_logging(config, DEFAULT_LOG_LEVEL, is_production))
}
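The three rules above can be summarised in a small standalone sketch; the function name, signature, and return strings are illustrative only and not part of the crate.
fn telemetry_mode(unified_endpoint: &str, sub_endpoints: &[&str], log_dir_env_set: bool) -> &'static str {
    // Rule 3: any non-empty endpoint (unified or trace/metric/log) enables OTLP/HTTP export.
    let has_obs = !unified_endpoint.is_empty() || sub_endpoints.iter().any(|e| !e.is_empty());
    if has_obs {
        "observability-http"
    } else if log_dir_env_set {
        // Rule 2: an explicitly set ENV_OBS_LOG_DIRECTORY selects rolling file logs.
        "file-logging"
    } else {
        // Rule 1: default stdout logging at the default level.
        "stdout-logging"
    }
}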
#[cfg(test)]
mod tests {
use super::*;
use rustfs_config::USE_STDOUT;
#[test]
fn test_production_environment_detection() {

View File

@@ -39,6 +39,7 @@ serde = { workspace = true }
bytes.workspace = true
reqwest.workspace = true
tokio-util.workspace = true
faster-hex.workspace = true
futures.workspace = true
rustfs-utils = { workspace = true, features = ["io", "hash", "compress"] }
serde_json.workspace = true

View File

@@ -20,6 +20,7 @@ use aes_gcm::aead::Aead;
use aes_gcm::{Aes256Gcm, KeyInit, Nonce};
use pin_project_lite::pin_project;
use rustfs_utils::{put_uvarint, put_uvarint_len};
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
@@ -98,13 +99,13 @@ where
} else {
// Encrypt the chunk
let cipher = Aes256Gcm::new_from_slice(this.key).expect("key");
let nonce = Nonce::from_slice(this.nonce);
let nonce = Nonce::try_from(this.nonce.as_slice()).map_err(|_| Error::other("invalid nonce length"))?;
let plaintext = &temp_buf.filled()[..n];
let plaintext_len = plaintext.len();
let crc = crc32fast::hash(plaintext);
let ciphertext = cipher
.encrypt(nonce, plaintext)
.map_err(|e| std::io::Error::other(format!("encrypt error: {e}")))?;
.encrypt(&nonce, plaintext)
.map_err(|e| Error::other(format!("encrypt error: {e}")))?;
let int_len = put_uvarint_len(plaintext_len as u64);
let clen = int_len + ciphertext.len() + 4;
// Header: 8 bytes
@@ -352,7 +353,7 @@ where
let Some(payload_len) = len.checked_sub(4) else {
tracing::error!("invalid encrypted block length: typ={} len={} header={:?}", typ, len, this.header_buf);
return Poll::Ready(Err(std::io::Error::other("Invalid encrypted block length")));
return Poll::Ready(Err(Error::other("Invalid encrypted block length")));
};
if this.ciphertext_buf.is_none() {
@@ -390,10 +391,10 @@ where
let ciphertext = &ciphertext_buf[uvarint_len as usize..];
let cipher = Aes256Gcm::new_from_slice(this.key).expect("key");
let nonce = Nonce::from_slice(this.current_nonce);
let nonce = Nonce::try_from(this.current_nonce.as_slice()).map_err(|_| Error::other("invalid nonce length"))?;
let plaintext = cipher
.decrypt(nonce, ciphertext)
.map_err(|e| std::io::Error::other(format!("decrypt error: {e}")))?;
.decrypt(&nonce, ciphertext)
.map_err(|e| Error::other(format!("decrypt error: {e}")))?;
debug!(
part = *this.current_part,
@@ -405,7 +406,7 @@ where
this.ciphertext_buf.take();
*this.ciphertext_read = 0;
*this.ciphertext_len = 0;
return Poll::Ready(Err(std::io::Error::other("Plaintext length mismatch")));
return Poll::Ready(Err(Error::other("Plaintext length mismatch")));
}
let actual_crc = crc32fast::hash(&plaintext);
@@ -413,7 +414,7 @@ where
this.ciphertext_buf.take();
*this.ciphertext_read = 0;
*this.ciphertext_len = 0;
return Poll::Ready(Err(std::io::Error::other("CRC32 mismatch")));
return Poll::Ready(Err(Error::other("CRC32 mismatch")));
}
*this.buffer = plaintext;
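A compact sketch of the nonce construction used in this hunk, pulled out of the stream type for readability. Key and nonce lengths are assumed for Aes256Gcm, the helper name is hypothetical, and the error mapping mirrors the code above.
use aes_gcm::aead::Aead;
use aes_gcm::{Aes256Gcm, KeyInit, Nonce};
use std::io::Error;

fn seal_chunk(key: &[u8; 32], nonce_bytes: &[u8; 12], plaintext: &[u8]) -> Result<Vec<u8>, Error> {
    let cipher = Aes256Gcm::new_from_slice(key).map_err(|_| Error::other("invalid key length"))?;
    // Build the nonce fallibly instead of panicking via from_slice.
    let nonce = Nonce::try_from(&nonce_bytes[..]).map_err(|_| Error::other("invalid nonce length"))?;
    cipher
        .encrypt(&nonce, plaintext)
        .map_err(|e| Error::other(format!("encrypt error: {e}")))
}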

View File

@@ -120,7 +120,8 @@ mod tests {
let data = b"hello world";
let mut hasher = Md5::new();
hasher.update(data);
let expected = format!("{:x}", hasher.finalize());
let hex = faster_hex::hex_string(hasher.finalize().as_slice());
let expected = hex.to_string();
let reader = BufReader::new(&data[..]);
let reader = Box::new(WarpReader::new(reader));
let mut etag_reader = EtagReader::new(reader, None);
@@ -139,7 +140,8 @@ mod tests {
let data = b"";
let mut hasher = Md5::new();
hasher.update(data);
let expected = format!("{:x}", hasher.finalize());
let hex = faster_hex::hex_string(hasher.finalize().as_slice());
let expected = hex.to_string();
let reader = BufReader::new(&data[..]);
let reader = Box::new(WarpReader::new(reader));
let mut etag_reader = EtagReader::new(reader, None);
@@ -158,7 +160,8 @@ mod tests {
let data = b"abc123";
let mut hasher = Md5::new();
hasher.update(data);
let expected = format!("{:x}", hasher.finalize());
let hex = faster_hex::hex_string(hasher.finalize().as_slice());
let expected = hex.to_string();
let reader = BufReader::new(&data[..]);
let reader = Box::new(WarpReader::new(reader));
let mut etag_reader = EtagReader::new(reader, None);
@@ -195,15 +198,12 @@ mod tests {
rand::rng().fill(&mut data[..]);
let mut hasher = Md5::new();
hasher.update(&data);
let cloned_data = data.clone();
let expected = format!("{:x}", hasher.finalize());
let hex = faster_hex::hex_string(hasher.finalize().as_slice());
let expected = hex.to_string();
let reader = Cursor::new(data.clone());
let reader = Box::new(WarpReader::new(reader));
let mut etag_reader = EtagReader::new(reader, None);
let mut buf = Vec::new();
let n = etag_reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(n, size);

View File

@@ -660,7 +660,8 @@ mod tests {
let mut hasher = Md5::new();
hasher.update(&data);
let expected = format!("{:x}", hasher.finalize());
let hex = faster_hex::hex_string(hasher.finalize().as_slice());
let expected = hex.to_string();
println!("expected: {expected}");

View File

@@ -38,7 +38,7 @@ pub fn is_sha256_checksum(s: &str) -> bool {
/// `hmac_sha1(key, data)`
pub fn hmac_sha1(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> [u8; 20] {
use hmac::{Hmac, Mac};
use hmac::{Hmac, KeyInit, Mac};
use sha1::Sha1;
let mut m = <Hmac<Sha1>>::new_from_slice(key.as_ref()).unwrap();
@@ -48,7 +48,7 @@ pub fn hmac_sha1(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> [u8; 20] {
/// `hmac_sha256(key, data)`
pub fn hmac_sha256(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> [u8; 32] {
use hmac::{Hmac, Mac};
use hmac::{Hmac, KeyInit, Mac};
use sha2::Sha256;
let mut m = Hmac::<Sha256>::new_from_slice(key.as_ref()).unwrap();
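The helpers keep their original shape; only the KeyInit import changes. A hedged usage sketch with illustrative inputs:
fn sign_payload() -> [u8; 32] {
    // hmac_sha256 accepts any AsRef<[u8]> key and data and returns the raw 32-byte tag.
    hmac_sha256(b"secret-key", b"payload")
}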

390
crates/utils/src/envs.rs Normal file
View File

@@ -0,0 +1,390 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 8-bit type: signed i8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i8`: The parsed value as i8 if successful, otherwise the default value.
///
pub fn get_env_i8(key: &str, default: i8) -> i8 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 8-bit type: signed i8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i8>`: The parsed value as i8 if successful, otherwise None
///
pub fn get_env_opt_i8(key: &str) -> Option<i8> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 8-bit type: unsigned u8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u8`: The parsed value as u8 if successful, otherwise the default value.
///
pub fn get_env_u8(key: &str, default: u8) -> u8 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 8-bit type: unsigned u8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u8>`: The parsed value as u8 if successful, otherwise None
///
pub fn get_env_opt_u8(key: &str) -> Option<u8> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 16-bit type: signed i16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i16`: The parsed value as i16 if successful, otherwise the default value.
///
pub fn get_env_i16(key: &str, default: i16) -> i16 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: signed i16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i16>`: The parsed value as i16 if successful, otherwise None
///
pub fn get_env_opt_i16(key: &str) -> Option<i16> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u16`: The parsed value as u16 if successful, otherwise the default value.
///
pub fn get_env_u16(key: &str, default: u16) -> u16 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u16>`: The parsed value as u16 if successful, otherwise None
///
pub fn get_env_u16_opt(key: &str) -> Option<u16> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u16>`: The parsed value as u16 if successful, otherwise None
///
pub fn get_env_opt_u16(key: &str) -> Option<u16> {
get_env_u16_opt(key)
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 32-bit type: signed i32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i32`: The parsed value as i32 if successful, otherwise the default value.
///
pub fn get_env_i32(key: &str, default: i32) -> i32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 32-bit type: signed i32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i32>`: The parsed value as i32 if successful, otherwise None
///
pub fn get_env_opt_i32(key: &str) -> Option<i32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 32-bit type: unsigned u32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u32`: The parsed value as u32 if successful, otherwise the default value.
///
pub fn get_env_u32(key: &str, default: u32) -> u32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 32-bit type: unsigned u32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u32>`: The parsed value as u32 if successful, otherwise None
///
pub fn get_env_opt_u32(key: &str) -> Option<u32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `f32`: The parsed value as f32 if successful, otherwise the default value
///
pub fn get_env_f32(key: &str, default: f32) -> f32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<f32>`: The parsed value as f32 if successful, otherwise None
///
pub fn get_env_opt_f32(key: &str) -> Option<f32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i64`: The parsed value as i64 if successful, otherwise the default value
///
pub fn get_env_i64(key: &str, default: i64) -> i64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i64>`: The parsed value as i64 if successful, otherwise None
///
pub fn get_env_opt_i64(key: &str) -> Option<i64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a nested option: `None` if the variable is not set, `Some(None)` if it is set but fails to parse as i64, and `Some(Some(v))` on success.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<Option<i64>>`: `None` if unset, `Some(None)` on parse failure, otherwise `Some(Some(v))`
///
pub fn get_env_opt_opt_i64(key: &str) -> Option<Option<i64>> {
env::var(key).ok().map(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u64`: The parsed value as u64 if successful, otherwise the default value.
///
pub fn get_env_u64(key: &str, default: u64) -> u64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u64>`: The parsed value as u64 if successful, otherwise None
///
pub fn get_env_opt_u64(key: &str) -> Option<u64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `f64`: The parsed value as f64 if successful, otherwise the default value.
///
pub fn get_env_f64(key: &str, default: f64) -> f64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<f64>`: The parsed value as f64 if successful, otherwise None
///
pub fn get_env_opt_f64(key: &str) -> Option<f64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `usize`: The parsed value as usize if successful, otherwise the default value.
///
pub fn get_env_usize(key: &str, default: usize) -> usize {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a `usize`, returning `None` if it is not set or parsing fails.
///
/// # Parameters
/// - `key`: The environment variable key to look up.
///
/// # Returns
/// - `Option<usize>`: The parsed value if successful, otherwise `None`.
///
pub fn get_env_usize_opt(key: &str) -> Option<usize> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a `usize`, returning `None` if it is not set or parsing fails.
///
/// This is an alias for [`get_env_usize_opt`], kept for naming consistency with the other `get_env_opt_*` helpers.
///
/// # Parameters
/// - `key`: The environment variable key to look up.
///
/// # Returns
/// - `Option<usize>`: The parsed value if successful, otherwise `None`.
///
pub fn get_env_opt_usize(key: &str) -> Option<usize> {
get_env_usize_opt(key)
}
/// Retrieve an environment variable as a `String`, with a default value if not set.
///
/// # Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default string value to return if the environment variable is not set.
///
/// # Returns
/// - `String`: The environment variable value if set, otherwise the default value.
///
pub fn get_env_str(key: &str, default: &str) -> String {
env::var(key).unwrap_or_else(|_| default.to_string())
}
/// Retrieve an environment variable as a `String`, returning `None` if not set.
///
/// # Parameters
/// - `key`: The environment variable key to look up.
///
/// # Returns
/// - `Option<String>`: The environment variable value if set, otherwise `None`.
///
pub fn get_env_opt_str(key: &str) -> Option<String> {
env::var(key).ok()
}
/// Retrieve an environment variable as a boolean, with a default value if not set or parsing fails.
///
/// Accepts `1`/`true`/`yes` and `0`/`false`/`no` (case-insensitive).
///
/// # Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default boolean value to return if the environment variable is not set or cannot be parsed.
///
/// # Returns
/// - `bool`: The parsed boolean value if successful, otherwise the default value.
///
pub fn get_env_bool(key: &str, default: bool) -> bool {
env::var(key)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
.unwrap_or(default)
}
/// Retrieve an environment variable as a boolean, returning `None` if not set or parsing fails.
///
/// Accepts `1`/`true`/`yes` and `0`/`false`/`no` (case-insensitive).
///
/// # Parameters
/// - `key`: The environment variable key to look up.
///
/// # Returns
/// - `Option<bool>`: The parsed boolean value if successful, otherwise `None`.
///
pub fn get_env_opt_bool(key: &str) -> Option<bool> {
env::var(key).ok().and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
}
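Taken together, these helpers give a uniform way to read configuration with sensible fallbacks. A small hedged sketch follows; the variable names and defaults are invented for illustration and are not actual RustFS settings:

```rust
// Hypothetical settings, for illustration only.
let worker_threads = get_env_usize("RUSTFS_EXAMPLE_WORKER_THREADS", 8);
let timeout_secs = get_env_u64("RUSTFS_EXAMPLE_TIMEOUT_SECS", 30);
let verbose = get_env_bool("RUSTFS_EXAMPLE_VERBOSE", false);
let region = get_env_str("RUSTFS_EXAMPLE_REGION", "us-east-1");

// Optional override: only takes effect when the variable is present and valid.
if let Some(buffer) = get_env_opt_usize("RUSTFS_EXAMPLE_BUFFER_SIZE") {
    println!("using custom buffer size: {buffer}");
}

println!("threads={worker_threads}, timeout={timeout_secs}s, verbose={verbose}, region={region}");
```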


@@ -81,8 +81,8 @@ pub mod sys;
#[cfg(feature = "sys")]
pub use sys::user_agent::*;
#[cfg(feature = "sys")]
pub use sys::envs::*;
#[cfg(feature = "notify")]
pub use notify::*;
mod envs;
pub use envs::*;


@@ -14,6 +14,8 @@
use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
#[cfg(test)]
use std::sync::MutexGuard;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
@@ -71,18 +73,41 @@ fn clear_dns_cache() {
}
#[cfg(test)]
pub fn set_mock_dns_resolver<F>(resolver: F)
where
F: Fn(&str) -> std::io::Result<HashSet<IpAddr>> + Send + Sync + 'static,
{
*CUSTOM_DNS_RESOLVER.write().unwrap() = Some(Arc::new(resolver));
static DNS_RESOLVER_TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
#[cfg(test)]
fn reset_dns_resolver_inner() {
*CUSTOM_DNS_RESOLVER.write().unwrap() = None;
clear_dns_cache();
}
#[cfg(test)]
pub fn reset_dns_resolver() {
*CUSTOM_DNS_RESOLVER.write().unwrap() = None;
pub struct MockResolverGuard {
_lock: MutexGuard<'static, ()>,
}
#[cfg(test)]
impl Drop for MockResolverGuard {
fn drop(&mut self) {
reset_dns_resolver_inner();
}
}
#[cfg(test)]
pub fn set_mock_dns_resolver<F>(resolver: F) -> MockResolverGuard
where
F: Fn(&str) -> std::io::Result<HashSet<IpAddr>> + Send + Sync + 'static,
{
let lock = DNS_RESOLVER_TEST_LOCK.lock().unwrap();
*CUSTOM_DNS_RESOLVER.write().unwrap() = Some(Arc::new(resolver));
clear_dns_cache();
MockResolverGuard { _lock: lock }
}
#[cfg(test)]
pub fn reset_dns_resolver() {
let _lock = DNS_RESOLVER_TEST_LOCK.lock().unwrap();
reset_dns_resolver_inner();
}
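Because `set_mock_dns_resolver` now returns a guard, a test holds the shared lock for its whole body, and the custom resolver and DNS cache are reset when the guard is dropped. A hedged sketch of the intended usage (the resolver closure and test name are illustrative):

```rust
#[test]
fn example_with_mock_resolver() {
    // Installing the mock takes the test lock and clears the DNS cache.
    let _guard = set_mock_dns_resolver(|_host| {
        // Illustrative resolver: every name maps to loopback.
        Ok(HashSet::from([IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))]))
    });

    // ... assertions that rely on the mocked resolution go here ...

    // Dropping `_guard` removes the custom resolver and clears the cache again.
}
```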
/// Helper for validating whether the provided arg is an IP address.
@@ -403,7 +428,7 @@ mod test {
#[test]
fn test_is_local_host() {
set_mock_dns_resolver(mock_resolver);
let _resolver_guard = set_mock_dns_resolver(mock_resolver);
// Test localhost domain
let localhost_host = Host::Domain("localhost");
@@ -429,13 +454,11 @@ mod test {
// Test invalid domain should return error
let invalid_host = Host::Domain("invalid.nonexistent.domain.example");
assert!(is_local_host(invalid_host, 0, 0).is_err());
reset_dns_resolver();
}
#[tokio::test]
async fn test_get_host_ip() {
set_mock_dns_resolver(mock_resolver);
let _resolver_guard = set_mock_dns_resolver(mock_resolver);
// Test IPv4 address
let ipv4_host = Host::Ipv4(Ipv4Addr::new(192, 168, 1, 1));
@@ -462,8 +485,6 @@ mod test {
// Test invalid domain
let invalid_host = Host::Domain("invalid.nonexistent.domain.example");
assert!(get_host_ip(invalid_host).await.is_err());
reset_dns_resolver();
}
#[test]


@@ -1,42 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
pub fn get_env_usize(key: &str, default: usize) -> usize {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_u64(key: &str, default: u64) -> u64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_u32(key: &str, default: u32) -> u32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_str(key: &str, default: &str) -> String {
env::var(key).unwrap_or_else(|_| default.to_string())
}
pub fn get_env_opt_u64(key: &str) -> Option<u64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
pub fn get_env_bool(key: &str, default: bool) -> bool {
env::var(key)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
.unwrap_or(default)
}


@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod envs;
pub(crate) mod user_agent;


@@ -16,7 +16,7 @@ This guide describes the configuration surfaces for the RustFS Key Management Se
| CLI flag | Env variable | Description |
|-----------------------------|--------------------------------|-------------|
| `--kms-enable` | `RUSTFS_KMS_ENABLE` | Enables KMS at startup. Defaults to `false`. |
| `--kms-backend <local|vault>` | `RUSTFS_KMS_BACKEND` | Selects the backend implementation. Defaults to `local`. |
| `--kms-backend <local\|vault>` | `RUSTFS_KMS_BACKEND` | Selects the backend implementation. Defaults to `local`. |
| `--kms-key-dir <path>` | `RUSTFS_KMS_KEY_DIR` | Required when `kms-backend=local`; directory that stores wrapped master keys. |
| `--kms-vault-address <url>` | `RUSTFS_KMS_VAULT_ADDRESS` | Vault base URL (e.g. `https://vault.example.com:8200`). |
| `--kms-vault-token <token>` | `RUSTFS_KMS_VAULT_TOKEN` | Token used for Vault authentication. Prefer AppRole or short-lived tokens. |


@@ -112,7 +112,6 @@ mime_guess = { workspace = true }
pin-project-lite.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
scopeguard.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }
sysinfo = { workspace = true, features = ["multithread"] }
thiserror = { workspace = true }


@@ -134,7 +134,7 @@ impl Operation for NotificationTarget {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -300,7 +300,7 @@ impl Operation for ListNotificationTargets {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -351,7 +351,7 @@ impl Operation for ListTargetsArns {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -401,7 +401,7 @@ impl Operation for RemoveNotificationTarget {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};


@@ -41,6 +41,143 @@ impl ApiError {
source: Some(error.into()),
}
}
pub fn error_code_to_message(code: &S3ErrorCode) -> String {
match code {
S3ErrorCode::InvalidRequest => "Invalid Request".to_string(),
S3ErrorCode::InvalidArgument => "Invalid argument".to_string(),
S3ErrorCode::InvalidStorageClass => "Invalid storage class.".to_string(),
S3ErrorCode::AccessDenied => "Access Denied.".to_string(),
S3ErrorCode::BadDigest => "The Content-Md5 you specified did not match what we received.".to_string(),
S3ErrorCode::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size.".to_string(),
S3ErrorCode::EntityTooLarge => "Your proposed upload exceeds the maximum allowed object size.".to_string(),
S3ErrorCode::InternalError => "We encountered an internal error, please try again.".to_string(),
S3ErrorCode::InvalidAccessKeyId => "The Access Key Id you provided does not exist in our records.".to_string(),
S3ErrorCode::InvalidBucketName => "The specified bucket is not valid.".to_string(),
S3ErrorCode::InvalidDigest => "The Content-Md5 you specified is not valid.".to_string(),
S3ErrorCode::InvalidRange => "The requested range is not satisfiable".to_string(),
S3ErrorCode::MalformedXML => "The XML you provided was not well-formed or did not validate against our published schema.".to_string(),
S3ErrorCode::MissingContentLength => "You must provide the Content-Length HTTP header.".to_string(),
S3ErrorCode::MissingSecurityHeader => "Your request was missing a required header".to_string(),
S3ErrorCode::MissingRequestBodyError => "Request body is empty.".to_string(),
S3ErrorCode::NoSuchBucket => "The specified bucket does not exist".to_string(),
S3ErrorCode::NoSuchBucketPolicy => "The bucket policy does not exist".to_string(),
S3ErrorCode::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist".to_string(),
S3ErrorCode::NoSuchKey => "The specified key does not exist.".to_string(),
S3ErrorCode::NoSuchUpload => "The specified multipart upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.".to_string(),
S3ErrorCode::NoSuchVersion => "The specified version does not exist.".to_string(),
S3ErrorCode::NotImplemented => "A header you provided implies functionality that is not implemented".to_string(),
S3ErrorCode::PreconditionFailed => "At least one of the pre-conditions you specified did not hold".to_string(),
S3ErrorCode::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided. Check your key and signing method.".to_string(),
S3ErrorCode::MethodNotAllowed => "The specified method is not allowed against this resource.".to_string(),
S3ErrorCode::InvalidPart => "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.".to_string(),
S3ErrorCode::InvalidPartOrder => "The list of parts was not in ascending order. The parts list must be specified in order by part number.".to_string(),
S3ErrorCode::InvalidObjectState => "The operation is not valid for the current state of the object.".to_string(),
S3ErrorCode::AuthorizationHeaderMalformed => "The authorization header is malformed; the region is wrong; expecting 'us-east-1'.".to_string(),
S3ErrorCode::MalformedPOSTRequest => "The body of your POST request is not well-formed multipart/form-data.".to_string(),
S3ErrorCode::BucketNotEmpty => "The bucket you tried to delete is not empty".to_string(),
S3ErrorCode::BucketAlreadyExists => "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again.".to_string(),
S3ErrorCode::BucketAlreadyOwnedByYou => "Your previous request to create the named bucket succeeded and you already own it.".to_string(),
S3ErrorCode::AllAccessDisabled => "All access to this resource has been disabled.".to_string(),
S3ErrorCode::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document.".to_string(),
S3ErrorCode::IncompleteBody => "You did not provide the number of bytes specified by the Content-Length HTTP header.".to_string(),
S3ErrorCode::RequestTimeTooSkewed => "The difference between the request time and the server's time is too large.".to_string(),
S3ErrorCode::InvalidRegion => "Region does not match.".to_string(),
S3ErrorCode::SlowDown => "Resource requested is unreadable, please reduce your request rate".to_string(),
S3ErrorCode::KeyTooLongError => "Your key is too long".to_string(),
S3ErrorCode::NoSuchTagSet => "The TagSet does not exist".to_string(),
S3ErrorCode::ObjectLockConfigurationNotFoundError => "Object Lock configuration does not exist for this bucket".to_string(),
S3ErrorCode::InvalidBucketState => "Object Lock configuration cannot be enabled on existing buckets".to_string(),
S3ErrorCode::NoSuchCORSConfiguration => "The CORS configuration does not exist".to_string(),
S3ErrorCode::NoSuchWebsiteConfiguration => "The specified bucket does not have a website configuration".to_string(),
S3ErrorCode::NoSuchObjectLockConfiguration => "The specified object does not have a ObjectLock configuration".to_string(),
S3ErrorCode::MetadataTooLarge => "Your metadata headers exceed the maximum allowed metadata size.".to_string(),
S3ErrorCode::ServiceUnavailable => "The service is unavailable. Please retry.".to_string(),
S3ErrorCode::Busy => "The service is unavailable. Please retry.".to_string(),
S3ErrorCode::EmptyRequestBody => "Request body cannot be empty.".to_string(),
S3ErrorCode::UnauthorizedAccess => "You are not authorized to perform this operation".to_string(),
S3ErrorCode::ExpressionTooLong => "The SQL expression is too long: The maximum byte-length for the SQL expression is 256 KB.".to_string(),
S3ErrorCode::IllegalSqlFunctionArgument => "Illegal argument was used in the SQL function.".to_string(),
S3ErrorCode::InvalidKeyPath => "Key path in the SQL expression is invalid.".to_string(),
S3ErrorCode::InvalidCompressionFormat => "The file is not in a supported compression format. Only GZIP is supported at this time.".to_string(),
S3ErrorCode::InvalidFileHeaderInfo => "The FileHeaderInfo is invalid. Only NONE, USE, and IGNORE are supported.".to_string(),
S3ErrorCode::InvalidJsonType => "The JsonType is invalid. Only DOCUMENT and LINES are supported at this time.".to_string(),
S3ErrorCode::InvalidQuoteFields => "The QuoteFields is invalid. Only ALWAYS and ASNEEDED are supported.".to_string(),
S3ErrorCode::InvalidRequestParameter => "The value of a parameter in SelectRequest element is invalid. Check the service API documentation and try again.".to_string(),
S3ErrorCode::InvalidDataSource => "Invalid data source type. Only CSV and JSON are supported at this time.".to_string(),
S3ErrorCode::InvalidExpressionType => "The ExpressionType is invalid. Only SQL expressions are supported at this time.".to_string(),
S3ErrorCode::InvalidDataType => "The SQL expression contains an invalid data type.".to_string(),
S3ErrorCode::InvalidTextEncoding => "Invalid encoding type. Only UTF-8 encoding is supported at this time.".to_string(),
S3ErrorCode::InvalidTableAlias => "The SQL expression contains an invalid table alias.".to_string(),
S3ErrorCode::MissingRequiredParameter => "The SelectRequest entity is missing a required parameter. Check the service documentation and try again.".to_string(),
S3ErrorCode::ObjectSerializationConflict => "The SelectRequest entity can only contain one of CSV or JSON. Check the service documentation and try again.".to_string(),
S3ErrorCode::UnsupportedSqlOperation => "Encountered an unsupported SQL operation.".to_string(),
S3ErrorCode::UnsupportedSqlStructure => "Encountered an unsupported SQL structure. Check the SQL Reference.".to_string(),
S3ErrorCode::UnsupportedSyntax => "Encountered invalid syntax.".to_string(),
S3ErrorCode::UnsupportedRangeHeader => "Range header is not supported for this operation.".to_string(),
S3ErrorCode::LexerInvalidChar => "The SQL expression contains an invalid character.".to_string(),
S3ErrorCode::LexerInvalidOperator => "The SQL expression contains an invalid literal.".to_string(),
S3ErrorCode::LexerInvalidLiteral => "The SQL expression contains an invalid operator.".to_string(),
S3ErrorCode::LexerInvalidIONLiteral => "The SQL expression contains an invalid operator.".to_string(),
S3ErrorCode::ParseExpectedDatePart => "Did not find the expected date part in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedKeyword => "Did not find the expected keyword in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedTokenType => "Did not find the expected token in the SQL expression.".to_string(),
S3ErrorCode::ParseExpected2TokenTypes => "Did not find the expected token in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedNumber => "Did not find the expected number in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedRightParenBuiltinFunctionCall => "Did not find the expected right parenthesis character in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedTypeName => "Did not find the expected type name in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedWhenClause => "Did not find the expected WHEN clause in the SQL expression. CASE is not supported.".to_string(),
S3ErrorCode::ParseUnsupportedToken => "The SQL expression contains an unsupported token.".to_string(),
S3ErrorCode::ParseUnsupportedLiteralsGroupBy => "The SQL expression contains an unsupported use of GROUP BY.".to_string(),
S3ErrorCode::ParseExpectedMember => "The SQL expression contains an unsupported use of MEMBER.".to_string(),
S3ErrorCode::ParseUnsupportedSelect => "The SQL expression contains an unsupported use of SELECT.".to_string(),
S3ErrorCode::ParseUnsupportedCase => "The SQL expression contains an unsupported use of CASE.".to_string(),
S3ErrorCode::ParseUnsupportedCaseClause => "The SQL expression contains an unsupported use of CASE.".to_string(),
S3ErrorCode::ParseUnsupportedAlias => "The SQL expression contains an unsupported use of ALIAS.".to_string(),
S3ErrorCode::ParseUnsupportedSyntax => "The SQL expression contains unsupported syntax.".to_string(),
S3ErrorCode::ParseUnknownOperator => "The SQL expression contains an invalid operator.".to_string(),
S3ErrorCode::ParseMissingIdentAfterAt => "Did not find the expected identifier after the @ symbol in the SQL expression.".to_string(),
S3ErrorCode::ParseUnexpectedOperator => "The SQL expression contains an unexpected operator.".to_string(),
S3ErrorCode::ParseUnexpectedTerm => "The SQL expression contains an unexpected term.".to_string(),
S3ErrorCode::ParseUnexpectedToken => "The SQL expression contains an unexpected token.".to_string(),
S3ErrorCode::ParseExpectedExpression => "Did not find the expected SQL expression.".to_string(),
S3ErrorCode::ParseExpectedLeftParenAfterCast => "Did not find expected the left parenthesis in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedLeftParenValueConstructor => "Did not find expected the left parenthesis in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedLeftParenBuiltinFunctionCall => "Did not find the expected left parenthesis in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedArgumentDelimiter => "Did not find the expected argument delimiter in the SQL expression.".to_string(),
S3ErrorCode::ParseCastArity => "The SQL expression CAST has incorrect arity.".to_string(),
S3ErrorCode::ParseInvalidTypeParam => "The SQL expression contains an invalid parameter value.".to_string(),
S3ErrorCode::ParseEmptySelect => "The SQL expression contains an empty SELECT.".to_string(),
S3ErrorCode::ParseSelectMissingFrom => "GROUP is not supported in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedIdentForGroupName => "GROUP is not supported in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedIdentForAlias => "Did not find the expected identifier for the alias in the SQL expression.".to_string(),
S3ErrorCode::ParseUnsupportedCallWithStar => "Only COUNT with (*) as a parameter is supported in the SQL expression.".to_string(),
S3ErrorCode::ParseNonUnaryAgregateFunctionCall => "Only one argument is supported for aggregate functions in the SQL expression.".to_string(),
S3ErrorCode::ParseMalformedJoin => "JOIN is not supported in the SQL expression.".to_string(),
S3ErrorCode::ParseExpectedIdentForAt => "Did not find the expected identifier for AT name in the SQL expression.".to_string(),
S3ErrorCode::ParseAsteriskIsNotAloneInSelectList => "Other expressions are not allowed in the SELECT list when '*' is used without dot notation in the SQL expression.".to_string(),
S3ErrorCode::ParseCannotMixSqbAndWildcardInSelectList => "Cannot mix [] and * in the same expression in a SELECT list in SQL expression.".to_string(),
S3ErrorCode::ParseInvalidContextForWildcardInSelectList => "Invalid use of * in SELECT list in the SQL expression.".to_string(),
S3ErrorCode::IncorrectSqlFunctionArgumentType => "Incorrect type of arguments in function call in the SQL expression.".to_string(),
S3ErrorCode::ValueParseFailure => "Time stamp parse failure in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorInvalidArguments => "Incorrect number of arguments in the function call in the SQL expression.".to_string(),
S3ErrorCode::IntegerOverflow => "Int overflow or underflow in the SQL expression.".to_string(),
S3ErrorCode::LikeInvalidInputs => "Invalid argument given to the LIKE clause in the SQL expression.".to_string(),
S3ErrorCode::CastFailed => "Attempt to convert from one data type to another using CAST failed in the SQL expression.".to_string(),
S3ErrorCode::InvalidCast => "Attempt to convert from one data type to another using CAST failed in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorInvalidTimestampFormatPattern => "Time stamp format pattern requires additional fields in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorInvalidTimestampFormatPatternSymbolForParsing => "Time stamp format pattern contains a valid format symbol that cannot be applied to time stamp parsing in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorTimestampFormatPatternDuplicateFields => "Time stamp format pattern contains multiple format specifiers representing the time stamp field in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorTimestampFormatPatternHourClockAmPmMismatch => "Time stamp format pattern contains unterminated token in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorUnterminatedTimestampFormatPatternToken => "Time stamp format pattern contains an invalid token in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorInvalidTimestampFormatPatternToken => "Time stamp format pattern contains an invalid token in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorInvalidTimestampFormatPatternSymbol => "Time stamp format pattern contains an invalid symbol in the SQL expression.".to_string(),
S3ErrorCode::EvaluatorBindingDoesNotExist => "A column name or a path provided does not exist in the SQL expression".to_string(),
S3ErrorCode::InvalidColumnIndex => "The column index is invalid. Please check the service documentation and try again.".to_string(),
S3ErrorCode::UnsupportedFunction => "Encountered an unsupported SQL function.".to_string(),
_ => code.as_str().to_string(),
}
}
}
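A rough usage sketch of the new mapping helper, invoked as an associated function the way the `From<StorageError>` conversion below does; the two codes shown correspond to arms in the match above:

```rust
// Codes with an explicit arm get the canonical human-readable message...
let msg = ApiError::error_code_to_message(&S3ErrorCode::NoSuchBucket);
assert_eq!(msg, "The specified bucket does not exist");

let msg = ApiError::error_code_to_message(&S3ErrorCode::NoSuchKey);
assert_eq!(msg, "The specified key does not exist.");

// ...while any code without an arm falls back to its `as_str()` representation.
```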
impl From<ApiError> for S3Error {
@@ -87,9 +224,14 @@ impl From<StorageError> for ApiError {
_ => S3ErrorCode::InternalError,
};
let message = if code == S3ErrorCode::InternalError {
err.to_string()
} else {
ApiError::error_code_to_message(&code)
};
ApiError {
code,
message: err.to_string(),
message,
source: Some(Box::new(err)),
}
}
@@ -186,7 +328,6 @@ mod tests {
let api_error: ApiError = storage_error.into();
assert_eq!(api_error.code, S3ErrorCode::NoSuchBucket);
assert!(api_error.message.contains("test-bucket"));
assert!(api_error.source.is_some());
// Test that source can be downcast back to StorageError


@@ -58,7 +58,7 @@ use rustfs_ecstore::{
update_erasure_type,
};
use rustfs_iam::init_iam_sys;
use rustfs_notify::global::notifier_instance;
use rustfs_notify::notifier_global;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_targets::arn::TargetID;
use rustfs_utils::net::parse_and_resolve_address;
@@ -89,7 +89,6 @@ const LOGO: &str = r#"
#[instrument]
fn print_server_info() {
let current_year = chrono::Utc::now().year();
// Use custom macros to print server information
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
@@ -112,10 +111,13 @@ async fn async_main() -> Result<()> {
init_license(opt.license.clone());
// Initialize Observability
let guard = init_obs(Some(opt.clone().obs_endpoint)).await;
// print startup logo
info!("{}", LOGO);
let guard = match init_obs(Some(opt.clone().obs_endpoint)).await {
Ok(g) => g,
Err(e) => {
println!("Failed to initialize observability: {}", e);
return Err(Error::other(e));
}
};
// Store in global storage
match set_global_guard(guard).map_err(Error::other) {
@@ -126,6 +128,9 @@ async fn async_main() -> Result<()> {
}
}
// print startup logo
info!("{}", LOGO);
// Initialize performance profiling if enabled
#[cfg(not(target_os = "windows"))]
profiling::init_from_env().await;
@@ -512,8 +517,7 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str);
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str);
if let Err(e) = notifier_instance()
.add_event_specific_rules(bucket, region, &event_rules)
if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules)
.await
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))
{


@@ -145,15 +145,15 @@ pub(crate) fn get_tokio_runtime_builder() -> tokio::runtime::Builder {
)
});
}
println!(
"Starting Tokio runtime with configured parameters:\n\
if !rustfs_obs::is_production_environment() {
println!(
"Starting Tokio runtime with configured parameters:\n\
worker_threads: {worker_threads}, max_blocking_threads: {max_blocking_threads}, \
thread_stack_size: {thread_stack_size}, thread_keep_alive: {thread_keep_alive}, \
global_queue_interval: {global_queue_interval}, event_interval: {event_interval}, \
max_io_events_per_tick: {max_io_events_per_tick}, thread_name: {thread_name}"
);
);
}
builder
}


@@ -14,6 +14,8 @@
use crate::auth::get_condition_values;
use crate::error::ApiError;
use crate::storage::entity;
use crate::storage::helper::OperationHelper;
use crate::storage::options::{filter_object_metadata, get_content_sha256};
use crate::storage::{
access::{ReqInfo, authorize_request},
@@ -84,7 +86,7 @@ use rustfs_kms::{
service_manager::get_global_encryption_service,
types::{EncryptionMetadata, ObjectEncryptionContext},
};
use rustfs_notify::global::notifier_instance;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_policy::{
auth,
policy::{
@@ -102,11 +104,10 @@ use rustfs_targets::{
EventName,
arn::{TargetID, TargetIDError},
};
use rustfs_utils::http::{AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE};
use rustfs_utils::{
CompressionAlgorithm,
CompressionAlgorithm, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
http::{
AMZ_BUCKET_REPLICATION_STATUS,
AMZ_BUCKET_REPLICATION_STATUS, AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE,
headers::{
AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, AMZ_RESTORE_EXPIRY_DAYS, AMZ_RESTORE_REQUEST_DATE,
RESERVED_METADATA_PREFIX_LOWER,
@@ -353,6 +354,7 @@ impl FS {
}
async fn put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject").suppress_event();
let input = req.input;
let PutObjectInput {
@@ -495,20 +497,20 @@ impl FS {
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
let event_args = rustfs_notify::EventArgs {
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket.clone(),
object: _obj_info.clone(),
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
req_params: extract_req_params_header(&req.headers),
resp_elements: extract_resp_elements(&S3Response::new(output.clone())),
version_id: version_id.clone(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
host: get_request_host(&req.headers),
user_agent: get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
notifier_global::notify(event_args).await;
});
}
}
@@ -575,7 +577,9 @@ impl FS {
checksum_crc64nvme,
..Default::default()
};
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
}
@@ -602,6 +606,7 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket");
let CreateBucketInput {
bucket,
object_lock_enabled_for_bucket,
@@ -628,28 +633,15 @@ impl S3 for FS {
let output = CreateBucketOutput::default();
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketCreated,
bucket_name: bucket.clone(),
object: ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Copy an object from one location to another
#[instrument(level = "debug", skip(self, req))]
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedCopy, "s3:CopyObject");
let CopyObjectInput {
copy_source,
bucket,
@@ -830,28 +822,12 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(object_info).version_id(version_id);
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCopy,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn restore_object(&self, req: S3Request<RestoreObjectInput>) -> S3Result<S3Response<RestoreObjectOutput>> {
@@ -902,7 +878,7 @@ impl S3 for FS {
}
//let mut api_err;
let mut _status_code = http::StatusCode::OK;
let mut _status_code = StatusCode::OK;
let mut already_restored = false;
if let Err(_err) = rreq.validate(store.clone()) {
//api_err = to_api_err(ErrMalformedXML);
@@ -919,7 +895,7 @@ impl S3 for FS {
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
_status_code = http::StatusCode::ACCEPTED;
_status_code = StatusCode::ACCEPTED;
already_restored = true;
}
}
@@ -1086,12 +1062,13 @@ impl S3 for FS {
restore_output_path: None,
};
return Ok(S3Response::with_headers(output, header));
Ok(S3Response::with_headers(output, header))
}
/// Delete a bucket
#[instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket");
let input = req.input;
// TODO: DeleteBucketInput doesn't have force parameter?
let Some(store) = new_object_layer_fn() else {
@@ -1109,28 +1086,15 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketRemoved,
bucket_name: input.bucket,
object: ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteBucketOutput {}))
let result = Ok(S3Response::new(DeleteBucketOutput {}));
let _ = helper.complete(&result);
result
}
/// Delete an object
#[instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, mut req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObject");
let DeleteObjectInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -1233,32 +1197,20 @@ impl S3 for FS {
EventName::ObjectRemovedDelete
};
let event_args = rustfs_notify::event::EventArgs {
event_name,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: key.clone(),
bucket: bucket.clone(),
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: version_id.map(|v| v.to_string()).unwrap_or_default(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
helper = helper.event_name(event_name);
helper = helper
.object(obj_info)
.version_id(version_id.map(|v| v.to_string()).unwrap_or_default());
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Delete multiple objects
#[instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event();
let DeleteObjectsInput { bucket, delete, .. } = req.input;
if delete.objects.is_empty() || delete.objects.len() > 1000 {
@@ -1393,10 +1345,12 @@ impl S3 for FS {
.map(|v| v.as_ref().map(|v| v.clone().into()))
.collect::<Vec<Option<DiskError>>>() as &[Option<DiskError>],
) {
return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let result = Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let _ = helper.complete(&result);
return result;
}
for (i, err) in errs.into_iter().enumerate() {
for (i, err) in errs.iter().enumerate() {
let obj = dobjs[i].clone();
// let replication_state = obj.replication_state.clone().unwrap_or_default();
@@ -1426,7 +1380,7 @@ impl S3 for FS {
continue;
}
if let Some(err) = err {
if let Some(err) = err.clone() {
delete_results[*didx].error = Some(Error {
code: Some(err.to_string()),
key: Some(object_to_delete[i].object_name.clone()),
@@ -1481,39 +1435,39 @@ impl S3 for FS {
}
}
// Asynchronous call will not block the response of the current request
let req_headers = req.headers.clone();
tokio::spawn(async move {
for dobj in dobjs {
let version_id = match dobj.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let mut event_name = EventName::ObjectRemovedDelete;
if dobj.delete_marker {
event_name = EventName::ObjectRemovedDeleteMarkerCreated;
}
for res in delete_results {
if let Some(dobj) = res.delete_object {
let event_name = if dobj.delete_marker {
EventName::ObjectRemovedDeleteMarkerCreated
} else {
EventName::ObjectRemovedDelete
};
let event_args = EventArgsBuilder::new(
event_name,
bucket.clone(),
ObjectInfo {
name: dobj.object_name.clone(),
bucket: bucket.clone(),
..Default::default()
},
)
.version_id(dobj.version_id.map(|v| v.to_string()).unwrap_or_default())
.req_params(extract_req_params_header(&req_headers))
.resp_elements(extract_resp_elements(&S3Response::new(DeleteObjectsOutput::default())))
.host(get_request_host(&req_headers))
.user_agent(get_request_user_agent(&req_headers))
.build();
let event_args = rustfs_notify::event::EventArgs {
event_name,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: dobj.object_name,
bucket: bucket.clone(),
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectsOutput {
..Default::default()
})),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
notifier_instance().notify(event_args).await;
notifier_global::notify(event_args).await;
}
}
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Get bucket location
@@ -1548,6 +1502,7 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject");
// mc get 3
let GetObjectInput {
@@ -1879,27 +1834,12 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket.clone(),
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(GetObjectOutput { ..Default::default() })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(event_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -1921,6 +1861,7 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedHead, "s3:HeadObject");
// mc get 2
let HeadObjectInput {
bucket,
@@ -2055,27 +1996,13 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket,
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(event_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
Ok(S3Response::new(output))
result
}
#[instrument(level = "debug", skip(self))]
@@ -2165,6 +2092,9 @@ impl S3 for FS {
let prefix = prefix.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
if max_keys < 0 {
return Err(S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid max keys".to_string()));
}
let delimiter = delimiter.filter(|v| !v.is_empty());
let start_after = start_after.filter(|v| !v.is_empty());
@@ -2341,6 +2271,7 @@ impl S3 for FS {
// #[instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject");
if req
.headers
.get("X-Amz-Meta-Snowball-Auto-Extract")
@@ -2357,7 +2288,6 @@ impl S3 for FS {
return Err(s3_error!(InvalidStorageClass));
}
}
let event_version_id = input.version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
let PutObjectInput {
body,
bucket,
@@ -2392,6 +2322,10 @@ impl S3 for FS {
}
};
if size == -1 {
return Err(s3_error!(UnexpectedContent));
}
let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));
// let body = Box::new(StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string())))));
@@ -2605,7 +2539,6 @@ impl S3 for FS {
.put_object(&bucket, &key, &mut reader, &opts)
.await
.map_err(ApiError::from)?;
let event_info = obj_info.clone();
let e_tag = obj_info.etag.clone().map(|etag| to_s3s_etag(&etag));
let repoptions =
@@ -2664,23 +2597,9 @@ impl S3 for FS {
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket.clone(),
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: event_version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -2688,6 +2607,7 @@ impl S3 for FS {
&self,
req: S3Request<CreateMultipartUploadInput>,
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:CreateMultipartUpload");
let CreateMultipartUploadInput {
bucket,
key,
@@ -2819,8 +2739,6 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let object_name = key.clone();
let bucket_name = bucket.clone();
let output = CreateMultipartUploadOutput {
bucket: Some(bucket),
key: Some(key),
@@ -2833,31 +2751,9 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCompleteMultipartUpload,
bucket_name: bucket_name.clone(),
object: ObjectInfo {
name: object_name,
bucket: bucket_name,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -3423,6 +3319,7 @@ impl S3 for FS {
&self,
req: S3Request<CompleteMultipartUploadInput>,
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedCompleteMultipartUpload, "s3:CompleteMultipartUpload");
let input = req.input;
let CompleteMultipartUploadInput {
multipart_upload,
@@ -3529,6 +3426,26 @@ impl S3 for FS {
}
let output = CompleteMultipartUploadOutput {
bucket: Some(bucket.clone()),
key: Some(key.clone()),
e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)),
location: Some("us-east-1".to_string()),
server_side_encryption: server_side_encryption.clone(), // TDD: Return encryption info
ssekms_key_id: ssekms_key_id.clone(), // TDD: Return KMS key ID if present
checksum_crc32: checksum_crc32.clone(),
checksum_crc32c: checksum_crc32c.clone(),
checksum_sha1: checksum_sha1.clone(),
checksum_sha256: checksum_sha256.clone(),
checksum_crc64nvme: checksum_crc64nvme.clone(),
checksum_type: checksum_type.clone(),
..Default::default()
};
info!(
"TDD: Created output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let helper_output = entity::CompleteMultipartUploadOutput {
bucket: Some(bucket.clone()),
key: Some(key.clone()),
e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)),
@@ -3543,10 +3460,6 @@ impl S3 for FS {
checksum_type,
..Default::default()
};
info!(
"TDD: Created output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let mt2 = HashMap::new();
let repoptions =
@@ -3562,6 +3475,8 @@ impl S3 for FS {
"TDD: About to return S3Response with output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let helper_result = Ok(S3Response::new(helper_output));
let _ = helper.complete(&helper_result);
Ok(S3Response::new(output))
}
@@ -3648,6 +3563,7 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, "s3:PutObjectTagging");
let PutObjectTaggingInput {
bucket,
key: object,
@@ -3659,11 +3575,6 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// Validate tag_set length doesn't exceed 10
if tagging.tag_set.len() > 10 {
return Err(s3_error!(InvalidArgument, "Object tags cannot be greater than 10"));
}
let mut tag_keys = std::collections::HashSet::with_capacity(tagging.tag_set.len());
for tag in &tagging.tag_set {
let key = tag
@@ -3704,31 +3615,12 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutTagging,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(PutObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
let result = Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self))]
@@ -3758,6 +3650,7 @@ impl S3 for FS {
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, "s3:DeleteObjectTagging");
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
@@ -3771,31 +3664,12 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedDeleteTagging,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
let result = Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self))]
@@ -4389,7 +4263,7 @@ impl S3 for FS {
let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default());
// Purge old rules and resolve new rules in parallel
let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket);
let clear_rules = notifier_global::clear_bucket_notification_rules(&bucket);
let parse_rules = async {
let mut event_rules = Vec::new();
@@ -4417,8 +4291,7 @@ impl S3 for FS {
clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
// Add a new notification rule
notifier_instance()
.add_event_specific_rules(&bucket, &region, &event_rules)
notifier_global::add_event_specific_rules(&bucket, &region, &event_rules)
.await
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?;
@@ -4530,6 +4403,7 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, "s3:GetObjectAttributes");
let GetObjectAttributesInput { bucket, key, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
@@ -4548,31 +4422,19 @@ impl S3 for FS {
object_parts: None,
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedAttributes,
bucket_name: bucket.clone(),
object: ObjectInfo {
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper
.object(ObjectInfo {
name: key.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
})
.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_acl(&self, req: S3Request<PutObjectAclInput>) -> S3Result<S3Response<PutObjectAclOutput>> {
@@ -4688,6 +4550,7 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectLegalHoldInput>,
) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetLegalHold, "s3:GetObjectLegalHold");
let GetObjectLegalHoldInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -4730,33 +4593,19 @@ impl S3 for FS {
}),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetLegalHold,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_legal_hold(
&self,
req: S3Request<PutObjectLegalHoldInput>,
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, "s3:PutObjectLegalHold");
let PutObjectLegalHoldInput {
bucket,
key,
@@ -4809,33 +4658,19 @@ impl S3 for FS {
let output = PutObjectLegalHoldOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutLegalHold,
bucket_name: bucket.clone(),
object: info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn get_object_retention(
&self,
req: S3Request<GetObjectRetentionInput>,
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, "s3:GetObjectRetention");
let GetObjectRetentionInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -4870,33 +4705,19 @@ impl S3 for FS {
let output = GetObjectRetentionOutput {
retention: Some(ObjectLockRetention { mode, retain_until_date }),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_retention(
&self,
req: S3Request<PutObjectRetentionInput>,
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, "s3:PutObjectRetention");
let PutObjectRetentionInput {
bucket,
key,
@@ -4945,27 +4766,12 @@ impl S3 for FS {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
}

View File

@@ -0,0 +1,63 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use s3s::dto::{
BucketKeyEnabled, BucketName, ChecksumCRC32, ChecksumCRC32C, ChecksumCRC64NVME, ChecksumSHA1, ChecksumSHA256, ChecksumType,
ETag, Expiration, Location, ObjectKey, ObjectVersionId, RequestCharged, SSEKMSKeyId, ServerSideEncryption,
};
#[derive(Debug, Clone, Default)]
pub struct CompleteMultipartUploadOutput {
pub bucket: Option<BucketName>,
pub bucket_key_enabled: Option<BucketKeyEnabled>,
pub checksum_crc32: Option<ChecksumCRC32>,
pub checksum_crc32c: Option<ChecksumCRC32C>,
pub checksum_crc64nvme: Option<ChecksumCRC64NVME>,
pub checksum_sha1: Option<ChecksumSHA1>,
pub checksum_sha256: Option<ChecksumSHA256>,
pub checksum_type: Option<ChecksumType>,
pub e_tag: Option<ETag>,
pub expiration: Option<Expiration>,
pub key: Option<ObjectKey>,
pub location: Option<Location>,
pub request_charged: Option<RequestCharged>,
pub ssekms_key_id: Option<SSEKMSKeyId>,
pub server_side_encryption: Option<ServerSideEncryption>,
pub version_id: Option<ObjectVersionId>,
}
impl From<s3s::dto::CompleteMultipartUploadOutput> for CompleteMultipartUploadOutput {
fn from(output: s3s::dto::CompleteMultipartUploadOutput) -> Self {
Self {
bucket: output.bucket,
bucket_key_enabled: output.bucket_key_enabled,
checksum_crc32: output.checksum_crc32,
checksum_crc32c: output.checksum_crc32c,
checksum_crc64nvme: output.checksum_crc64nvme,
checksum_sha1: output.checksum_sha1,
checksum_sha256: output.checksum_sha256,
checksum_type: output.checksum_type,
e_tag: output.e_tag,
expiration: output.expiration,
key: output.key,
location: output.location,
request_charged: output.request_charged,
ssekms_key_id: output.ssekms_key_id,
server_side_encryption: output.server_side_encryption,
version_id: output.version_id,
}
}
}
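// Illustrative sketch (not part of this change): the `From` impl above lets a
// caller mirror the s3s output into this local, `Default`-able type with
// `.into()`. `make_event_payload` is a hypothetical name used only here.
//
// fn make_event_payload(output: s3s::dto::CompleteMultipartUploadOutput) -> CompleteMultipartUploadOutput {
//     output.into()
// }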

View File

@@ -0,0 +1,209 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::StatusCode;
use rustfs_audit::{
entity::{ApiDetails, ApiDetailsBuilder, AuditEntryBuilder},
global::AuditLogger,
};
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_targets::EventName;
use rustfs_utils::{
extract_req_params, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
};
use s3s::{S3Request, S3Response, S3Result};
use std::future::Future;
use tokio::runtime::{Builder, Handle};
/// Schedules an asynchronous task on the current runtime;
/// if there is no runtime, spawns a new thread with a minimal current-thread runtime to run it.
fn spawn_background<F>(fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
if let Ok(handle) = Handle::try_current() {
drop(handle.spawn(fut));
} else {
std::thread::spawn(|| {
if let Ok(rt) = Builder::new_current_thread().enable_all().build() {
rt.block_on(fut);
}
});
}
}
/// A unified helper that builds and dispatches audit logs and event notifications via RAII when the S3 operation's scope ends.
pub struct OperationHelper {
audit_builder: Option<AuditEntryBuilder>,
api_builder: ApiDetailsBuilder,
event_builder: Option<EventArgsBuilder>,
start_time: std::time::Instant,
}
impl OperationHelper {
/// Creates a new `OperationHelper` for an S3 request.
pub fn new(req: &S3Request<impl Send + Sync>, event: EventName, trigger: &'static str) -> Self {
// Parse path -> bucket/object
let path = req.uri.path().trim_start_matches('/');
let mut segs = path.splitn(2, '/');
let bucket = segs.next().unwrap_or("").to_string();
let object_key = segs.next().unwrap_or("").to_string();
// Infer remote address
let remote_host = req
.headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.or_else(|| req.headers.get("x-real-ip").and_then(|v| v.to_str().ok()))
.unwrap_or("")
.to_string();
// Initialize the API details builder
let mut api_builder = ApiDetailsBuilder::new().name(trigger);
if !bucket.is_empty() {
api_builder = api_builder.bucket(&bucket);
}
if !object_key.is_empty() {
api_builder = api_builder.object(&object_key);
}
// Initialize the audit entry builder
let mut audit_builder = AuditEntryBuilder::new("1.0", event, trigger, ApiDetails::default())
.remote_host(remote_host)
.user_agent(get_request_user_agent(&req.headers))
.req_host(get_request_host(&req.headers))
.req_path(req.uri.path().to_string())
.req_query(extract_req_params(req));
if let Some(req_id) = req.headers.get("x-amz-request-id") {
if let Ok(id_str) = req_id.to_str() {
audit_builder = audit_builder.request_id(id_str);
}
}
// Initialize the event builder.
// The object is a placeholder that must be set later using the `object()` method.
let event_builder = EventArgsBuilder::new(event, bucket, ObjectInfo::default())
.host(get_request_host(&req.headers))
.user_agent(get_request_user_agent(&req.headers))
.req_params(extract_req_params_header(&req.headers));
Self {
audit_builder: Some(audit_builder),
api_builder,
event_builder: Some(event_builder),
start_time: std::time::Instant::now(),
}
}
/// Sets the ObjectInfo for event notification.
pub fn object(mut self, object_info: ObjectInfo) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.object(object_info));
}
self
}
/// Sets the version ID for the event notification.
pub fn version_id(mut self, version_id: impl Into<String>) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.version_id(version_id));
}
self
}
/// Sets the event name on both the audit entry and the event notification.
pub fn event_name(mut self, event_name: EventName) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.event_name(event_name));
}
if let Some(builder) = self.audit_builder.take() {
self.audit_builder = Some(builder.event(event_name));
}
self
}
/// Completes the operation details from the S3 result.
/// Call this immediately before the handler returns; it finalizes the audit and
/// event builders so they can be dispatched during `drop`.
pub fn complete(mut self, result: &S3Result<S3Response<impl Send + Sync>>) -> Self {
// Complete audit log
if let Some(builder) = self.audit_builder.take() {
let (status, status_code, error_msg) = match result {
Ok(res) => ("success".to_string(), res.status.unwrap_or(StatusCode::OK).as_u16() as i32, None),
Err(e) => (
"failure".to_string(),
e.status_code().unwrap_or(StatusCode::BAD_REQUEST).as_u16() as i32,
e.message().map(|s| s.to_string()),
),
};
let ttr = self.start_time.elapsed();
let api_details = self
.api_builder
.clone()
.status(status)
.status_code(status_code)
.time_to_response(format!("{:.2?}", ttr))
.time_to_response_in_ns(ttr.as_nanos().to_string())
.build();
let mut final_builder = builder.api(api_details.clone());
if let Some(err) = error_msg {
final_builder = final_builder.error(err);
}
self.audit_builder = Some(final_builder);
self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use
}
// Complete the event notification (only on success)
if let (Some(builder), Ok(res)) = (self.event_builder.take(), result) {
self.event_builder = Some(builder.resp_elements(extract_resp_elements(res)));
}
self
}
/// Suppresses the automatic event notification on drop.
pub fn suppress_event(mut self) -> Self {
self.event_builder = None;
self
}
}
impl Drop for OperationHelper {
fn drop(&mut self) {
// Dispatch the audit log
if let Some(builder) = self.audit_builder.take() {
spawn_background(async move {
AuditLogger::log(builder.build()).await;
});
}
// Dispatch the event notification (only on success)
if self.api_builder.0.status.as_deref() == Some("success") {
if let Some(builder) = self.event_builder.take() {
let event_args = builder.build();
// Avoid generating notifications for replication requests
if !event_args.is_replication_request() {
spawn_background(async move {
notifier_global::notify(event_args).await;
});
}
}
}
}
}
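// Illustrative sketch (not part of this change): the handler-side flow the
// ecfs.rs hunks above follow; `output`, `object_info`, and `version_id` stand
// in for values each S3 method already computes.
//
// let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, "s3:GetObjectRetention");
// // ... perform the operation, producing `output` and `object_info` ...
// helper = helper.object(object_info).version_id(version_id);
// let result = Ok(S3Response::new(output));
// let _ = helper.complete(&result); // finalizes the builders; the returned helper is
//                                   // dropped here, dispatching the audit log and,
//                                   // on success, the event notification
// result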

View File

@@ -14,6 +14,7 @@
pub mod access;
pub mod ecfs;
// pub mod error;
pub(crate) mod entity;
pub(crate) mod helper;
pub mod options;
pub mod tonic_service;

View File

@@ -51,7 +51,14 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
# export RUSTFS_TLS_PATH="./deploy/certs"
# Observability related configuration
#export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector address
#export RUSTFS_OBS_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector address
# RustFS OR OTEL exporter configuration
#export RUSTFS_OBS_TRACE_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector trace address http://localhost:4318/v1/traces
#export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
#export RUSTFS_OBS_METRIC_ENDPOINT=http://localhost:9090/api/v1/otlp # OpenTelemetry Collector metric address
#export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://localhost:9090/api/v1/otlp/v1/metrics
#export RUSTFS_OBS_LOG_ENDPOINT=http://loki:3100/otlp # OpenTelemetry Collector logs address http://loki:3100/otlp/v1/logs
#export OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=http://loki:3100/otlp/v1/logs
#export RUSTFS_OBS_USE_STDOUT=false # Whether to use standard output
#export RUSTFS_OBS_SAMPLE_RATIO=2.0 # Sample ratio, between 0.0-1.0, 0.0 means no sampling, 1.0 means full sampling
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
@@ -59,7 +66,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOG_STDOUT_ENABLED=true # Whether to enable local stdout logging
export RUSTFS_OBS_LOG_STDOUT_ENABLED=false # Whether to enable local stdout logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB