Merge branch 'main' of github.com:rustfs/s3-rustfs into feature/observability-metrics

# Conflicts:
#	Cargo.lock
#	README.md
#	README_ZH.md
#	crates/obs/Cargo.toml
#	deploy/config/obs-zh.example.toml
#	scripts/run.sh
houseme
2025-05-30 10:52:02 +08:00
31 changed files with 1571 additions and 761 deletions

View File

@@ -0,0 +1,75 @@
# OpenObserve + OpenTelemetry Collector
[![OpenObserve](https://img.shields.io/badge/OpenObserve-OpenSource-blue.svg)](https://openobserve.org)
[![OpenTelemetry](https://img.shields.io/badge/OpenTelemetry-Collector-green.svg)](https://opentelemetry.io/)
English | [中文](README_ZH.md)
This directory contains the configuration files for setting up an observability stack with OpenObserve and OpenTelemetry
Collector.
### Overview
This setup provides a complete observability solution for your applications:
- **OpenObserve**: A modern, open-source observability platform for logs, metrics, and traces.
- **OpenTelemetry Collector**: Collects and processes telemetry data before sending it to OpenObserve.
### Setup Instructions
1. **Prerequisites**:
- Docker and Docker Compose installed
- Sufficient memory resources (minimum 2GB recommended)
2. **Starting the Services**:
```bash
cd .docker/openobserve-otel
docker compose -f docker-compose.yml up -d
```
3. **Accessing the Dashboard**:
- OpenObserve UI: http://localhost:5080
- Default credentials:
- Username: root@rustfs.com
- Password: rustfs123
### Configuration
#### OpenObserve Configuration
The OpenObserve service is configured with:
- Root user credentials
- Data persistence through a volume mount
- Memory cache enabled
- Health checks
- Exposed ports:
- 5080: HTTP API and UI
- 5081: OTLP gRPC
#### OpenTelemetry Collector Configuration
The collector is configured to:
- Receive telemetry data via OTLP (HTTP and gRPC)
- Collect logs from files
- Process data in batches
- Export data to OpenObserve
- Manage memory usage
### Integration with Your Application
To send telemetry data from your application, configure your OpenTelemetry SDK to send data to:
- OTLP gRPC: `localhost:4317`
- OTLP HTTP: `localhost:4318`
For example, in a Rust application using the `rustfs-obs` library:
```bash
export RUSTFS_OBS_ENDPOINT=http://localhost:4317
export RUSTFS_OBS_SERVICE_NAME=yourservice
export RUSTFS_OBS_SERVICE_VERSION=1.0.0
export RUSTFS_OBS_ENVIRONMENT=development
```
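If you configure the SDK in code rather than through environment variables, the following is a minimal sketch of a tracer pipeline pointed at the collector's OTLP gRPC port. It assumes the `opentelemetry`, `opentelemetry-otlp` (with its `tonic`/gRPC support), and `opentelemetry_sdk` crates; the builder calls mirror the ones `rustfs-obs` itself uses in this commit.
```rust
use opentelemetry::global;
use opentelemetry::trace::Tracer;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::SdkTracerProvider;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Export spans to the collector's OTLP gRPC endpoint.
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4317")
        .build()?;

    let provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();
    global::set_tracer_provider(provider.clone());

    // Emit one demo span; it reaches OpenObserve via the collector.
    global::tracer("demo").in_span("demo-span", |_cx| {
        // application work here
    });

    // Flush and shut down before exit.
    provider.shutdown()?;
    Ok(())
}
```
Pointing at `localhost:4318` with the crate's HTTP transport works the same way.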

View File

@@ -0,0 +1,75 @@
# OpenObserve + OpenTelemetry Collector
[![OpenObserve](https://img.shields.io/badge/OpenObserve-OpenSource-blue.svg)](https://openobserve.org)
[![OpenTelemetry](https://img.shields.io/badge/OpenTelemetry-Collector-green.svg)](https://opentelemetry.io/)
[English](README.md) | 中文
## Chinese
This directory contains the configuration files for setting up an observability stack with OpenObserve and OpenTelemetry Collector.
### Overview
This setup provides a complete observability solution for your applications:
- **OpenObserve**: A modern, open-source observability platform for logs, metrics, and traces.
- **OpenTelemetry Collector**: Collects and processes telemetry data before sending it to OpenObserve.
### Setup Instructions
1. **Prerequisites**:
- Docker and Docker Compose installed
- Sufficient memory resources (minimum 2GB recommended)
2. **Starting the Services**:
```bash
cd .docker/openobserve-otel
docker compose -f docker-compose.yml up -d
```
3. **Accessing the Dashboard**:
- OpenObserve UI: http://localhost:5080
- Default credentials:
- Username: root@rustfs.com
- Password: rustfs123
### Configuration
#### OpenObserve Configuration
The OpenObserve service is configured with:
- Root user credentials
- Data persistence through a volume mount
- Memory cache enabled
- Health checks
- Exposed ports:
- 5080: HTTP API and UI
- 5081: OTLP gRPC
#### OpenTelemetry Collector Configuration
The collector is configured to:
- Receive telemetry data via OTLP (HTTP and gRPC)
- Collect logs from files
- Process data in batches
- Export data to OpenObserve
- Manage memory usage
### Integration with Your Application
To send telemetry data from your application, configure your OpenTelemetry SDK to send data to:
- OTLP gRPC: `localhost:4317`
- OTLP HTTP: `localhost:4318`
For example, in a Rust application using the `rustfs-obs` library:
```bash
export RUSTFS_OBS_ENDPOINT=http://localhost:4317
export RUSTFS_OBS_SERVICE_NAME=yourservice
export RUSTFS_OBS_SERVICE_VERSION=1.0.0
export RUSTFS_OBS_ENVIRONMENT=development
```

View File

@@ -6,11 +6,12 @@ services:
ZO_ROOT_USER_EMAIL: "root@rustfs.com"
ZO_ROOT_USER_PASSWORD: "rustfs123"
ZO_TRACING_HEADER_KEY: "Authorization"
ZO_TRACING_HEADER_VALUE: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
ZO_TRACING_HEADER_VALUE: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
ZO_DATA_DIR: "/data"
ZO_MEMORY_CACHE_ENABLED: "true"
ZO_MEMORY_CACHE_MAX_SIZE: "256"
RUST_LOG: "info"
TZ: Asia/Shanghai
ports:
- "5080:5080"
- "5081:5081"
@@ -44,9 +45,11 @@ services:
- "13133:13133" # Health check
- "1777:1777" # pprof
- "55679:55679" # zpages
- "1888:1888" # Metrics
- "8888:8888" # Prometheus metrics
- "8889:8889" # Additional metrics endpoint
depends_on:
openobserve:
condition: service_healthy
- openobserve
networks:
- otel-network
deploy:

View File

@@ -20,9 +20,9 @@ processors:
exporters:
otlphttp/openobserve:
endpoint: http://openobserve:5080/api/default
endpoint: http://openobserve:5080/api/default # http://127.0.0.1:5080/api/default
headers:
Authorization: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
stream-name: default
organization: default
compression: gzip
@@ -32,6 +32,21 @@ exporters:
max_interval: 30s
max_elapsed_time: 300s
timeout: 10s
otlp/openobserve:
endpoint: openobserve:5081 # e.g. 127.0.0.1:5081 when addressed from the host
headers:
Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
stream-name: default
organization: default
compression: gzip
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
timeout: 10s
tls:
insecure: true
extensions:
health_check:
@@ -47,15 +62,15 @@ service:
traces:
receivers: [ otlp ]
processors: [ memory_limiter, batch ]
exporters: [ otlphttp/openobserve ]
exporters: [ otlp/openobserve ]
metrics:
receivers: [ otlp ]
processors: [ memory_limiter, batch ]
exporters: [ otlphttp/openobserve ]
exporters: [ otlp/openobserve ]
logs:
receivers: [ otlp, filelog ]
processors: [ memory_limiter, batch ]
exporters: [ otlphttp/openobserve ]
exporters: [ otlp/openobserve ]
telemetry:
logs:
level: "info" # Collector 日志级别

View File

@@ -21,7 +21,7 @@ jobs:
- { profile: release, target: x86_64-unknown-linux-gnu, glibc: "default" }
- { profile: release, target: aarch64-apple-darwin, glibc: "default" }
#- { profile: release, target: aarch64-unknown-linux-gnu, glibc: "default" }
#- { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" }
- { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" }
#- { profile: release, target: x86_64-pc-windows-msvc, glibc: "default" }
exclude:
# Linux targets on non-Linux systems
@@ -413,4 +413,4 @@ jobs:
with:
name: rustfs-packages
pattern: "rustfs-*"
delete-merged: true
delete-merged: true

1
.gitignore vendored
View File

@@ -7,6 +7,7 @@
/data
.devcontainer
rustfs/static/*
!rustfs/static/.gitkeep
vendor
cli/rustfs-gui/embedded-rustfs/rustfs
deploy/config/obs.toml

340
Cargo.lock generated
View File

@@ -834,9 +834,9 @@ dependencies = [
[[package]]
name = "backon"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496"
checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7"
dependencies = [
"fastrand",
"gloo-timers",
@@ -1782,6 +1782,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -1825,9 +1834,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.2.0-rc.2"
version = "0.2.0-rc.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "170d71b5b14dec99db7739f6fc7d6ec2db80b78c3acb77db48392ccc3d8a9ea0"
checksum = "8a23fa214dea9efd4dacee5a5614646b30216ae0f05d4bb51bafb50e9da1c5be"
dependencies = [
"hybrid-array",
]
@@ -1934,7 +1943,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.10",
"parking_lot_core 0.9.11",
]
[[package]]
@@ -1948,7 +1957,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.10",
"parking_lot_core 0.9.11",
]
[[package]]
@@ -1995,7 +2004,7 @@ dependencies = [
"itertools 0.14.0",
"log",
"object_store",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"parquet",
"rand 0.8.5",
"regex",
@@ -2025,7 +2034,7 @@ dependencies = [
"futures",
"itertools 0.14.0",
"log",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
]
[[package]]
@@ -2137,7 +2146,7 @@ dependencies = [
"futures",
"log",
"object_store",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"rand 0.8.5",
"tempfile",
"url",
@@ -2273,7 +2282,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-plan",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"paste",
]
@@ -2414,7 +2423,7 @@ dependencies = [
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"pin-project-lite",
"tokio",
]
@@ -2562,7 +2571,7 @@ checksum = "6c478574b20020306f98d61c8ca3322d762e1ff08117422ac6106438605ea516"
dependencies = [
"block-buffer 0.11.0-rc.4",
"const-oid 0.10.1",
"crypto-common 0.2.0-rc.2",
"crypto-common 0.2.0-rc.3",
"subtle",
]
@@ -2955,7 +2964,7 @@ dependencies = [
"futures-util",
"generational-box",
"once_cell",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"rustc-hash 1.1.0",
"tracing",
"warnings",
@@ -3287,6 +3296,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "erased-serde"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7"
dependencies = [
"serde",
"typeid",
]
[[package]]
name = "errno"
version = "0.3.12"
@@ -3401,6 +3420,27 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flexi_logger"
version = "0.30.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb03342077df16d5b1400d7bed00156882846d7a479ff61a6f10594bcc3423d8"
dependencies = [
"chrono",
"crossbeam-channel",
"crossbeam-queue",
"log",
"notify-debouncer-mini",
"nu-ansi-term 0.50.1",
"regex",
"serde",
"serde_derive",
"thiserror 2.0.12",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "flume"
version = "0.11.1"
@@ -3476,6 +3516,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futf"
version = "0.1.5"
@@ -3702,7 +3751,7 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b"
dependencies = [
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"tracing",
]
@@ -4510,6 +4559,26 @@ dependencies = [
"cfb",
]
[[package]]
name = "inotify"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
dependencies = [
"bitflags 2.9.1",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -4735,6 +4804,26 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "kuchikiki"
version = "0.8.2"
@@ -5010,9 +5099,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.12"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765"
dependencies = [
"autocfg",
"scopeguard",
@@ -5023,6 +5112,10 @@ name = "log"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
dependencies = [
"serde",
"value-bag",
]
[[package]]
name = "longest-increasing-subsequence"
@@ -5240,6 +5333,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.59.0",
]
@@ -5419,6 +5513,43 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "notify"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943"
dependencies = [
"bitflags 2.9.1",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.59.0",
]
[[package]]
name = "notify-debouncer-mini"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a689eb4262184d9a1727f9087cd03883ea716682ab03ed24efec57d7716dccb8"
dependencies = [
"log",
"notify",
"notify-types",
"tempfile",
]
[[package]]
name = "notify-types"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]]
name = "ntapi"
version = "0.4.1"
@@ -5438,6 +5569,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "nugine-rust-utils"
version = "0.3.1"
@@ -5837,7 +5977,7 @@ dependencies = [
"futures",
"humantime",
"itertools 0.13.0",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"percent-encoding",
"snafu",
"tokio",
@@ -6073,12 +6213,12 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.12.3"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
dependencies = [
"lock_api",
"parking_lot_core 0.9.10",
"parking_lot_core 0.9.11",
]
[[package]]
@@ -6097,9 +6237,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.10"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [
"cfg-if",
"libc",
@@ -6806,7 +6946,7 @@ dependencies = [
"derive_builder",
"futures",
"lazy_static",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"s3s",
"snafu",
"tokio",
@@ -7164,9 +7304,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "reqwest"
version = "0.12.16"
version = "0.12.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bf597b113be201cb2269b4c39b39a804d01b99ee95a4278f0ed04e45cff1c71"
checksum = "e98ff6b0dbbe4d5a37318f433d4fc82babd21631f194d370409ceb2e40b2f0b5"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -7608,9 +7748,8 @@ version = "0.0.1"
dependencies = [
"async-trait",
"chrono",
"config",
"lazy_static",
"local-ip-address",
"flexi_logger",
"nu-ansi-term 0.50.1",
"nvml-wrapper",
"opentelemetry",
"opentelemetry-appender-tracing",
@@ -7980,6 +8119,15 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "serde_fmt"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4"
dependencies = [
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.140"
@@ -8357,9 +8505,9 @@ dependencies = [
[[package]]
name = "snafu"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019"
checksum = "320b01e011bf8d5d7a4a4a4be966d9160968935849c83b918827f6a435e7f627"
dependencies = [
"backtrace",
"snafu-derive",
@@ -8367,9 +8515,9 @@ dependencies = [
[[package]]
name = "snafu-derive"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917"
checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@@ -8502,7 +8650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
dependencies = [
"new_debug_unreachable",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"phf_shared 0.11.3",
"precomputed-hash",
"serde",
@@ -8554,6 +8702,84 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "sval"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9739f56c5d0c44a5ed45473ec868af02eb896af8c05f616673a31e1d1bb09"
[[package]]
name = "sval_buffer"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f39b07436a8c271b34dad5070c634d1d3d76d6776e938ee97b4a66a5e8003d0b"
dependencies = [
"sval",
"sval_ref",
]
[[package]]
name = "sval_dynamic"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffcb072d857431bf885580dacecf05ed987bac931230736739a79051dbf3499b"
dependencies = [
"sval",
]
[[package]]
name = "sval_fmt"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f214f427ad94a553e5ca5514c95c6be84667cbc5568cce957f03f3477d03d5c"
dependencies = [
"itoa 1.0.15",
"ryu",
"sval",
]
[[package]]
name = "sval_json"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ed34b32e638dec9a99c8ac92d0aa1220d40041026b625474c2b6a4d6f4feb"
dependencies = [
"itoa 1.0.15",
"ryu",
"sval",
]
[[package]]
name = "sval_nested"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14bae8fcb2f24fee2c42c1f19037707f7c9a29a0cda936d2188d48a961c4bb2a"
dependencies = [
"sval",
"sval_buffer",
"sval_ref",
]
[[package]]
name = "sval_ref"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a4eaea3821d3046dcba81d4b8489421da42961889902342691fb7eab491d79e"
dependencies = [
"sval",
]
[[package]]
name = "sval_serde"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172dd4aa8cb3b45c8ac8f3b4111d644cd26938b0643ede8f93070812b87fb339"
dependencies = [
"serde",
"sval",
"sval_nested",
]
[[package]]
name = "syn"
version = "1.0.109"
@@ -8671,7 +8897,7 @@ dependencies = [
"ndk-sys",
"objc",
"once_cell",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"raw-window-handle 0.5.2",
"raw-window-handle 0.6.2",
"scopeguard",
@@ -8922,7 +9148,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.3",
"parking_lot 0.12.4",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -9312,7 +9538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"nu-ansi-term 0.46.0",
"once_cell",
"regex",
"serde",
@@ -9419,6 +9645,12 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "typeid"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
[[package]]
name = "typenum"
version = "1.18.0"
@@ -9587,6 +9819,42 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "value-bag"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5"
dependencies = [
"value-bag-serde1",
"value-bag-sval2",
]
[[package]]
name = "value-bag-serde1"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35540706617d373b118d550d41f5dfe0b78a0c195dc13c6815e92e2638432306"
dependencies = [
"erased-serde",
"serde",
"serde_fmt",
]
[[package]]
name = "value-bag-sval2"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe7e140a2658cc16f7ee7a86e413e803fc8f9b5127adc8755c19f9fefa63a52"
dependencies = [
"sval",
"sval_buffer",
"sval_dynamic",
"sval_fmt",
"sval_json",
"sval_ref",
"sval_serde",
]
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -76,6 +76,7 @@ dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
dotenvy = "0.15.7"
flatbuffers = "25.2.10"
flexi_logger = { version = "0.30.2", features = ["trc"] }
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
@@ -106,6 +107,7 @@ mime = "0.3.17"
mime_guess = "2.0.5"
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
num_cpus = { version = "1.16.0" }
nvml-wrapper = "0.10.0"
object_store = "0.11.2"

View File

@@ -60,7 +60,7 @@ export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
# Observability config
export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
export RUSTFS_OBS_ENDPOINT="http://localhost:4317"
# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
@@ -73,9 +73,9 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
./rustfs /data/rustfs
```
### Observability Stack
## Observability Stack: OTel and OpenObserve
#### Deployment
### OpenTelemetry Collector with Jaeger, Grafana, Prometheus, and Loki
1. Navigate to the observability directory:
```bash
@@ -93,24 +93,29 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
- Jaeger: `http://localhost:16686`
- Prometheus: `http://localhost:9090`
#### Configuring Observability
#### Configure observability
1. Copy the example configuration:
```
OpenTelemetry Collector address (endpoint): http://localhost:4317
```
---
### OpenObserve and OpenTelemetry Collector
1. Navigate to the OpenObserve and OpenTelemetry directory:
```bash
cd deploy/config
cp obs.example.toml obs.toml
cd .docker/openobserve-otel
```
2. Edit `obs.toml` with the following parameters:
| Parameter | Description | Example |
|----------------------|-----------------------------------|-----------------------|
| endpoint | OpenTelemetry Collector address | http://localhost:4317 |
| service_name | Service name | rustfs |
| service_version | Service version | 1.0.0 |
| environment | Runtime environment | production |
| meter_interval | Metrics export interval (seconds) | 30 |
| sample_ratio | Sampling ratio | 1.0 |
| use_stdout | Output to console | true/false |
| logger_level | Log level | info |
| local_logging_enable | Enable local logging to stdout | true/false |
2. Start the OpenObserve and OpenTelemetry Collector services:
```bash
docker compose -f docker-compose.yml up -d
```
3. Access the OpenObserve UI:
OpenObserve UI: `http://localhost:5080`
- Default credentials:
- Username: `root@rustfs.com`
- Password: `rustfs123`
- Exposed ports:
- 5080: HTTP API and UI
- 5081: OTLP gRPC

View File

@@ -60,7 +60,7 @@ export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
# Observability configuration
export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
export RUSTFS_OBS_ENDPOINT="http://localhost:4317"
# Event message configuration
#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
@@ -72,9 +72,9 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
./rustfs /data/rustfs
```
### Observability System
## Observability System: OTel and OpenObserve
#### Deployment
### OpenTelemetry Collector with Jaeger, Grafana, Prometheus, and Loki
1. Navigate to the observability directory:
```bash
@@ -94,22 +94,28 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
#### Configure observability
1. Copy the example configuration:
```
OpenTelemetry Collector address (endpoint): http://localhost:4317
```
---
### OpenObserve and OpenTelemetry Collector
1. Navigate to the OpenObserve and OpenTelemetry directory:
```bash
cd deploy/config
cp obs.example.toml obs.toml
cd .docker/openobserve-otel
```
2. Start the OpenObserve and OpenTelemetry Collector services:
```bash
docker compose -f docker-compose.yml up -d
```
3. Access the OpenObserve UI:
OpenObserve UI: `http://localhost:5080`
- Default credentials:
- Username: `root@rustfs.com`
- Password: `rustfs123`
- Exposed ports:
- 5080: HTTP API and UI
- 5081: OTLP gRPC
2. Edit `obs.toml` with the following parameters:
| Parameter | Description | Example |
|----------------------|-----------------------------------|-----------------------|
| endpoint | OpenTelemetry Collector address | http://localhost:4317 |
| service_name | Service name | rustfs |
| service_version | Service version | 1.0.0 |
| environment | Runtime environment | production |
| meter_interval | Metrics export interval (seconds) | 30 |
| sample_ratio | Sampling ratio | 1.0 |
| use_stdout | Output to console | true/false |
| logger_level | Log level | info |
| local_logging_enable | Print logs to the console | true/false |

View File

@@ -56,14 +56,13 @@ pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
/// Example: RUSTFS_SECRET_KEY=rustfsadmin
/// Example: --secret-key rustfsadmin
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Default configuration file for observability
/// Default value: config/obs.toml
/// Environment variable: RUSTFS_OBS_CONFIG
/// Command line argument: --obs-config
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "./deploy/config/obs.toml";
/// Default OBS configuration endpoint
/// Environment variable: RUSTFS_OBS_ENDPOINT
/// Command line argument: --obs-endpoint
/// Example: RUSTFS_OBS_ENDPOINT="http://localhost:4317"
/// Example: --obs-endpoint http://localhost:4317
pub const DEFAULT_OBS_ENDPOINT: &str = "";
/// Default TLS key for rustfs
/// This is the default key for TLS.
@@ -90,6 +89,41 @@ pub const DEFAULT_CONSOLE_PORT: u16 = 9001;
/// This is the default address for rustfs console.
pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT);
/// Default log filename for rustfs
/// This is the default log filename for rustfs.
/// It is used to store the logs of the application.
/// Default value: rustfs.log
/// Environment variable: RUSTFS_OBS_LOG_FILENAME
pub const DEFAULT_LOG_FILENAME: &str = "rustfs.log";
/// Default log directory for rustfs
/// This is the default log directory for rustfs.
/// It is used to store the logs of the application.
/// Default value: logs
/// Environment variable: RUSTFS_OBS_LOG_DIRECTORY
pub const DEFAULT_LOG_DIR: &str = "logs";
/// Default log rotation size (MB) for rustfs
/// This is the default log rotation size for rustfs.
/// It is used to rotate the logs of the application.
/// Default value: 100 MB
/// Environment variable: RUSTFS_OBS_LOG_ROTATION_SIZE_MB
pub const DEFAULT_LOG_ROTATION_SIZE_MB: u64 = 100;
/// Default log rotation time for rustfs
/// This is the default log rotation interval for rustfs.
/// It is used to rotate the logs of the application.
/// Default value: day; options: day, hour, minute, second
/// Environment variable: RUSTFS_OBS_LOG_ROTATION_TIME
pub const DEFAULT_LOG_ROTATION_TIME: &str = "day";
/// Default number of log files to keep for rustfs
/// This is the default number of rotated log files to retain.
/// Default value: 30
/// Environment variable: RUSTFS_OBS_LOG_KEEP_FILES
pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
#[cfg(test)]
mod tests {
use super::*;
@@ -154,10 +188,6 @@ mod tests {
#[test]
fn test_file_path_constants() {
// Test file path related constants
assert_eq!(DEFAULT_OBS_CONFIG, "./deploy/config/obs.toml");
assert!(DEFAULT_OBS_CONFIG.ends_with(".toml"), "Config file should be TOML format");
assert_eq!(RUSTFS_TLS_KEY, "rustfs_key.pem");
assert!(RUSTFS_TLS_KEY.ends_with(".pem"), "TLS key should be PEM format");
@@ -219,7 +249,6 @@ mod tests {
ENVIRONMENT,
DEFAULT_ACCESS_KEY,
DEFAULT_SECRET_KEY,
DEFAULT_OBS_CONFIG,
RUSTFS_TLS_KEY,
RUSTFS_TLS_CERT,
DEFAULT_ADDRESS,

View File

@@ -20,9 +20,8 @@ kafka = ["dep:rdkafka"]
rustfs-config = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
lazy_static = { workspace = true }
local-ip-address = { workspace = true }
flexi_logger = { workspace = true, features = ["trc", "kv"] }
nu-ansi-term = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }

View File

@@ -1,5 +1,7 @@
use config::{Config, File, FileFormat};
use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_FILENAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB,
DEFAULT_LOG_ROTATION_TIME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
};
use serde::{Deserialize, Serialize};
use std::env;
@@ -22,54 +24,94 @@ pub struct OtelConfig {
pub environment: Option<String>, // Environment
pub logger_level: Option<String>, // Logger level
pub local_logging_enabled: Option<bool>, // Local logging enabled
}
/// Helper function: Extract observable configuration from environment variables
fn extract_otel_config_from_env() -> OtelConfig {
OtelConfig {
endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT)),
sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(APP_NAME.to_string())),
service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(false)),
}
// New flexi_logger-related configuration
pub log_directory: Option<String>,     // Log file directory
pub log_filename: Option<String>,      // Log file name
pub log_rotation_size_mb: Option<u64>, // Log rotation size threshold (MB)
pub log_rotation_time: Option<String>, // Time-based log rotation (Hour, Day)
pub log_keep_files: Option<u16>,       // Number of rotated log files to keep
}
impl OtelConfig {
/// Helper function: Extract observable configuration from environment variables
pub fn extract_otel_config_from_env(endpoint: Option<String>) -> OtelConfig {
let endpoint = if let Some(endpoint) = endpoint {
if endpoint.is_empty() {
env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
} else {
endpoint
}
} else {
env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
};
let mut use_stdout = env::var("RUSTFS_OBS_USE_STDOUT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT));
if endpoint.is_empty() {
use_stdout = Some(true);
}
OtelConfig {
endpoint,
use_stdout,
sample_ratio: env::var("RUSTFS_OBS_SAMPLE_RATIO")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var("RUSTFS_OBS_METER_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var("RUSTFS_OBS_SERVICE_NAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(APP_NAME.to_string())),
service_version: env::var("RUSTFS_OBS_SERVICE_VERSION")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var("RUSTFS_OBS_ENVIRONMENT")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var("RUSTFS_OBS_LOGGER_LEVEL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
local_logging_enabled: env::var("RUSTFS_OBS_LOCAL_LOGGING_ENABLED")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(false)),
log_directory: env::var("RUSTFS_OBS_LOG_DIRECTORY")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_DIR.to_string())),
log_filename: env::var("RUSTFS_OBS_LOG_FILENAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_FILENAME.to_string())),
log_rotation_size_mb: env::var("RUSTFS_OBS_LOG_ROTATION_SIZE_MB")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
log_rotation_time: env::var("RUSTFS_OBS_LOG_ROTATION_TIME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day"
log_keep_files: env::var("RUSTFS_OBS_LOG_KEEP_FILES")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
}
}
/// Create a new instance of OtelConfig with default values
///
/// # Returns
/// A new instance of OtelConfig
pub fn new() -> Self {
extract_otel_config_from_env()
Self::extract_otel_config_from_env(None)
}
}
@@ -122,6 +164,12 @@ pub struct WebhookSinkConfig {
impl WebhookSinkConfig {
pub fn new() -> Self {
Self::default()
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self {
endpoint: env::var("RUSTFS_SINKS_WEBHOOK_ENDPOINT")
.ok()
@@ -137,12 +185,6 @@ impl WebhookSinkConfig {
}
}
impl Default for WebhookSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// File Sink Configuration - Add buffering parameters
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct FileSinkConfig {
@@ -160,7 +202,6 @@ impl FileSinkConfig {
eprintln!("Failed to create log directory: {}", e);
return "rustfs/rustfs.log".to_string();
}
println!("Using log directory: {:?}", temp_dir);
temp_dir
.join("rustfs.log")
.to_str()
@@ -173,9 +214,18 @@ impl FileSinkConfig {
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(Self::get_default_log_path),
buffer_size: Some(8192),
flush_interval_ms: Some(1000),
flush_threshold: Some(100),
buffer_size: env::var("RUSTFS_SINKS_FILE_BUFFER_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(8192)),
flush_interval_ms: env::var("RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(1000)),
flush_threshold: env::var("RUSTFS_SINKS_FILE_FLUSH_THRESHOLD")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(100)),
}
}
}
@@ -260,6 +310,14 @@ impl AppConfig {
logger: Some(LoggerConfig::default()),
}
}
pub fn new_with_endpoint(endpoint: Option<String>) -> Self {
Self {
observability: OtelConfig::extract_otel_config_from_env(endpoint),
sinks: vec![SinkConfig::new()],
logger: Some(LoggerConfig::new()),
}
}
}
// implement default for AppConfig
@@ -269,9 +327,6 @@ impl Default for AppConfig {
}
}
/// Default configuration file name
const DEFAULT_CONFIG_FILE: &str = "obs";
/// Loading the configuration file
/// Supports TOML, YAML and .env formats, read in order by priority
///
@@ -288,51 +343,6 @@ const DEFAULT_CONFIG_FILE: &str = "obs";
///
/// let config = load_config(None);
/// ```
pub fn load_config(config_dir: Option<String>) -> AppConfig {
let config_dir = if let Some(path) = config_dir {
// If a path is provided, check if it's empty
if path.is_empty() {
// If empty, use the default config file name
DEFAULT_CONFIG_FILE.to_string()
} else {
// Use the provided path
let path = std::path::Path::new(&path);
if path.extension().is_some() {
// If path has extension, use it as is (extension will be added by Config::builder)
path.with_extension("").to_string_lossy().into_owned()
} else {
// If path is a directory, append the default config file name
path.to_string_lossy().into_owned()
}
}
} else {
// If no path provided, use current directory + default config file
match env::current_dir() {
Ok(dir) => dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned(),
Err(_) => {
eprintln!("Warning: Failed to get current directory, using default config file");
DEFAULT_CONFIG_FILE.to_string()
}
}
};
// Log using proper logging instead of println when possible
println!("Using config file base: {}", config_dir);
let app_config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false))
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
.build()
.unwrap_or_default();
match app_config.try_deserialize::<AppConfig>() {
Ok(app_config) => {
println!("Parsed AppConfig: {:?}", app_config);
app_config
}
Err(e) => {
println!("Failed to deserialize config: {}", e);
AppConfig::default()
}
}
pub fn load_config(config: Option<String>) -> AppConfig {
AppConfig::new_with_endpoint(config)
}
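For reference, `load_config` is now a thin wrapper over environment variables rather than a TOML/YAML parser. A hedged usage sketch, assuming `load_config` is a public export of the `rustfs-obs` crate and that `AppConfig`'s fields stay public; it shows the endpoint resolution order introduced here (explicit non-empty argument first, then `RUSTFS_OBS_ENDPOINT`, with stdout forced on when both are empty):
```rust
use rustfs_obs::load_config; // assumed public export

fn main() {
    // An explicit endpoint (e.g. from the --obs-endpoint CLI flag) takes
    // precedence over the RUSTFS_OBS_ENDPOINT environment variable.
    let cfg = load_config(Some("http://localhost:4317".to_string()));
    assert_eq!(cfg.observability.endpoint, "http://localhost:4317");

    // Passing None (or an empty string) falls back to RUSTFS_OBS_ENDPOINT;
    // if that is unset too, the endpoint stays empty and use_stdout is
    // forced on so telemetry still goes somewhere visible.
    let fallback = load_config(None);
    if fallback.observability.endpoint.is_empty() {
        assert_eq!(fallback.observability.use_stdout, Some(true));
    }
}
```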

View File

@@ -23,7 +23,7 @@ pub struct Collector {
impl Collector {
pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result<Self, GlobalError> {
let mut system = System::new_all();
let mut system = System::new();
let attributes = ProcessAttributes::new(pid, &mut system)?;
let core_count = System::physical_core_count().ok_or(GlobalError::CoreCountError)?;
let metrics = Metrics::new(&meter);
@@ -52,7 +52,7 @@ impl Collector {
fn collect(&mut self) -> Result<(), GlobalError> {
self.system
.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true);
.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), false);
// refresh the network interface list and statistics
self.networks.refresh(false);
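The two edits above replace the eager `System::new_all()` scan with an empty `System` plus a targeted refresh. A standalone, hedged sketch of that pattern with the `sysinfo` crate, monitoring the current process as a stand-in for the collector's `pid`:
```rust
use sysinfo::{Pid, ProcessesToUpdate, System};

fn main() {
    // Start empty instead of System::new_all(): nothing is scanned up front.
    let mut system = System::new();
    let pid = Pid::from_u32(std::process::id());

    // Refresh only the one process we care about; `false` skips removing
    // dead processes, matching the collector's hot path.
    system.refresh_processes(ProcessesToUpdate::Some(&[pid]), false);

    if let Some(p) = system.process(pid) {
        println!("rss = {} bytes, cpu = {:.1}%", p.memory(), p.cpu_usage());
    }
}
```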

View File

@@ -1,4 +1,7 @@
// Added flexi_logger related dependencies
use crate::OtelConfig;
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
@@ -13,7 +16,9 @@ use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
};
use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
};
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
use std::borrow::Cow;
@@ -47,23 +52,44 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
/// // When it's dropped, all telemetry components are properly shut down
/// drop(otel_guard);
/// ```
#[derive(Debug)]
// Implement Debug trait correctly, rather than using derive, as some fields may not have implemented Debug
pub struct OtelGuard {
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
logger_provider: SdkLoggerProvider,
tracer_provider: Option<SdkTracerProvider>,
meter_provider: Option<SdkMeterProvider>,
logger_provider: Option<SdkLoggerProvider>,
// Add a flexi_logger handle to keep the logging alive
_flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
}
// Implement debug manually and avoid relying on all fields to implement debug
impl std::fmt::Debug for OtelGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OtelGuard")
.field("tracer_provider", &self.tracer_provider.is_some())
.field("meter_provider", &self.meter_provider.is_some())
.field("logger_provider", &self.logger_provider.is_some())
.field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
.finish()
}
}
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("Tracer shutdown error: {:?}", err);
if let Some(provider) = self.tracer_provider.take() {
if let Err(err) = provider.shutdown() {
eprintln!("Tracer shutdown error: {:?}", err);
}
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("Meter shutdown error: {:?}", err);
if let Some(provider) = self.meter_provider.take() {
if let Err(err) = provider.shutdown() {
eprintln!("Meter shutdown error: {:?}", err);
}
}
if let Err(err) = self.logger_provider.shutdown() {
eprintln!("Logger shutdown error: {:?}", err);
if let Some(provider) = self.logger_provider.take() {
if let Err(err) = provider.shutdown() {
eprintln!("Logger shutdown error: {:?}", err);
}
}
}
}
@@ -106,156 +132,258 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
// Configure flexi_logger to rotate logs by time and size
let mut flexi_logger_handle = None;
if !endpoint.is_empty() {
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
};
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
builder.with_batch_exporter(exporter)
};
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
}
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
// configuring tracing
{
// configure the formatting layer
let fmt_layer = {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true);
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
// Only add full span events tracking in the development environment
if environment != ENVIRONMENT {
layer = layer.with_span_events(FmtSpan::FULL);
}
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
layer.with_filter(build_env_filter(logger_level, None))
};
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.build(),
);
let filter = build_env_filter(logger_level, None);
let otel_filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
// Configure registry to avoid repeated calls to filter methods
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.local_logging_enabled.unwrap_or(false) {
Some(fmt_layer)
} else {
None
})
.with(OpenTelemetryLayer::new(tracer))
.with(otel_layer)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
}
} else {
// Obtain the log directory and file name configuration
let log_directory = config.log_directory.as_deref().unwrap_or("logs");
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
builder.build()
};
// configuring tracing
{
// configure the formatting layer
let fmt_layer = {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true);
// Only add full span events tracking in the development environment
if environment != ENVIRONMENT {
layer = layer.with_span_events(FmtSpan::FULL);
// Build the log rotation criterion
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Rotate by time or size, whichever threshold is reached first
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
layer.with_filter(build_env_filter(logger_level, None))
// Rotate by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Rotate by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// Default: rotate daily
_ => Criterion::Age(Age::Day),
};
let filter = build_env_filter(logger_level, None);
let otel_filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Configure registry to avoid repeated calls to filter methods
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.local_logging_enabled.unwrap_or(false) {
Some(fmt_layer)
} else {
None
})
.with(OpenTelemetryLayer::new(tracer))
.with(otel_layer)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or(LogSpecification::info());
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
// Convert the logger_level string to the corresponding flexi_logger::Duplicate level
let level_filter = match logger_level.to_lowercase().as_str() {
"trace" => flexi_logger::Duplicate::Trace,
"debug" => flexi_logger::Duplicate::Debug,
"info" => flexi_logger::Duplicate::Info,
"warn" | "warning" => flexi_logger::Duplicate::Warn,
"error" => flexi_logger::Duplicate::Error,
"off" => flexi_logger::Duplicate::None,
_ => flexi_logger::Duplicate::Info, // the default is info
};
// Configure the flexi_logger
let flexi_logger_result = flexi_logger::Logger::with(log_spec)
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suffix("log"),
)
.rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.duplicate_to_stdout(level_filter) // Use dynamic levels
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.write_mode(WriteMode::Async)
.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
info!("Flexi logger initialized with file logging to {}/{}.log", log_directory, log_filename);
// Log the configured rotation conditions
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => info!(
"Log rotation configured for: every {} or when size exceeds {}MB, keeping {} files",
time, size, keep_files
),
(Some(time), None) => info!("Log rotation configured for: every {}, keeping {} files", time, keep_files),
(None, Some(size)) => {
info!("Log rotation configured for: when size exceeds {}MB, keeping {} files", size, keep_files)
}
_ => info!("Log rotation configured for: daily, keeping {} files", keep_files),
}
} else {
eprintln!("Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
}
}
OtelGuard {
tracer_provider,
meter_provider,
logger_provider,
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
}
}
}
@@ -271,3 +399,50 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilte
filter
}
/// Custom Log Formatter Function - Terminal Output (with Color)
fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
let level = record.level();
let level_style = style(level);
// Get the current thread information
let binding = std::thread::current();
let thread_name = binding.name().unwrap_or("unnamed");
let thread_id = format!("{:?}", std::thread::current().id());
writeln!(
w,
"{} {} [{}] [{}:{}] [{}:{}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
level_style.paint(level.to_string()),
Color::Magenta.paint(record.target()),
Color::Blue.paint(record.file().unwrap_or("unknown")),
Color::Blue.paint(record.line().unwrap_or(0).to_string()),
Color::Green.paint(thread_name),
Color::Green.paint(thread_id),
record.args()
)
}
/// Custom Log Formatter - File Output (No Color)
fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
let level = record.level();
// Get the current thread information
let binding = std::thread::current();
let thread_name = binding.name().unwrap_or("unnamed");
let thread_id = format!("{:?}", std::thread::current().id());
writeln!(
w,
"{} {} [{}] [{}:{}] [{}:{}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
level,
record.target(),
record.file().unwrap_or("unknown"),
record.line().unwrap_or(0),
thread_name,
thread_id,
record.args()
)
}
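Taken together, `init_telemetry` now has two modes: a non-empty endpoint wires up the OTLP tracer/meter/logger providers, while an empty endpoint falls back to `flexi_logger` file logging with the rotation policy above. A hedged caller-side sketch, assuming `init_telemetry` and `OtelConfig` are public exports of `rustfs-obs`:
```rust
use rustfs_obs::{init_telemetry, OtelConfig}; // assumed public paths

fn main() {
    // Reads RUSTFS_OBS_* environment variables; an empty endpoint selects
    // the flexi_logger file-logging branch instead of OTLP.
    let config = OtelConfig::new();

    // Keep the guard alive for the whole process lifetime.
    let _guard = init_telemetry(&config);

    tracing::info!("telemetry initialized");
} // dropping _guard shuts down providers / flushes the async file writer
```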

View File

@@ -1,28 +0,0 @@
#RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
#RUSTFS__OBSERVABILITY__USE_STDOUT=true
#RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
#RUSTFS__OBSERVABILITY__METER_INTERVAL=30
#RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
#RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
#RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
#RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
#
#RUSTFS__SINKS_0__type=Kafka
#RUSTFS__SINKS_0__brokers=localhost:9092
#RUSTFS__SINKS_0__topic=logs
#RUSTFS__SINKS_0__batch_size=100
#RUSTFS__SINKS_0__batch_timeout_ms=1000
#
#RUSTFS__SINKS_1__type=Webhook
#RUSTFS__SINKS_1__endpoint=http://localhost:8080/webhook
#RUSTFS__SINKS_1__auth_token=you-auth-token
#RUSTFS__SINKS_1__batch_size=100
#RUSTFS__SINKS_1__batch_timeout_ms=1000
#
#RUSTFS__SINKS_2__type=File
#RUSTFS__SINKS_2__path=./deploy/logs/rustfs.log
#RUSTFS__SINKS_2__buffer_size=10
#RUSTFS__SINKS_2__flush_interval_ms=1000
#RUSTFS__SINKS_2__flush_threshold=100
#
#RUSTFS__LOGGER__QUEUE_CAPACITY=10

View File

@@ -1,29 +0,0 @@
# config.toml
store_path = "./deploy/logs/event_store"
channel_capacity = 5000
[[adapters]]
type = "Webhook"
endpoint = "http://127.0.0.1:3020/webhook"
auth_token = "your-auth-token"
max_retries = 3
timeout = 50
[adapters.custom_headers]
custom_server = "value_server"
custom_client = "value_client"
#[[adapters]]
#type = "Kafka"
#brokers = "localhost:9092"
#topic = "notifications"
#max_retries = 3
#timeout = 60
#
#[[adapters]]
#type = "Mqtt"
#broker = "mqtt.example.com"
#port = 1883
#client_id = "event-notifier"
#topic = "events"
#max_retries = 3

View File

@@ -1,34 +0,0 @@
[observability]
endpoint = "http://localhost:4317" # 可观测性数据上报的终端地址,默认为"http://localhost:4317"
use_stdout = true # 是否将日志输出到标准输出
sample_ratio = 2.0 # 采样率,表示每 2 条数据采样 1 条
meter_interval = 30 # 指标收集间隔,单位为秒
service_name = "rustfs" # 服务名称,用于标识当前服务
service_version = "0.1.0" # 服务版本号
environment = "develop" # 运行环境,如开发环境 (develop)
logger_level = "debug" # 日志级别,可选 debug/info/warn/error 等
local_logging_enabled = true # 是否启用本地 stdout 日志记录true 表示启用false 表示禁用
#[[sinks]] # Kafka 接收器配置
#type = "Kafka" # 是否启用 Kafka 接收器,默认禁用
#brokers = "localhost:9092" # Kafka 服务器地址
#topic = "logs" # Kafka 主题名称
#batch_size = 100 # 批处理大小,每次发送的消息数量
#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
#[[sinks]] # Webhook 接收器配置
#type = "Webhook" # 是否启用 Webhook 接收器
#endpoint = "http://localhost:8080/webhook" # Webhook 接收地址
#auth_token = "" # 认证令牌
#batch_size = 100 # 批处理大小
#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒
[[sinks]] # 文件接收器配置
type = "File" # 是否启用文件接收器
path = "./deploy/logs/rustfs.log" # 日志文件路径
buffer_size = 10 # 缓冲区大小,表示每次写入的字节数
flush_interval_ms = 100 # 批处理超时时间,单位为毫秒
flush_threshold = 100 # 刷新阈值,表示在达到该数量时刷新日志
[logger] # 日志器配置
queue_capacity = 10 # 日志队列容量,表示可以缓存的日志条数

View File

@@ -1,34 +0,0 @@
[observability]
endpoint = "http://localhost:4317" # Default is "http://localhost:4317" if not specified
use_stdout = false # Whether to output to stdout: true = output, false = no output
sample_ratio = 2.0
meter_interval = 30
service_name = "rustfs"
service_version = "0.0.1"
environment = "production" # Default is "production" if not specified
logger_level = "info"
local_logging_enabled = true
#[[sinks]]
#type = "Kafka"
#brokers = "localhost:9092"
#topic = "logs"
#batch_size = 100 # Default is 100 if not specified
#batch_timeout_ms = 100 # Default is 1000ms if not specified
#
#[[sinks]]
#type = "Webhook"
#endpoint = "http://localhost:8080/webhook"
#auth_token = ""
#batch_size = 100 # Default is 3 if not specified
#batch_timeout_ms = 100 # Default is 100ms if not specified
[[sinks]]
type = "File"
path = "deploy/logs/rustfs.log"
buffer_size = 101 # Default is 8192 bytes if not specified
flush_interval_ms = 1000
flush_threshold = 100
[logger]
queue_capacity = 10000
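
As a rough illustration of how a file like this can map onto Rust types, a `serde` sketch follows. The struct and field names are hypothetical, not the actual `rustfs-obs` API; `type = "..."` in each `[[sinks]]` table selects an internally tagged enum variant:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ObsConfig {
    observability: Observability,
    #[serde(default)]
    sinks: Vec<Sink>,
    logger: Option<LoggerCfg>,
}

#[derive(Debug, Deserialize)]
struct Observability {
    endpoint: String,
    use_stdout: bool,
    sample_ratio: f64,
    meter_interval: u64,
    service_name: String,
    service_version: String,
    environment: String,
    logger_level: String,
    local_logging_enabled: bool,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum Sink {
    Kafka { brokers: String, topic: String, batch_size: u32, batch_timeout_ms: u64 },
    Webhook { endpoint: String, auth_token: String, batch_size: u32, batch_timeout_ms: u64 },
    File { path: String, buffer_size: usize, flush_interval_ms: u64, flush_threshold: u32 },
}

#[derive(Debug, Deserialize)]
struct LoggerCfg {
    queue_capacity: usize,
}

fn load(path: &str) -> Result<ObsConfig, Box<dyn std::error::Error>> {
    // toml::from_str handles the [[sinks]] array-of-tables directly.
    Ok(toml::from_str(&std::fs::read_to_string(path)?)?)
}
```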

View File

@@ -20,8 +20,8 @@ RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000"
RUSTFS_SERVER_DOMAINS=127.0.0.1:9002
# RustFS license content
RUSTFS_LICENSE="license content"
# Observability configuration file path: deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# Observability endpoint: http://localhost:4317
RUSTFS_OBS_ENDPOINT=http://localhost:4317
# TLS certificate directory path: deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls
# Event notification configuration file path: deploy/config/event.example.toml

View File

@@ -20,8 +20,8 @@ RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000"
RUSTFS_SERVER_DOMAINS=127.0.0.1:9002
# RustFS license content
RUSTFS_LICENSE="license content"
# Observability configuration file path: deploy/config/obs.example.toml
RUSTFS_OBS_CONFIG=/etc/default/obs.toml
# Observability endpoint, e.g. http://localhost:4317
RUSTFS_OBS_ENDPOINT=http://localhost:4317
# TLS certificates directory path: deploy/certs
RUSTFS_TLS_PATH=/etc/default/tls
# event notification configuration file path: deploy/config/event.example.toml

View File

@@ -2385,16 +2385,16 @@ impl DiskAPI for LocalDisk {
}
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject);
let mut res = HashMap::new();
let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata).await;
let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata);
let buf = match disk.read_metadata(item.path.clone()).await {
Ok(buf) => buf,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
stop_fn(&res);
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
done_sz(buf.len() as u64).await;
done_sz(buf.len() as u64);
res.insert("metasize".to_string(), buf.len().to_string());
item.transform_meda_dir();
let meta_cache = MetaCacheEntry {
@@ -2406,7 +2406,7 @@ impl DiskAPI for LocalDisk {
Ok(fivs) => fivs,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
stop_fn(&res);
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
@@ -2416,7 +2416,7 @@ impl DiskAPI for LocalDisk {
Ok(obj_infos) => obj_infos,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
stop_fn(&res);
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
@@ -2432,7 +2432,7 @@ impl DiskAPI for LocalDisk {
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
let sz: usize;
(obj_deleted, sz) = item.apply_actions(info, &size_s).await;
done().await;
done();
if obj_deleted {
break;
@@ -2462,14 +2462,14 @@ impl DiskAPI for LocalDisk {
let _obj_info =
frer_version.to_object_info(&item.bucket, &item.object_path().to_string_lossy(), versioned);
let done = ScannerMetrics::time(ScannerMetric::TierObjSweep);
done().await;
done();
}
// todo: global trace
if obj_deleted {
return Err(Error::from_string(ERR_IGNORE_FILE_CONTRIB));
}
done().await;
done();
Ok(size_s)
})
}),
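
The dropped `.await`s in this hunk follow from the metrics rewrite later in this commit: the completion callbacks returned by `ScannerMetrics::log`, `time`, and `time_size` are now plain `Fn` closures that spawn their own latency-recording tasks. The call pattern, sketched with the names used above (`buf_len` is illustrative):

```rust
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject);
let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata);
// ... read metadata ...
done_sz(buf_len as u64); // plain call, no .await
stop_fn(&res);           // likewise synchronous now
```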

View File

@@ -143,92 +143,169 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
}
}
/// Initialize and start the data scanner in the background
///
/// This function starts a background task that continuously runs the data scanner
/// with randomized intervals between cycles to avoid resource contention.
///
/// # Features
/// - Graceful shutdown support via cancellation token
/// - Randomized sleep intervals to prevent synchronized scanning across nodes
/// - Minimum sleep duration to avoid excessive CPU usage
/// - Proper error handling and logging
///
/// # Architecture
/// 1. Initialize with random seed for sleep intervals
/// 2. Run scanner cycles in a loop
/// 3. Use randomized sleep between cycles to avoid thundering herd
/// 4. Ensure minimum sleep duration to prevent CPU thrashing
pub async fn init_data_scanner() {
info!("Initializing data scanner background task");
tokio::spawn(async move {
loop {
// Run the data scanner
run_data_scanner().await;
let random = {
let mut r = rand::thread_rng();
r.gen_range(0.0..1.0)
};
let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(Ordering::SeqCst) as f64));
let sleep_duration = if duration < Duration::new(1, 0) {
Duration::new(1, 0)
} else {
duration
};
info!("data scanner will sleeping {sleep_duration:?}");
// Calculate randomized sleep duration
// Use a random factor (1.0 to 10.0) multiplied by the scanner cycle duration
let random_factor: f64 = {
let mut rng = rand::thread_rng();
rng.gen_range(1.0..10.0)
};
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
let sleep_duration_secs = random_factor * base_cycle_duration;
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle");
// Sleep with the calculated duration
sleep(sleep_duration).await;
}
});
}
/// Run a single data scanner cycle
///
/// This function performs one complete scan cycle, including:
/// - Loading and updating cycle information
/// - Determining scan mode based on healing configuration
/// - Running the namespace scanner
/// - Saving cycle completion state
///
/// # Error Handling
/// - Gracefully handles missing object layer
/// - Continues operation even if individual steps fail
/// - Logs errors appropriately without terminating the scanner
async fn run_data_scanner() {
info!("run_data_scanner");
info!("Starting data scanner cycle");
// Get the object layer, return early if not available
let Some(store) = new_object_layer_fn() else {
error!("errServerNotInitialized");
error!("Object layer not initialized, skipping scanner cycle");
return;
};
// Load current cycle information from persistent storage
let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
.await
.map_or(Vec::new(), |buf| buf);
let mut buf_t = Deserializer::new(Cursor::new(buf));
let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default();
loop {
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
cycle_info.current = cycle_info.next;
cycle_info.started = Utc::now();
{
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
}
let bg_heal_info = read_background_heal_info(store.clone()).await;
let scan_mode =
get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
if bg_heal_info.current_scan_mode != scan_mode {
let mut new_heal_info = bg_heal_info;
new_heal_info.current_scan_mode = scan_mode;
if scan_mode == HEAL_DEEP_SCAN {
new_heal_info.bitrot_start_time = SystemTime::now();
new_heal_info.bitrot_start_cycle = cycle_info.current;
}
save_background_heal_info(store.clone(), &new_heal_info).await;
}
// Wait before starting next cycle and wait on startup.
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async {
store_data_usage_in_backend(rx).await;
.unwrap_or_else(|err| {
info!(error = %err, "Failed to read cycle info, starting fresh");
Vec::new()
});
let mut res = HashMap::new();
res.insert("cycle".to_string(), cycle_info.current.to_string());
info!("start ns_scanner");
match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
Ok(_) => {
info!("ns_scanner completed");
cycle_info.next += 1;
cycle_info.current = 0;
cycle_info.cycle_completed.push(Utc::now());
if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
let _ = cycle_info.cycle_completed.remove(0);
}
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
let mut wr = Vec::new();
cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap();
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, wr).await;
let mut cycle_info = if buf.is_empty() {
CurrentScannerCycle::default()
} else {
let mut buf_cursor = Deserializer::new(Cursor::new(buf));
Deserialize::deserialize(&mut buf_cursor).unwrap_or_else(|err| {
error!(error = %err, "Failed to deserialize cycle info, using default");
CurrentScannerCycle::default()
})
};
// Start metrics collection for this cycle
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
// Update cycle information
cycle_info.current = cycle_info.next;
cycle_info.started = Utc::now();
// Update global scanner metrics
globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// Read background healing information and determine scan mode
let bg_heal_info = read_background_heal_info(store.clone()).await;
let scan_mode =
get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
// Update healing info if scan mode changed
if bg_heal_info.current_scan_mode != scan_mode {
let mut new_heal_info = bg_heal_info;
new_heal_info.current_scan_mode = scan_mode;
if scan_mode == HEAL_DEEP_SCAN {
new_heal_info.bitrot_start_time = SystemTime::now();
new_heal_info.bitrot_start_cycle = cycle_info.current;
}
save_background_heal_info(store.clone(), &new_heal_info).await;
}
// Set up data usage storage channel
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async move {
let _ = store_data_usage_in_backend(rx).await;
});
// Prepare result tracking
let mut scan_result = HashMap::new();
scan_result.insert("cycle".to_string(), cycle_info.current.to_string());
info!(
cycle = cycle_info.current,
scan_mode = ?scan_mode,
"Starting namespace scanner"
);
// Run the namespace scanner
match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
Ok(_) => {
info!(cycle = cycle_info.current, "Namespace scanner completed successfully");
// Update cycle completion information
cycle_info.next += 1;
cycle_info.current = 0;
cycle_info.cycle_completed.push(Utc::now());
// Maintain cycle completion history (keep only recent cycles)
if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
let _ = cycle_info.cycle_completed.remove(0);
}
Err(err) => {
info!("ns_scanner failed: {:?}", err);
res.insert("error".to_string(), err.to_string());
// Update global metrics with completion info
globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// Persist updated cycle information
// ignore error, continue.
let mut serialized_data = Vec::new();
if let Err(err) = cycle_info.serialize(&mut Serializer::new(&mut serialized_data)) {
error!(error = %err, "Failed to serialize cycle info");
} else if let Err(err) = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, serialized_data).await {
error!(error = %err, "Failed to save cycle info to storage");
}
}
stop_fn(&res).await;
sleep(Duration::from_secs(SCANNER_CYCLE.load(Ordering::SeqCst))).await;
Err(err) => {
error!(
cycle = cycle_info.current,
error = %err,
"Namespace scanner failed"
);
scan_result.insert("error".to_string(), err.to_string());
}
}
// Complete metrics collection for this cycle
stop_fn(&scan_result);
}
#[derive(Debug, Serialize, Deserialize)]
@@ -476,7 +553,7 @@ impl ScannerItem {
pub async fn apply_actions(&self, oi: &ObjectInfo, _size_s: &SizeSummary) -> (bool, usize) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
//todo: lifecycle
done().await;
done();
(false, oi.size)
}
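
Cycle state persists through MessagePack via `rmp-serde`, as the `Serializer`/`Deserializer` calls above show. A self-contained sketch of that round trip; `CycleState` is a stand-in for `CurrentScannerCycle`:

```rust
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::io::Cursor;

#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct CycleState {
    current: u64,
    next: u64,
}

fn roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    let state = CycleState { current: 3, next: 4 };
    // Encode to a byte buffer, the same shape save_config receives above.
    let mut buf = Vec::new();
    state.serialize(&mut Serializer::new(&mut buf))?;
    // Decode from a cursor, as read_config's result is consumed above.
    let restored: CycleState = Deserialize::deserialize(&mut Deserializer::new(Cursor::new(buf)))?;
    assert_eq!(state, restored);
    Ok(())
}
```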

View File

@@ -1,30 +1,30 @@
use chrono::Utc;
use common::globals::GLOBAL_Local_Node_Name;
use common::last_minute::{AccElem, LastMinuteLatency};
use lazy_static::lazy_static;
use madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::Once;
use std::time::{Duration, UNIX_EPOCH};
use std::{
collections::HashMap,
sync::{atomic::Ordering, Arc},
time::SystemTime,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tracing::{debug, info};
use tokio::sync::{Mutex, RwLock};
use tracing::debug;
use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn};
use super::data_scanner::CurrentScannerCycle;
lazy_static! {
pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
pub static ref globalScannerMetrics: Arc<ScannerMetrics> = Arc::new(ScannerMetrics::new());
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
/// Scanner metric types, matching the Go version exactly
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ScannerMetric {
// START Realtime metrics, that only to records
// START Realtime metrics, which only record
// last minute latencies and total operation count.
ReadMetadata = 0,
CheckMissing,
@@ -58,78 +58,387 @@ pub enum ScannerMetric {
Last,
}
static INIT: Once = Once::new();
impl ScannerMetric {
/// Convert to string representation for metrics
pub fn as_str(self) -> &'static str {
match self {
Self::ReadMetadata => "read_metadata",
Self::CheckMissing => "check_missing",
Self::SaveUsage => "save_usage",
Self::ApplyAll => "apply_all",
Self::ApplyVersion => "apply_version",
Self::TierObjSweep => "tier_obj_sweep",
Self::HealCheck => "heal_check",
Self::Ilm => "ilm",
Self::CheckReplication => "check_replication",
Self::Yield => "yield",
Self::CleanAbandoned => "clean_abandoned",
Self::ApplyNonCurrent => "apply_non_current",
Self::HealAbandonedVersion => "heal_abandoned_version",
Self::StartTrace => "start_trace",
Self::ScanObject => "scan_object",
Self::HealAbandonedObject => "heal_abandoned_object",
Self::LastRealtime => "last_realtime",
Self::ScanFolder => "scan_folder",
Self::ScanCycle => "scan_cycle",
Self::ScanBucketDrive => "scan_bucket_drive",
Self::CompactFolder => "compact_folder",
Self::Last => "last",
}
}
/// Convert from index back to enum (safe version)
pub fn from_index(index: usize) -> Option<Self> {
if index >= Self::Last as usize {
return None;
}
// Safe conversion using match instead of unsafe transmute
match index {
0 => Some(Self::ReadMetadata),
1 => Some(Self::CheckMissing),
2 => Some(Self::SaveUsage),
3 => Some(Self::ApplyAll),
4 => Some(Self::ApplyVersion),
5 => Some(Self::TierObjSweep),
6 => Some(Self::HealCheck),
7 => Some(Self::Ilm),
8 => Some(Self::CheckReplication),
9 => Some(Self::Yield),
10 => Some(Self::CleanAbandoned),
11 => Some(Self::ApplyNonCurrent),
12 => Some(Self::HealAbandonedVersion),
13 => Some(Self::StartTrace),
14 => Some(Self::ScanObject),
15 => Some(Self::HealAbandonedObject),
16 => Some(Self::LastRealtime),
17 => Some(Self::ScanFolder),
18 => Some(Self::ScanCycle),
19 => Some(Self::ScanBucketDrive),
20 => Some(Self::CompactFolder),
21 => Some(Self::Last),
_ => None,
}
}
}
/// Thread-safe wrapper for LastMinuteLatency with atomic operations
#[derive(Default)]
pub struct LockedLastMinuteLatency {
cached_sec: AtomicU64,
cached: AccElem,
mu: RwLock<bool>,
latency: LastMinuteLatency,
latency: Arc<Mutex<LastMinuteLatency>>,
}
impl Clone for LockedLastMinuteLatency {
fn clone(&self) -> Self {
Self {
cached_sec: AtomicU64::new(0),
cached: self.cached.clone(),
mu: RwLock::new(true),
latency: self.latency.clone(),
latency: Arc::clone(&self.latency),
}
}
}
impl LockedLastMinuteLatency {
pub async fn add(&mut self, value: &Duration) {
self.add_size(value, 0).await;
}
pub async fn add_size(&mut self, value: &Duration, sz: u64) {
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
INIT.call_once(|| {
self.cached = AccElem::default();
self.cached_sec.store(t, Ordering::SeqCst);
});
let last_t = self.cached_sec.load(Ordering::SeqCst);
if last_t != t
&& self
.cached_sec
.compare_exchange(last_t, t, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let old = self.cached.clone();
self.cached = AccElem::default();
let a = AccElem {
size: old.size,
total: old.total,
n: old.n,
};
let _ = self.mu.write().await;
self.latency.add_all(t - 1, &a);
pub fn new() -> Self {
Self {
latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
}
self.cached.n += 1;
self.cached.total += value.as_secs();
self.cached.size += sz;
}
pub async fn total(&mut self) -> AccElem {
let _ = self.mu.read().await;
self.latency.get_total()
/// Add a duration measurement
pub async fn add(&self, duration: Duration) {
self.add_size(duration, 0).await;
}
/// Add a duration measurement with size
pub async fn add_size(&self, duration: Duration, size: u64) {
let mut latency = self.latency.lock().await;
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let elem = AccElem {
n: 1,
total: duration.as_secs(),
size,
};
latency.add_all(now, &elem);
}
/// Get total accumulated metrics for the last minute
pub async fn total(&self) -> AccElem {
let mut latency = self.latency.lock().await;
latency.get_total()
}
}
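
A brief usage sketch of the rewritten wrapper; the duration and size are illustrative, and callers need an async (tokio) context:

```rust
use std::time::Duration;

// Record one 12ms operation that moved 4 KiB, then read the rolling window.
let latency = LockedLastMinuteLatency::new();
latency.add_size(Duration::from_millis(12), 4096).await;
let acc = latency.total().await; // AccElem: count, total seconds, and bytes
assert!(acc.n >= 1);
```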
pub type LogFn = Arc<dyn Fn(&HashMap<String, String>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type TimeSizeFn = Arc<dyn Fn(u64) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type TimeFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
/// Current path tracker for monitoring active scan paths
struct CurrentPathTracker {
current_path: Arc<RwLock<String>>,
}
impl CurrentPathTracker {
fn new(initial_path: String) -> Self {
Self {
current_path: Arc::new(RwLock::new(initial_path)),
}
}
async fn update_path(&self, path: String) {
*self.current_path.write().await = path;
}
async fn get_path(&self) -> String {
self.current_path.read().await.clone()
}
}
/// Main scanner metrics structure
pub struct ScannerMetrics {
// All fields must be accessed atomically and aligned.
operations: Vec<AtomicU64>,
latency: Vec<LockedLastMinuteLatency>,
cycle_info: RwLock<Option<CurrentScannerCycle>>,
current_paths: HashMap<String, String>,
// Current paths contains disk -> tracker mappings
current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
// Cycle information
cycle_info: Arc<RwLock<Option<CurrentScannerCycle>>>,
}
impl ScannerMetrics {
pub fn new() -> Self {
let operations = (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect();
let latency = (0..ScannerMetric::LastRealtime as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect();
Self {
operations,
latency,
current_paths: Arc::new(RwLock::new(HashMap::new())),
cycle_info: Arc::new(RwLock::new(None)),
}
}
/// Log scanner action with custom metadata - compatible with existing usage
pub fn log(metric: ScannerMetric) -> impl Fn(&HashMap<String, String>) {
let start_time = SystemTime::now();
move |_custom: &HashMap<String, String>| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task for this)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
tokio::spawn(async move {
globalScannerMetrics.latency[metric_index].add(duration).await;
});
}
// Log trace metrics
if metric as u8 > ScannerMetric::StartTrace as u8 {
debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
}
}
}
/// Time scanner action with size - returns function that takes size
pub fn time_size(metric: ScannerMetric) -> impl Fn(u64) {
let start_time = SystemTime::now();
move |size: u64| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics with size (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
tokio::spawn(async move {
globalScannerMetrics.latency[metric_index].add_size(duration, size).await;
});
}
}
}
/// Time a scanner action - returns a closure to call when done
pub fn time(metric: ScannerMetric) -> impl Fn() {
let start_time = SystemTime::now();
move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
tokio::spawn(async move {
globalScannerMetrics.latency[metric_index].add(duration).await;
});
}
}
}
/// Time N scanner actions - returns function that takes count, then returns completion function
pub fn time_n(metric: ScannerMetric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
let start_time = SystemTime::now();
Box::new(move |count: usize| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(count as u64, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
tokio::spawn(async move {
globalScannerMetrics.latency[metric_index].add(duration).await;
});
}
})
})
}
/// Increment time with specific duration
pub async fn inc_time(metric: ScannerMetric, duration: Duration) {
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics
if (metric as usize) < ScannerMetric::LastRealtime as usize {
globalScannerMetrics.latency[metric as usize].add(duration).await;
}
}
/// Get lifetime operation count for a metric
pub fn lifetime(&self, metric: ScannerMetric) -> u64 {
if (metric as usize) >= ScannerMetric::Last as usize {
return 0;
}
self.operations[metric as usize].load(Ordering::Relaxed)
}
/// Get last minute statistics for a metric
pub async fn last_minute(&self, metric: ScannerMetric) -> AccElem {
if (metric as usize) >= ScannerMetric::LastRealtime as usize {
return AccElem::default();
}
self.latency[metric as usize].total().await
}
/// Set current cycle information
pub async fn set_cycle(&self, cycle: Option<CurrentScannerCycle>) {
*self.cycle_info.write().await = cycle;
}
/// Get current cycle information
pub async fn get_cycle(&self) -> Option<CurrentScannerCycle> {
self.cycle_info.read().await.clone()
}
/// Get current active paths
pub async fn get_current_paths(&self) -> Vec<String> {
let mut result = Vec::new();
let paths = self.current_paths.read().await;
for (disk, tracker) in paths.iter() {
let path = tracker.get_path().await;
result.push(format!("{}/{}", disk, path));
}
result
}
/// Get number of active drives
pub async fn active_drives(&self) -> usize {
self.current_paths.read().await.len()
}
/// Generate metrics report
pub async fn report(&self) -> M_ScannerMetrics {
let mut metrics = M_ScannerMetrics::default();
// Set cycle information
if let Some(cycle) = self.get_cycle().await {
metrics.current_cycle = cycle.current;
metrics.cycles_completed_at = cycle.cycle_completed;
metrics.current_started = cycle.started;
}
metrics.collected_at = Utc::now();
metrics.active_paths = self.get_current_paths().await;
// Lifetime operations
for i in 0..ScannerMetric::Last as usize {
let count = self.operations[i].load(Ordering::Relaxed);
if count > 0 {
if let Some(metric) = ScannerMetric::from_index(i) {
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
}
}
}
// Last minute statistics for realtime metrics
for i in 0..ScannerMetric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
if last_min.n > 0 {
if let Some(_metric) = ScannerMetric::from_index(i) {
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
}
}
}
metrics
}
}
// Type aliases for compatibility with existing code
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
let disk_name = disk.to_string();
// Store the tracker in global metrics
let tracker_clone = Arc::clone(&tracker);
let disk_clone = disk_name.clone();
tokio::spawn(async move {
globalScannerMetrics
.current_paths
.write()
.await
.insert(disk_clone, tracker_clone);
});
let update_fn = {
let tracker = Arc::clone(&tracker);
Arc::new(move |path: &str| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let tracker = Arc::clone(&tracker);
let path = path.to_string();
Box::pin(async move {
tracker.update_path(path).await;
})
})
};
let done_fn = {
let disk_name = disk_name.clone();
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
globalScannerMetrics.current_paths.write().await.remove(&disk_name);
})
})
};
(update_fn, done_fn)
}
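
Usage sketch for the tracker pair returned above; the disk name and paths are illustrative, and both closures must be awaited from an async context:

```rust
let (update_path, close_disk) = current_path_updater("disk-0", "bucket/prefix");
update_path("bucket/prefix/object.bin").await; // visible via get_current_paths()
// ... scan work ...
close_disk().await; // removes the disk entry from global metrics
```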
impl Default for ScannerMetrics {
@@ -137,123 +446,3 @@ impl Default for ScannerMetrics {
Self::new()
}
}
impl ScannerMetrics {
pub fn new() -> Self {
Self {
operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect(),
latency: vec![LockedLastMinuteLatency::default(); ScannerMetric::LastRealtime as usize],
cycle_info: RwLock::new(None),
current_paths: HashMap::new(),
}
}
pub async fn set_cycle(&mut self, c: Option<CurrentScannerCycle>) {
debug!("ScannerMetrics set_cycle {c:?}");
*self.cycle_info.write().await = c;
}
pub fn log(s: ScannerMetric) -> LogFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move |_custom: &HashMap<String, String>| {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add(&duration).await;
}
})
})
}
pub async fn time_size(s: ScannerMetric) -> TimeSizeFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move |sz: u64| {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add_size(&duration, sz).await;
}
})
})
}
pub fn time(s: ScannerMetric) -> TimeFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move || {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add(&duration).await;
}
})
})
}
pub async fn get_cycle(&self) -> Option<CurrentScannerCycle> {
let r = self.cycle_info.read().await;
if let Some(c) = r.as_ref() {
return Some(c.clone());
}
None
}
pub async fn get_current_paths(&self) -> Vec<String> {
let mut res = Vec::new();
let prefix = format!("{}/", GLOBAL_Local_Node_Name.read().await);
self.current_paths.iter().for_each(|(k, v)| {
res.push(format!("{}/{}/{}", prefix, k, v));
});
res
}
pub async fn report(&self) -> M_ScannerMetrics {
let mut m = M_ScannerMetrics::default();
if let Some(cycle) = self.get_cycle().await {
info!("cycle: {cycle:?}");
m.current_cycle = cycle.current;
m.cycles_completed_at = cycle.cycle_completed;
m.current_started = cycle.started;
}
m.collected_at = Utc::now();
m.active_paths = self.get_current_paths().await;
for (i, v) in self.operations.iter().enumerate() {
m.life_time_ops.insert(i.to_string(), v.load(Ordering::SeqCst));
}
m
}
}
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub fn current_path_updater(disk: &str, _initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let disk_1 = disk.to_string();
let disk_2 = disk.to_string();
(
Arc::new(move |path: &str| {
let disk_inner = disk_1.clone();
let path = path.to_string();
Box::pin(async move {
globalScannerMetrics
.write()
.await
.current_paths
.insert(disk_inner, path.to_string());
})
}),
Arc::new(move || {
let disk_inner = disk_2.clone();
Box::pin(async move {
globalScannerMetrics.write().await.current_paths.remove(&disk_inner);
})
}),
)
}

View File

@@ -93,7 +93,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
if types.contains(&MetricType::SCANNER) {
info!("start get scanner metrics");
let metrics = globalScannerMetrics.read().await.report().await;
let metrics = globalScannerMetrics.report().await;
real_time_metrics.aggregated.scanner = Some(metrics);
}

View File

@@ -2893,7 +2893,7 @@ impl SetDisks {
}
pub async fn ns_scanner(
&self,
self: Arc<Self>,
buckets: &[BucketInfo],
want_cycle: u32,
updates: Sender<DataUsageCache>,
@@ -2911,7 +2911,7 @@ impl SetDisks {
return Ok(());
}
let old_cache = DataUsageCache::load(self, DATA_USAGE_CACHE_NAME).await?;
let old_cache = DataUsageCache::load(&self, DATA_USAGE_CACHE_NAME).await?;
let mut cache = DataUsageCache {
info: DataUsageCacheInfo {
name: DATA_USAGE_ROOT.to_string(),
@@ -2934,6 +2934,7 @@ impl SetDisks {
permutes.shuffle(&mut rng);
permutes
};
// Add new buckets first
for idx in permutes.iter() {
let b = buckets[*idx].clone();
@@ -2954,6 +2955,7 @@ impl SetDisks {
Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.gen_range(0.0..1.0))
};
let mut ticker = interval(update_time);
let task = tokio::spawn(async move {
let last_save = Some(SystemTime::now());
let mut need_loop = true;
@@ -2983,8 +2985,8 @@ impl SetDisks {
}
}
});
// Restrict parallelism for the disk usage scanner:
// use at most num_cpus::get() workers when that is less than len(disks)
let max_procs = num_cpus::get();
if max_procs < disks.len() {
disks = disks[0..max_procs].to_vec();
@@ -2997,6 +2999,7 @@ impl SetDisks {
Some(disk) => disk.clone(),
None => continue,
};
let self_clone = Arc::clone(&self);
let bucket_rx_clone = bucket_rx.clone();
let buckets_results_tx_clone = buckets_results_tx.clone();
futures.push(async move {
@@ -3005,7 +3008,7 @@ impl SetDisks {
Err(_) => return,
Ok(bucket_info) => {
let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME);
let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await {
let mut cache = match DataUsageCache::load(&self_clone, &cache_name.to_string_lossy()).await {
Ok(cache) => cache,
Err(_) => continue,
};
@@ -3022,6 +3025,7 @@ impl SetDisks {
..Default::default()
};
}
// Collect updates.
let (tx, mut rx) = mpsc::channel(1);
let buckets_results_tx_inner_clone = buckets_results_tx_clone.clone();
@@ -3042,9 +3046,10 @@ impl SetDisks {
}
}
});
// Calc usage
let before = cache.info.last_update;
let mut cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await {
let mut cache = match disk.ns_scanner(&cache, tx, heal_scan_mode, None).await {
Ok(cache) => cache,
Err(_) => {
if cache.info.last_update > before {
@@ -3080,9 +3085,9 @@ impl SetDisks {
}
});
}
info!("ns_scanner start");
let _ = join_all(futures).await;
drop(buckets_results_tx);
let _ = task.await;
info!("ns_scanner completed");
Ok(())
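
The receiver change from `&self` to `self: Arc<Self>` is what makes the `Arc::clone(&self)` handoffs above possible: each spawned worker owns a handle instead of borrowing across the `'static` boundary of `tokio::spawn`. A minimal standalone sketch of the pattern:

```rust
use std::sync::Arc;

struct Scanner;

impl Scanner {
    async fn scan(self: Arc<Self>) {
        for _ in 0..4 {
            // Every worker gets its own Arc, so no borrow outlives this frame.
            let worker = Arc::clone(&self);
            tokio::spawn(async move {
                let _scanner = worker; // the task keeps the Scanner alive
            });
        }
    }
}
```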

View File

@@ -828,6 +828,7 @@ impl ECStore {
}
});
if let Err(err) = set
.clone()
.ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode)
.await
{

View File

@@ -64,8 +64,8 @@ pub struct Opt {
/// Observability configuration file
/// Default value: config/obs.toml
#[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
pub obs_config: String,
#[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_ENDPOINT.to_string(), env = "RUSTFS_OBS_ENDPOINT")]
pub obs_endpoint: String,
/// tls path for rustfs api and console.
#[arg(long, env = "RUSTFS_TLS_PATH")]

View File

@@ -104,7 +104,7 @@ async fn main() -> Result<()> {
init_license(opt.license.clone());
// Load the configuration file
let config = load_config(Some(opt.clone().obs_config));
let config = load_config(Some(opt.clone().obs_endpoint));
// Initialize Observability
let (_logger, guard) = init_obs(config.clone()).await;
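
With `obs_config` replaced by `obs_endpoint`, the collector address is passed directly instead of through a config file path. Sketched invocations, assuming the binary is named `rustfs` (clap derives `--obs-endpoint` from the field name):

```bash
rustfs --obs-endpoint http://localhost:4317
# or, equivalently, via the environment:
RUSTFS_OBS_ENDPOINT=http://localhost:4317 rustfs
```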

rustfs/static/.gitkeep Normal file
View File

View File

@@ -31,42 +31,42 @@ export RUSTFS_VOLUMES="./target/volume/test{0...4}"
# export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS=":9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS=":9002"
export RUSTFS_CONSOLE_ADDRESS=":9001"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# HTTPS certificate directory
# export RUSTFS_TLS_PATH="./deploy/certs"
# Point this at the real configuration file path; obs.example.toml is for reference only. Use either `RUSTFS_OBS_CONFIG` or the variables below, not both
# export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
# Observability-related configuration
#export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # Address of the OpenTelemetry Collector
#export RUSTFS_OBS_USE_STDOUT=false # Whether to log to standard output
#export RUSTFS_OBS_SAMPLE_RATIO=2.0 # Sampling ratio between 0.0 and 1.0; 0.0 samples nothing, 1.0 samples everything
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval, in seconds
#export RUSTFS_OBS_SERVICE_NAME=rustfs # Service name
#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # Service version
#export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=debug # Log level; supports trace, debug, info, warn, error
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="./deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB
# The variables below take effect only when every required value is set, and they override values from the configuration file
export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
export RUSTFS_OBSERVABILITY_USE_STDOUT=false
export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0
export RUSTFS_OBSERVABILITY_METER_INTERVAL=31
export RUSTFS_OBSERVABILITY_SERVICE_NAME=rustfs
export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0
export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop
export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug
export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true
#
#export RUSTFS_SINKS_type=File
export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log
#export RUSTFS_SINKS_buffer_size=12
#export RUSTFS_SINKS_flush_interval_ms=1000
#export RUSTFS_SINKS_flush_threshold=100
#export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log
#export RUSTFS_SINKS_FILE_BUFFER_SIZE=12
#export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000
#export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100
#
#export RUSTFS_SINKS_type=Kafka
# Kafka sink configuration
#export RUSTFS_SINKS_KAFKA_BROKERS=localhost:9092
#export RUSTFS_SINKS_KAFKA_TOPIC=logs
#export RUSTFS_SINKS_batch_size=100
#export RUSTFS_SINKS_batch_timeout_ms=1000
#export RUSTFS_SINKS_KAFKA_BATCH_SIZE=100
#export RUSTFS_SINKS_KAFKA_BATCH_TIMEOUT_MS=1000
#
#export RUSTFS_SINKS_type=Webhook
# Webhook sink configuration
#export RUSTFS_SINKS_WEBHOOK_ENDPOINT=http://localhost:8080/webhook
#export RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN=your-auth-token
#export RUSTFS_SINKS_batch_size=100
#export RUSTFS_SINKS_batch_timeout_ms=1000
#export RUSTFS_SINKS_WEBHOOK_BATCH_SIZE=100
#export RUSTFS_SINKS_WEBHOOK_BATCH_TIMEOUT_MS=1000
#
#export RUSTFS_LOGGER_QUEUE_CAPACITY=10
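
Once these variables are exported in the shell, a local run might look like this (the cargo package name is an assumption):

```bash
cargo run -p rustfs
# Telemetry should then flow to the collector set in RUSTFS_OBSERVABILITY_ENDPOINT.
```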