diff --git a/.docker/openobserve-otel/README.md b/.docker/openobserve-otel/README.md new file mode 100644 index 00000000..5993c2fc --- /dev/null +++ b/.docker/openobserve-otel/README.md @@ -0,0 +1,75 @@ +# OpenObserve + OpenTelemetry Collector + +[![OpenObserve](https://img.shields.io/badge/OpenObserve-OpenSource-blue.svg)](https://openobserve.org) +[![OpenTelemetry](https://img.shields.io/badge/OpenTelemetry-Collector-green.svg)](https://opentelemetry.io/) + +English | [中文](README_ZH.md) + +This directory contains the configuration files for setting up an observability stack with OpenObserve and OpenTelemetry +Collector. + +### Overview + +This setup provides a complete observability solution for your applications: + +- **OpenObserve**: A modern, open-source observability platform for logs, metrics, and traces. +- **OpenTelemetry Collector**: Collects and processes telemetry data before sending it to OpenObserve. + +### Setup Instructions + +1. **Prerequisites**: + - Docker and Docker Compose installed + - Sufficient memory resources (minimum 2GB recommended) + +2. **Starting the Services**: + ```bash + cd .docker/openobserve-otel + docker compose -f docker-compose.yml up -d + ``` + +3. **Accessing the Dashboard**: + - OpenObserve UI: http://localhost:5080 + - Default credentials: + - Username: root@rustfs.com + - Password: rustfs123 + +### Configuration + +#### OpenObserve Configuration + +The OpenObserve service is configured with: + +- Root user credentials +- Data persistence through a volume mount +- Memory cache enabled +- Health checks +- Exposed ports: + - 5080: HTTP API and UI + - 5081: OTLP gRPC + +#### OpenTelemetry Collector Configuration + +The collector is configured to: + +- Receive telemetry data via OTLP (HTTP and gRPC) +- Collect logs from files +- Process data in batches +- Export data to OpenObserve +- Manage memory usage + +### Integration with Your Application + +To send telemetry data from your application, configure your OpenTelemetry SDK to send data to: + +- OTLP gRPC: `localhost:4317` +- OTLP HTTP: `localhost:4318` + +For example, in a Rust application using the `rustfs-obs` library: + +```bash +export RUSTFS_OBS_ENDPOINT=http://localhost:4317 +export RUSTFS_OBS_SERVICE_NAME=yourservice +export RUSTFS_OBS_SERVICE_VERSION=1.0.0 +export RUSTFS_OBS_ENVIRONMENT=development +``` + diff --git a/.docker/openobserve-otel/README_ZH.md b/.docker/openobserve-otel/README_ZH.md new file mode 100644 index 00000000..2e2e80c9 --- /dev/null +++ b/.docker/openobserve-otel/README_ZH.md @@ -0,0 +1,75 @@ +# OpenObserve + OpenTelemetry Collector + +[![OpenObserve](https://img.shields.io/badge/OpenObserve-OpenSource-blue.svg)](https://openobserve.org) +[![OpenTelemetry](https://img.shields.io/badge/OpenTelemetry-Collector-green.svg)](https://opentelemetry.io/) + +[English](README.md) | 中文 + +## 中文 + +本目录包含搭建 OpenObserve 和 OpenTelemetry Collector 可观测性栈的配置文件。 + +### 概述 + +此设置为应用程序提供了完整的可观测性解决方案: + +- **OpenObserve**:现代化、开源的可观测性平台,用于日志、指标和追踪。 +- **OpenTelemetry Collector**:收集和处理遥测数据,然后将其发送到 OpenObserve。 + +### 设置说明 + +1. **前提条件**: + - 已安装 Docker 和 Docker Compose + - 足够的内存资源(建议至少 2GB) + +2. **启动服务**: + ```bash + cd .docker/openobserve-otel + docker compose -f docker-compose.yml up -d + ``` + +3. 
**访问仪表板**:
+    - OpenObserve UI:http://localhost:5080
+    - 默认凭据:
+        - 用户名:root@rustfs.com
+        - 密码:rustfs123
+
+### 配置
+
+#### OpenObserve 配置
+
+OpenObserve 服务配置:
+
+- 根用户凭据
+- 通过卷挂载实现数据持久化
+- 启用内存缓存
+- 健康检查
+- 暴露端口:
+    - 5080:HTTP API 和 UI
+    - 5081:OTLP gRPC
+
+#### OpenTelemetry Collector 配置
+
+收集器配置为:
+
+- 通过 OTLP(HTTP 和 gRPC)接收遥测数据
+- 从文件中收集日志
+- 批处理数据
+- 将数据导出到 OpenObserve
+- 管理内存使用
+
+### 与应用程序集成
+
+要从应用程序发送遥测数据,将 OpenTelemetry SDK 配置为发送数据到:
+
+- OTLP gRPC:`localhost:4317`
+- OTLP HTTP:`localhost:4318`
+
+例如,在使用 `rustfs-obs` 库的 Rust 应用程序中:
+
+```bash
+export RUSTFS_OBS_ENDPOINT=http://localhost:4317
+export RUSTFS_OBS_SERVICE_NAME=yourservice
+export RUSTFS_OBS_SERVICE_VERSION=1.0.0
+export RUSTFS_OBS_ENVIRONMENT=development
+```
\ No newline at end of file
diff --git a/.docker/openobserve-otel/docker-compose.yml b/.docker/openobserve-otel/docker-compose.yml
index 68788aba..a4083bca 100644
--- a/.docker/openobserve-otel/docker-compose.yml
+++ b/.docker/openobserve-otel/docker-compose.yml
@@ -6,11 +6,12 @@ services:
       ZO_ROOT_USER_EMAIL: "root@rustfs.com"
       ZO_ROOT_USER_PASSWORD: "rustfs123"
       ZO_TRACING_HEADER_KEY: "Authorization"
-      ZO_TRACING_HEADER_VALUE: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
+      ZO_TRACING_HEADER_VALUE: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
       ZO_DATA_DIR: "/data"
       ZO_MEMORY_CACHE_ENABLED: "true"
       ZO_MEMORY_CACHE_MAX_SIZE: "256"
       RUST_LOG: "info"
+      TZ: Asia/Shanghai
     ports:
       - "5080:5080"
       - "5081:5081"
@@ -44,9 +45,11 @@ services:
       - "13133:13133" # Health check
       - "1777:1777" # pprof
       - "55679:55679" # zpages
+      - "1888:1888" # Metrics
+      - "8888:8888" # Prometheus metrics
+      - "8889:8889" # Additional metrics endpoint
     depends_on:
-      openobserve:
-        condition: service_healthy
+      - openobserve
     networks:
       - otel-network
     deploy:
diff --git a/.docker/openobserve-otel/otel-collector-config.yaml b/.docker/openobserve-otel/otel-collector-config.yaml
index 288ac9a2..561692ad 100644
--- a/.docker/openobserve-otel/otel-collector-config.yaml
+++ b/.docker/openobserve-otel/otel-collector-config.yaml
@@ -20,9 +20,9 @@ processors:
 exporters:
   otlphttp/openobserve:
-    endpoint: http://openobserve:5080/api/default
+    endpoint: http://openobserve:5080/api/default # from the host: http://127.0.0.1:5080/api/default
     headers:
-      Authorization: "Bearer cm9vdEBydXN0ZnMuY29tOmxIV0RqQmZMWXJ6MnZOajU="
+      Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
       stream-name: default
       organization: default
     compression: gzip
@@ -32,6 +32,21 @@ exporters:
       max_interval: 30s
       max_elapsed_time: 300s
     timeout: 10s
+  otlp/openobserve:
+    endpoint: openobserve:5081 # OTLP gRPC; from the host: 127.0.0.1:5081
+    headers:
+      Authorization: "Basic cm9vdEBydXN0ZnMuY29tOmQ4SXlCSEJTUkk3RGVlcEQ="
+      stream-name: default
+      organization: default
+    compression: gzip
+    retry_on_failure:
+      enabled: true
+      initial_interval: 5s
+      max_interval: 30s
+      max_elapsed_time: 300s
+    timeout: 10s
+    tls:
+      insecure: true
 
 extensions:
   health_check:
@@ -47,15 +62,15 @@ service:
     traces:
       receivers: [ otlp ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
     metrics:
       receivers: [ otlp ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
     logs:
       receivers: [ otlp, filelog ]
       processors: [ memory_limiter, batch ]
-      exporters: [ otlphttp/openobserve ]
+      exporters: [ otlp/openobserve ]
   telemetry:
     logs:
       level: "info" # Collector 日志级别
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index bf729d21..07c80c8e 100644
---
a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,7 +21,7 @@ jobs: - { profile: release, target: x86_64-unknown-linux-gnu, glibc: "default" } - { profile: release, target: aarch64-apple-darwin, glibc: "default" } #- { profile: release, target: aarch64-unknown-linux-gnu, glibc: "default" } - #- { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" } + - { profile: release, target: aarch64-unknown-linux-musl, glibc: "default" } #- { profile: release, target: x86_64-pc-windows-msvc, glibc: "default" } exclude: # Linux targets on non-Linux systems @@ -413,4 +413,4 @@ jobs: with: name: rustfs-packages pattern: "rustfs-*" - delete-merged: true \ No newline at end of file + delete-merged: true diff --git a/.gitignore b/.gitignore index d38ffae3..8eb6191c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ /data .devcontainer rustfs/static/* +!rustfs/static/.gitkeep vendor cli/rustfs-gui/embedded-rustfs/rustfs deploy/config/obs.toml diff --git a/Cargo.lock b/Cargo.lock index 8e28ad63..a1e8ecc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -834,9 +834,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ "fastrand", "gloo-timers", @@ -1782,6 +1782,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1825,9 +1834,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.2.0-rc.2" +version = "0.2.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "170d71b5b14dec99db7739f6fc7d6ec2db80b78c3acb77db48392ccc3d8a9ea0" +checksum = "8a23fa214dea9efd4dacee5a5614646b30216ae0f05d4bb51bafb50e9da1c5be" dependencies = [ "hybrid-array", ] @@ -1934,7 +1943,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.10", + "parking_lot_core 0.9.11", ] [[package]] @@ -1948,7 +1957,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.10", + "parking_lot_core 0.9.11", ] [[package]] @@ -1995,7 +2004,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "parquet", "rand 0.8.5", "regex", @@ -2025,7 +2034,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "parking_lot 0.12.3", + "parking_lot 0.12.4", ] [[package]] @@ -2137,7 +2146,7 @@ dependencies = [ "futures", "log", "object_store", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "rand 0.8.5", "tempfile", "url", @@ -2273,7 +2282,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-plan", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "paste", ] @@ -2414,7 +2423,7 @@ dependencies = [ "indexmap 2.9.0", "itertools 0.14.0", "log", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "pin-project-lite", "tokio", ] @@ -2562,7 +2571,7 @@ checksum = "6c478574b20020306f98d61c8ca3322d762e1ff08117422ac6106438605ea516" dependencies = [ "block-buffer 0.11.0-rc.4", "const-oid 0.10.1", - "crypto-common 0.2.0-rc.2", + "crypto-common 0.2.0-rc.3", "subtle", ] @@ -2955,7 
+2964,7 @@ dependencies = [ "futures-util", "generational-box", "once_cell", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "rustc-hash 1.1.0", "tracing", "warnings", @@ -3287,6 +3296,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.12" @@ -3401,6 +3420,27 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flexi_logger" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb03342077df16d5b1400d7bed00156882846d7a479ff61a6f10594bcc3423d8" +dependencies = [ + "chrono", + "crossbeam-channel", + "crossbeam-queue", + "log", + "notify-debouncer-mini", + "nu-ansi-term 0.50.1", + "regex", + "serde", + "serde_derive", + "thiserror 2.0.12", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "flume" version = "0.11.1" @@ -3476,6 +3516,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futf" version = "0.1.5" @@ -3702,7 +3751,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b" dependencies = [ - "parking_lot 0.12.3", + "parking_lot 0.12.4", "tracing", ] @@ -4510,6 +4559,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.9.1", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -4735,6 +4804,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kuchikiki" version = "0.8.2" @@ -5010,9 +5099,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" dependencies = [ "autocfg", "scopeguard", @@ -5023,6 +5112,10 @@ name = "log" version = "0.4.27" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +dependencies = [ + "serde", + "value-bag", +] [[package]] name = "longest-increasing-subsequence" @@ -5240,6 +5333,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.59.0", ] @@ -5419,6 +5513,43 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.9.1", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-debouncer-mini" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a689eb4262184d9a1727f9087cd03883ea716682ab03ed24efec57d7716dccb8" +dependencies = [ + "log", + "notify", + "notify-types", + "tempfile", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "ntapi" version = "0.4.1" @@ -5438,6 +5569,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "nugine-rust-utils" version = "0.3.1" @@ -5837,7 +5977,7 @@ dependencies = [ "futures", "humantime", "itertools 0.13.0", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "percent-encoding", "snafu", "tokio", @@ -6073,12 +6213,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", - "parking_lot_core 0.9.10", + "parking_lot_core 0.9.11", ] [[package]] @@ -6097,9 +6237,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.10" +version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", @@ -6806,7 +6946,7 @@ dependencies = [ "derive_builder", "futures", "lazy_static", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "s3s", "snafu", "tokio", @@ -7164,9 +7304,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.16" +version = "0.12.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bf597b113be201cb2269b4c39b39a804d01b99ee95a4278f0ed04e45cff1c71" +checksum = "e98ff6b0dbbe4d5a37318f433d4fc82babd21631f194d370409ceb2e40b2f0b5" dependencies = [ "base64 0.22.1", "bytes", @@ -7608,9 +7748,8 @@ version = "0.0.1" dependencies = [ "async-trait", "chrono", - "config", - "lazy_static", - "local-ip-address", + "flexi_logger", + 
"nu-ansi-term 0.50.1", "nvml-wrapper", "opentelemetry", "opentelemetry-appender-tracing", @@ -7980,6 +8119,15 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "serde_fmt" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" +dependencies = [ + "serde", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -8357,9 +8505,9 @@ dependencies = [ [[package]] name = "snafu" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" +checksum = "320b01e011bf8d5d7a4a4a4be966d9160968935849c83b918827f6a435e7f627" dependencies = [ "backtrace", "snafu-derive", @@ -8367,9 +8515,9 @@ dependencies = [ [[package]] name = "snafu-derive" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" +checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -8502,7 +8650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" dependencies = [ "new_debug_unreachable", - "parking_lot 0.12.3", + "parking_lot 0.12.4", "phf_shared 0.11.3", "precomputed-hash", "serde", @@ -8554,6 +8702,84 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "sval" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc9739f56c5d0c44a5ed45473ec868af02eb896af8c05f616673a31e1d1bb09" + +[[package]] +name = "sval_buffer" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f39b07436a8c271b34dad5070c634d1d3d76d6776e938ee97b4a66a5e8003d0b" +dependencies = [ + "sval", + "sval_ref", +] + +[[package]] +name = "sval_dynamic" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffcb072d857431bf885580dacecf05ed987bac931230736739a79051dbf3499b" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_fmt" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f214f427ad94a553e5ca5514c95c6be84667cbc5568cce957f03f3477d03d5c" +dependencies = [ + "itoa 1.0.15", + "ryu", + "sval", +] + +[[package]] +name = "sval_json" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ed34b32e638dec9a99c8ac92d0aa1220d40041026b625474c2b6a4d6f4feb" +dependencies = [ + "itoa 1.0.15", + "ryu", + "sval", +] + +[[package]] +name = "sval_nested" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14bae8fcb2f24fee2c42c1f19037707f7c9a29a0cda936d2188d48a961c4bb2a" +dependencies = [ + "sval", + "sval_buffer", + "sval_ref", +] + +[[package]] +name = "sval_ref" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a4eaea3821d3046dcba81d4b8489421da42961889902342691fb7eab491d79e" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_serde" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172dd4aa8cb3b45c8ac8f3b4111d644cd26938b0643ede8f93070812b87fb339" 
+dependencies = [
+ "serde",
+ "sval",
+ "sval_nested",
+]
+
 [[package]]
 name = "syn"
 version = "1.0.109"
@@ -8671,7 +8897,7 @@ dependencies = [
 "ndk-sys",
 "objc",
 "once_cell",
- "parking_lot 0.12.3",
+ "parking_lot 0.12.4",
 "raw-window-handle 0.5.2",
 "raw-window-handle 0.6.2",
 "scopeguard",
@@ -8922,7 +9148,7 @@ dependencies = [
 "bytes",
 "libc",
 "mio",
- "parking_lot 0.12.3",
+ "parking_lot 0.12.4",
 "pin-project-lite",
 "signal-hook-registry",
 "socket2",
@@ -9312,7 +9538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
 dependencies = [
 "matchers",
- "nu-ansi-term",
+ "nu-ansi-term 0.46.0",
 "once_cell",
 "regex",
 "serde",
@@ -9419,6 +9645,12 @@ dependencies = [
 "static_assertions",
 ]
 
+[[package]]
+name = "typeid"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
+
 [[package]]
 name = "typenum"
 version = "1.18.0"
@@ -9587,6 +9819,42 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
 
+[[package]]
+name = "value-bag"
+version = "1.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5"
+dependencies = [
+ "value-bag-serde1",
+ "value-bag-sval2",
+]
+
+[[package]]
+name = "value-bag-serde1"
+version = "1.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35540706617d373b118d550d41f5dfe0b78a0c195dc13c6815e92e2638432306"
+dependencies = [
+ "erased-serde",
+ "serde",
+ "serde_fmt",
+]
+
+[[package]]
+name = "value-bag-sval2"
+version = "1.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fe7e140a2658cc16f7ee7a86e413e803fc8f9b5127adc8755c19f9fefa63a52"
+dependencies = [
+ "sval",
+ "sval_buffer",
+ "sval_dynamic",
+ "sval_fmt",
+ "sval_json",
+ "sval_ref",
+ "sval_serde",
+]
+
 [[package]]
 name = "vcpkg"
 version = "0.2.15"
diff --git a/Cargo.toml b/Cargo.toml
index 5e2abdfe..800a56c0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -76,6 +76,7 @@ dioxus = { version = "0.6.3", features = ["router"] }
 dirs = "6.0.0"
 dotenvy = "0.15.7"
 flatbuffers = "25.2.10"
+flexi_logger = { version = "0.30.2", features = ["trc"] }
 futures = "0.3.31"
 futures-core = "0.3.31"
 futures-util = "0.3.31"
@@ -106,6 +107,7 @@ mime = "0.3.17"
 mime_guess = "2.0.5"
 netif = "0.1.6"
 nix = { version = "0.30.1", features = ["fs"] }
+nu-ansi-term = "0.50.1"
 num_cpus = { version = "1.16.0" }
 nvml-wrapper = "0.10.0"
 object_store = "0.11.2"
diff --git a/README.md b/README.md
index 603f7c80..a212486c 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,7 @@ export RUSTFS_CONSOLE_ENABLE=true
 export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
 
 # Observability config
-export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
+export RUSTFS_OBS_ENDPOINT="http://localhost:4317"
 
 # Event message configuration
 #export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
@@ -73,9 +73,9 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
 ./rustfs /data/rustfs
 ```
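The quick start above swaps the old `obs.toml` file for a single endpoint variable. As a rough sketch of the lookup order implemented in `crates/obs/src/config.rs` later in this diff (an explicit value wins, then `RUSTFS_OBS_ENDPOINT`, and an empty result means "no OTLP export, log locally"); `resolve` is a hypothetical helper name:

```rust
use std::env;

/// Sketch of rustfs's endpoint resolution; `resolve` is illustrative, not the real API.
fn resolve(explicit: Option<String>) -> String {
    explicit
        .filter(|e| !e.is_empty())
        .or_else(|| env::var("RUSTFS_OBS_ENDPOINT").ok())
        .unwrap_or_default() // "" => skip OTLP export and fall back to stdout/file logging
}

fn main() {
    println!("observability endpoint: {:?}", resolve(None));
}
```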
-### Observability Stack
+## Observability Stack: OTel and OpenObserve
 
-#### Deployment
+### OpenTelemetry Collector with Jaeger, Grafana, Prometheus, and Loki
 
 1. Navigate to the observability directory:
    ```bash
@@ -93,24 +93,29 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
    - Jaeger: `http://localhost:16686`
    - Prometheus: `http://localhost:9090`
 
-#### Configuring Observability
+#### Configure observability
 
-1. Copy the example configuration:
+```
+OpenTelemetry Collector address (endpoint): http://localhost:4317
+```
+
+---
+
+### OpenObserve and OpenTelemetry Collector
+
+1. Navigate to the OpenObserve and OpenTelemetry directory:
    ```bash
-   cd deploy/config
-   cp obs.example.toml obs.toml
+   cd .docker/openobserve-otel
    ```
-
-2. Edit `obs.toml` with the following parameters:
-
-| Parameter            | Description                       | Example               |
-|----------------------|-----------------------------------|-----------------------|
-| endpoint             | OpenTelemetry Collector address   | http://localhost:4317 |
-| service_name         | Service name                      | rustfs                |
-| service_version      | Service version                   | 1.0.0                 |
-| environment          | Runtime environment               | production            |
-| meter_interval       | Metrics export interval (seconds) | 30                    |
-| sample_ratio         | Sampling ratio                    | 1.0                   |
-| use_stdout           | Output to console                 | true/false            |
-| logger_level         | Log level                         | info                  |
-| local_logging_enable | stdout                            | true/false            |
+2. Start the OpenObserve and OpenTelemetry Collector services:
+   ```bash
+   docker compose -f docker-compose.yml up -d
+   ```
+3. Access the OpenObserve UI at `http://localhost:5080`
+   - Default credentials:
+     - Username: `root@rustfs.com`
+     - Password: `rustfs123`
+   - Exposed ports:
+     - 5080: HTTP API and UI
+     - 5081: OTLP gRPC
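For wiring an application into either stack, here is a minimal tracing sketch against the collector's OTLP gRPC port, using the same `opentelemetry-otlp` builder API that `crates/obs/src/telemetry.rs` uses later in this diff; crate versions are assumed to match the workspace:

```rust
use opentelemetry::global;
use opentelemetry::trace::Tracer;
use opentelemetry_sdk::trace::SdkTracerProvider;

fn main() {
    // Assumes the collector from this compose file is listening on 4317 (OTLP gRPC).
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4317")
        .build()
        .expect("failed to build OTLP span exporter");

    let provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();
    global::set_tracer_provider(provider.clone());

    // Emit one test span, then flush on shutdown.
    global::tracer("readme-example").in_span("hello", |_cx| {});
    let _ = provider.shutdown();
}
```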
diff --git a/README_ZH.md b/README_ZH.md
index 3bdf3299..6ef40e61 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -60,7 +60,7 @@ export RUSTFS_CONSOLE_ENABLE=true
 export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001"
 
 # 可观测性配置
-export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
+export RUSTFS_OBS_ENDPOINT="http://localhost:4317"
 
 # 事件消息配置
 #export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml"
@@ -72,9 +72,9 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
 ./rustfs /data/rustfs
 ```
 
-### 可观测性系统
+## 可观测性系统 Otel 和 OpenObserve
 
-#### 部署
+### OpenTelemetry Collector 和 Jaeger、Grafana、Prometheus、Loki
 
 1. 进入可观测性目录:
    ```bash
@@ -94,22 +94,28 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml"
 
 #### 配置可观测性
 
-1. 复制示例配置:
+```
+OpenTelemetry Collector 地址(endpoint): http://localhost:4317
+```
+
+---
+
+### OpenObserve 和 OpenTelemetry Collector
+
+1. 进入 OpenObserve 和 OpenTelemetry 目录:
    ```bash
-   cd deploy/config
-   cp obs.example.toml obs.toml
+   cd .docker/openobserve-otel
    ```
+2. 启动 OpenObserve 和 OpenTelemetry Collector 服务:
+   ```bash
+   docker compose -f docker-compose.yml up -d
+   ```
+3. 访问 OpenObserve UI:
+   OpenObserve UI: `http://localhost:5080`
+   - 默认凭据:
+     - 用户名:`root@rustfs.com`
+     - 密码:`rustfs123`
+   - 开放端口:
+     - 5080:HTTP API 和 UI
+     - 5081:OTLP gRPC
 
-2. 编辑 `obs.toml` 配置文件,参数如下:
-
-| 配置项                  | 说明                         | 示例值                   |
-|----------------------|----------------------------|-----------------------|
-| endpoint             | OpenTelemetry Collector 地址 | http://localhost:4317 |
-| service_name         | 服务名称                       | rustfs                |
-| service_version      | 服务版本                       | 1.0.0                 |
-| environment          | 运行环境                       | production            |
-| meter_interval       | 指标导出间隔 (秒)                 | 30                    |
-| sample_ratio         | 采样率                        | 1.0                   |
-| use_stdout           | 是否输出到控制台                   | true/false            |
-| logger_level         | 日志级别                       | info                  |
-| local_logging_enable | 控制台是否答应日志                  | true/false            |
diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs
index c4f22f93..e6baaba8 100644
--- a/crates/config/src/constants/app.rs
+++ b/crates/config/src/constants/app.rs
@@ -56,14 +56,13 @@ pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
 /// Example: RUSTFS_SECRET_KEY=rustfsadmin
 /// Example: --secret-key rustfsadmin
 pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
-/// Default configuration file for observability
-/// Default value: config/obs.toml
-/// Environment variable: RUSTFS_OBS_CONFIG
-/// Command line argument: --obs-config
-/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
-/// Example: --obs-config config/obs.toml
-/// Example: --obs-config /etc/rustfs/obs.toml
-pub const DEFAULT_OBS_CONFIG: &str = "./deploy/config/obs.toml";
+
+/// Default OBS configuration endpoint
+/// Environment variable: RUSTFS_OBS_ENDPOINT
+/// Command line argument: --obs-endpoint
+/// Example: RUSTFS_OBS_ENDPOINT="http://localhost:4317"
+/// Example: --obs-endpoint http://localhost:4317
+pub const DEFAULT_OBS_ENDPOINT: &str = "";
 
 /// Default TLS key for rustfs
 /// This is the default key for TLS.
@@ -90,6 +89,41 @@ pub const DEFAULT_CONSOLE_PORT: u16 = 9001;
 /// This is the default address for rustfs console.
 pub const DEFAULT_CONSOLE_ADDRESS: &str = concat!(":", DEFAULT_CONSOLE_PORT);
 
+/// Default log filename for rustfs
+/// This is the default log filename for rustfs.
+/// It is used to store the logs of the application.
+/// Default value: rustfs.log
+/// Environment variable: RUSTFS_OBS_LOG_FILENAME
+pub const DEFAULT_LOG_FILENAME: &str = "rustfs.log";
+
+/// Default log directory for rustfs
+/// This is the default log directory for rustfs.
+/// It is used to store the logs of the application.
+/// Default value: logs
+/// Environment variable: RUSTFS_OBS_LOG_DIRECTORY
+pub const DEFAULT_LOG_DIR: &str = "logs";
+
+/// Default log rotation size (MB) for rustfs
+/// This is the default log rotation size for rustfs.
+/// It is used to rotate the logs of the application.
+/// Default value: 100 MB
+/// Environment variable: RUSTFS_OBS_LOG_ROTATION_SIZE_MB
+pub const DEFAULT_LOG_ROTATION_SIZE_MB: u64 = 100;
+
+/// Default log rotation time for rustfs
+/// This is the default log rotation interval for rustfs.
+/// It is used to rotate the logs of the application.
+/// Default value: day (options: day, hour, minute, second)
+/// Environment variable: RUSTFS_OBS_LOG_ROTATION_TIME
+pub const DEFAULT_LOG_ROTATION_TIME: &str = "day";
+
+/// Default number of rotated log files to keep for rustfs
+/// This is the default number of log files kept for rustfs.
+/// It is used to limit how many rotated logs are retained.
+/// Default value: 30
+/// Environment variable: RUSTFS_OBS_LOG_KEEP_FILES
+pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -154,10 +188,6 @@ mod tests {
 
     #[test]
     fn test_file_path_constants() {
-        // Test file path related constants
-        assert_eq!(DEFAULT_OBS_CONFIG, "./deploy/config/obs.toml");
-        assert!(DEFAULT_OBS_CONFIG.ends_with(".toml"), "Config file should be TOML format");
-
         assert_eq!(RUSTFS_TLS_KEY, "rustfs_key.pem");
         assert!(RUSTFS_TLS_KEY.ends_with(".pem"), "TLS key should be PEM format");
 
@@ -219,7 +249,6 @@ mod tests {
             ENVIRONMENT,
             DEFAULT_ACCESS_KEY,
             DEFAULT_SECRET_KEY,
-            DEFAULT_OBS_CONFIG,
             RUSTFS_TLS_KEY,
             RUSTFS_TLS_CERT,
             DEFAULT_ADDRESS,
diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml
index 77b89174..e2265103 100644
--- a/crates/obs/Cargo.toml
+++ b/crates/obs/Cargo.toml
@@ -20,9 +20,8 @@ kafka = ["dep:rdkafka"]
 rustfs-config = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
-config = { workspace = true }
-lazy_static = { workspace = true }
-local-ip-address = { workspace = true }
+flexi_logger = { workspace = true, features = ["trc", "kv"] }
+nu-ansi-term = { workspace = true }
 nvml-wrapper = { workspace = true, optional = true }
 opentelemetry = { workspace = true }
 opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
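With `flexi_logger` now in the workspace, a standalone sketch of the rotation setup the telemetry module builds from these defaults (daily or 100 MB rotation, whichever comes first, keeping 30 files); feature flags are assumed to match the manifests above:

```rust
use flexi_logger::{Age, Cleanup, Criterion, FileSpec, Logger, Naming, WriteMode};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Keep the handle alive for the life of the program; dropping it flushes
    // and shuts the writer down (the same reason OtelGuard stores it below).
    let _logger = Logger::try_with_str("info")?
        .log_to_file(FileSpec::default().directory("logs").basename("rustfs").suffix("log"))
        // Rotate daily or at 100 MB, whichever is hit first; keep 30 rotated files.
        .rotate(
            Criterion::AgeOrSize(Age::Day, 100 * 1024 * 1024),
            Naming::Timestamps,
            Cleanup::KeepLogFiles(30),
        )
        .write_mode(WriteMode::Async)
        .start()?;
    log::info!("file logging ready");
    Ok(())
}
```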
diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs
index 770203de..5284e301 100644
--- a/crates/obs/src/config.rs
+++ b/crates/obs/src/config.rs
@@ -1,5 +1,7 @@
-use config::{Config, File, FileFormat};
-use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
+use rustfs_config::{
+    APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_FILENAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB,
+    DEFAULT_LOG_ROTATION_TIME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
+};
 use serde::{Deserialize, Serialize};
 use std::env;
 
@@ -22,54 +24,94 @@ pub struct OtelConfig {
     pub environment: Option<String>,         // Environment
     pub logger_level: Option<String>,        // Logger level
     pub local_logging_enabled: Option<bool>, // Local logging enabled
-}
-
-/// Helper function: Extract observable configuration from environment variables
-fn extract_otel_config_from_env() -> OtelConfig {
-    OtelConfig {
-        endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()),
-        use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(USE_STDOUT)),
-        sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(SAMPLE_RATIO)),
-        meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(METER_INTERVAL)),
-        service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(APP_NAME.to_string())),
-        service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(SERVICE_VERSION.to_string())),
-        environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(ENVIRONMENT.to_string())),
-        logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(DEFAULT_LOG_LEVEL.to_string())),
-        local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED")
-            .ok()
-            .and_then(|v| v.parse().ok())
-            .or(Some(false)),
-    }
+    // Additional flexi_logger-related configuration
+    pub log_directory: Option<String>,     // Log file directory
+    pub log_filename: Option<String>,      // Log file name
+    pub log_rotation_size_mb: Option<u64>, // Size threshold for log rotation (MB)
+    pub log_rotation_time: Option<String>, // Time-based log rotation (hour, day)
+    pub log_keep_files: Option<u16>,       // Number of rotated log files to keep
 }
 
 impl OtelConfig {
+    /// Helper function: Extract observable configuration from environment variables
+    pub fn extract_otel_config_from_env(endpoint: Option<String>) -> OtelConfig {
+        let endpoint = if let Some(endpoint) = endpoint {
+            if endpoint.is_empty() {
+                env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
+            } else {
+                endpoint
+            }
+        } else {
+            env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
+        };
+        let mut use_stdout = env::var("RUSTFS_OBS_USE_STDOUT")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .or(Some(USE_STDOUT));
+        if endpoint.is_empty() {
+            use_stdout = Some(true);
+        }
+
+        OtelConfig {
+            endpoint,
+            use_stdout,
+            sample_ratio: env::var("RUSTFS_OBS_SAMPLE_RATIO")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(SAMPLE_RATIO)),
+            meter_interval: env::var("RUSTFS_OBS_METER_INTERVAL")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(METER_INTERVAL)),
+            service_name: env::var("RUSTFS_OBS_SERVICE_NAME")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(APP_NAME.to_string())),
+            service_version: env::var("RUSTFS_OBS_SERVICE_VERSION")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(SERVICE_VERSION.to_string())),
+            environment: env::var("RUSTFS_OBS_ENVIRONMENT")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(ENVIRONMENT.to_string())),
+            logger_level: env::var("RUSTFS_OBS_LOGGER_LEVEL")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_LEVEL.to_string())),
+            local_logging_enabled: env::var("RUSTFS_OBS_LOCAL_LOGGING_ENABLED")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(false)),
+            log_directory: env::var("RUSTFS_OBS_LOG_DIRECTORY")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_DIR.to_string())),
+            log_filename: env::var("RUSTFS_OBS_LOG_FILENAME")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_FILENAME.to_string())),
+            log_rotation_size_mb: env::var("RUSTFS_OBS_LOG_ROTATION_SIZE_MB")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
+            log_rotation_time: env::var("RUSTFS_OBS_LOG_ROTATION_TIME")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "day"
+            log_keep_files: env::var("RUSTFS_OBS_LOG_KEEP_FILES")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
+        }
+    }
+
     /// Create a new instance of OtelConfig with default values
     ///
     /// # Returns
     /// A new instance of OtelConfig
     pub fn new() -> Self {
-        extract_otel_config_from_env()
+        Self::extract_otel_config_from_env(None)
     }
 }
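A test-style sketch of the precedence encoded above (explicit argument first, then `RUSTFS_OBS_ENDPOINT`, with stdout forced on when both are empty); illustrative only, since mutating the process environment is not safe under parallel tests, and it assumes it sits inside this `config.rs` module:

```rust
#[cfg(test)]
mod endpoint_precedence {
    use super::OtelConfig;

    #[test]
    fn falls_back_to_env_then_stdout() {
        std::env::set_var("RUSTFS_OBS_ENDPOINT", "http://collector:4317");
        let cfg = OtelConfig::extract_otel_config_from_env(None);
        assert_eq!(cfg.endpoint, "http://collector:4317");

        std::env::remove_var("RUSTFS_OBS_ENDPOINT");
        let cfg = OtelConfig::extract_otel_config_from_env(Some(String::new()));
        assert_eq!(cfg.endpoint, "");
        assert_eq!(cfg.use_stdout, Some(true)); // empty endpoint forces stdout logging
    }
}
```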
println!("Using log directory: {:?}", temp_dir); temp_dir .join("rustfs.log") .to_str() @@ -173,9 +214,18 @@ impl FileSinkConfig { .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(Self::get_default_log_path), - buffer_size: Some(8192), - flush_interval_ms: Some(1000), - flush_threshold: Some(100), + buffer_size: env::var("RUSTFS_SINKS_FILE_BUFFER_SIZE") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(8192)), + flush_interval_ms: env::var("RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(1000)), + flush_threshold: env::var("RUSTFS_SINKS_FILE_FLUSH_THRESHOLD") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(100)), } } } @@ -260,6 +310,14 @@ impl AppConfig { logger: Some(LoggerConfig::default()), } } + + pub fn new_with_endpoint(endpoint: Option) -> Self { + Self { + observability: OtelConfig::extract_otel_config_from_env(endpoint), + sinks: vec![SinkConfig::new()], + logger: Some(LoggerConfig::new()), + } + } } // implement default for AppConfig @@ -269,9 +327,6 @@ impl Default for AppConfig { } } -/// Default configuration file name -const DEFAULT_CONFIG_FILE: &str = "obs"; - /// Loading the configuration file /// Supports TOML, YAML and .env formats, read in order by priority /// @@ -288,51 +343,6 @@ const DEFAULT_CONFIG_FILE: &str = "obs"; /// /// let config = load_config(None); /// ``` -pub fn load_config(config_dir: Option) -> AppConfig { - let config_dir = if let Some(path) = config_dir { - // If a path is provided, check if it's empty - if path.is_empty() { - // If empty, use the default config file name - DEFAULT_CONFIG_FILE.to_string() - } else { - // Use the provided path - let path = std::path::Path::new(&path); - if path.extension().is_some() { - // If path has extension, use it as is (extension will be added by Config::builder) - path.with_extension("").to_string_lossy().into_owned() - } else { - // If path is a directory, append the default config file name - path.to_string_lossy().into_owned() - } - } - } else { - // If no path provided, use current directory + default config file - match env::current_dir() { - Ok(dir) => dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned(), - Err(_) => { - eprintln!("Warning: Failed to get current directory, using default config file"); - DEFAULT_CONFIG_FILE.to_string() - } - } - }; - - // Log using proper logging instead of println when possible - println!("Using config file base: {}", config_dir); - - let app_config = Config::builder() - .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false)) - .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false)) - .build() - .unwrap_or_default(); - - match app_config.try_deserialize::() { - Ok(app_config) => { - println!("Parsed AppConfig: {:?}", app_config); - app_config - } - Err(e) => { - println!("Failed to deserialize config: {}", e); - AppConfig::default() - } - } +pub fn load_config(config: Option) -> AppConfig { + AppConfig::new_with_endpoint(config) } diff --git a/crates/obs/src/system/collector.rs b/crates/obs/src/system/collector.rs index c4184051..ea9bae3b 100644 --- a/crates/obs/src/system/collector.rs +++ b/crates/obs/src/system/collector.rs @@ -23,7 +23,7 @@ pub struct Collector { impl Collector { pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result { - let mut system = System::new_all(); + let mut system = System::new(); let attributes = ProcessAttributes::new(pid, &mut system)?; let core_count = 
diff --git a/crates/obs/src/system/collector.rs b/crates/obs/src/system/collector.rs
index c4184051..ea9bae3b 100644
--- a/crates/obs/src/system/collector.rs
+++ b/crates/obs/src/system/collector.rs
@@ -23,7 +23,7 @@ pub struct Collector {
 
 impl Collector {
     pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result<Self, GlobalError> {
-        let mut system = System::new_all();
+        let mut system = System::new();
         let attributes = ProcessAttributes::new(pid, &mut system)?;
         let core_count = System::physical_core_count().ok_or(GlobalError::CoreCountError)?;
 
         let metrics = Metrics::new(&meter);
@@ -52,7 +52,7 @@ impl Collector {
 
     fn collect(&mut self) -> Result<(), GlobalError> {
         self.system
-            .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true);
+            .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), false);
 
         // refresh the network interface list and statistics
         self.networks.refresh(false);
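The collector now starts from an empty `System` and refreshes only its own PID instead of scanning every process with `new_all()`. The same pattern in isolation, a sketch using the sysinfo API as it appears above:

```rust
use sysinfo::{Pid, ProcessesToUpdate, System};

fn sample(pid: Pid) -> Option<(f32, u64)> {
    let mut sys = System::new(); // empty snapshot; nothing scanned yet
    // Refresh just this one PID; `false` skips pruning dead processes from the map.
    sys.refresh_processes(ProcessesToUpdate::Some(&[pid]), false);
    // Note: the first CPU reading is 0; refresh twice over an interval for real numbers.
    sys.process(pid).map(|p| (p.cpu_usage(), p.memory()))
}

fn main() {
    let pid = Pid::from_u32(std::process::id());
    println!("cpu/mem for this process: {:?}", sample(pid));
}
```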
diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index d4ccaa82..69966e19 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -1,4 +1,7 @@
+// Added flexi_logger-related dependencies
 use crate::OtelConfig;
+use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
+use nu_ansi_term::Color;
 use opentelemetry::trace::TracerProvider;
 use opentelemetry::{global, KeyValue};
 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
@@ -13,7 +16,9 @@ use opentelemetry_semantic_conventions::{
     attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
     SCHEMA_URL,
 };
-use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
+use rustfs_config::{
+    APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
+};
 use rustfs_utils::get_local_ip_with_default;
 use smallvec::SmallVec;
 use std::borrow::Cow;
@@ -47,23 +52,44 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
 /// // When it's dropped, all telemetry components are properly shut down
 /// drop(otel_guard);
 /// ```
-#[derive(Debug)]
+// Implement Debug by hand rather than deriving it, since some fields may not implement Debug
 pub struct OtelGuard {
-    tracer_provider: SdkTracerProvider,
-    meter_provider: SdkMeterProvider,
-    logger_provider: SdkLoggerProvider,
+    tracer_provider: Option<SdkTracerProvider>,
+    meter_provider: Option<SdkMeterProvider>,
+    logger_provider: Option<SdkLoggerProvider>,
+    // Hold the flexi_logger handle to keep file logging alive
+    _flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
+}
+
+// Manual Debug impl: report only which components are present
+impl std::fmt::Debug for OtelGuard {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OtelGuard")
+            .field("tracer_provider", &self.tracer_provider.is_some())
+            .field("meter_provider", &self.meter_provider.is_some())
+            .field("logger_provider", &self.logger_provider.is_some())
+            .field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
+            .finish()
+    }
 }
 
 impl Drop for OtelGuard {
     fn drop(&mut self) {
-        if let Err(err) = self.tracer_provider.shutdown() {
-            eprintln!("Tracer shutdown error: {:?}", err);
+        if let Some(provider) = self.tracer_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Tracer shutdown error: {:?}", err);
+            }
         }
-        if let Err(err) = self.meter_provider.shutdown() {
-            eprintln!("Meter shutdown error: {:?}", err);
+
+        if let Some(provider) = self.meter_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Meter shutdown error: {:?}", err);
+            }
         }
-        if let Err(err) = self.logger_provider.shutdown() {
-            eprintln!("Logger shutdown error: {:?}", err);
+        if let Some(provider) = self.logger_provider.take() {
+            if let Err(err) = provider.shutdown() {
+                eprintln!("Logger shutdown error: {:?}", err);
+            }
         }
     }
 }
@@ -106,156 +132,258 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
     let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
     let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
 
-    // Pre-create resource objects to avoid repeated construction
-    let res = resource(config);
+    // flexi_logger handle for the file-logging fallback (rotation by time and size)
+    let mut flexi_logger_handle = None;
+    if !endpoint.is_empty() {
+        // Pre-create resource objects to avoid repeated construction
+        let res = resource(config);
 
-    // initialize tracer provider
-    let tracer_provider = {
-        let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
-        let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
-            Sampler::TraceIdRatioBased(sample_ratio)
-        } else {
-            Sampler::AlwaysOn
+        // initialize tracer provider
+        let tracer_provider = {
+            let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
+            let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
+                Sampler::TraceIdRatioBased(sample_ratio)
+            } else {
+                Sampler::AlwaysOn
+            };
+
+            let builder = SdkTracerProvider::builder()
+                .with_sampler(sampler)
+                .with_id_generator(RandomIdGenerator::default())
+                .with_resource(res.clone());
+
+            let tracer_provider = if endpoint.is_empty() {
+                builder
+                    .with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
+                    .build()
+            } else {
+                let exporter = opentelemetry_otlp::SpanExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(endpoint)
+                    .build()
+                    .unwrap();
+
+                let builder = if use_stdout {
+                    builder
+                        .with_batch_exporter(exporter)
+                        .with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
+                } else {
+                    builder.with_batch_exporter(exporter)
+                };
+
+                builder.build()
+            };
+
+            global::set_tracer_provider(tracer_provider.clone());
+            tracer_provider
         };
 
-        let builder = SdkTracerProvider::builder()
-            .with_sampler(sampler)
-            .with_id_generator(RandomIdGenerator::default())
-            .with_resource(res.clone());
+        // initialize meter provider
+        let meter_provider = {
+            let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
 
-        let tracer_provider = if endpoint.is_empty() {
-            builder
-                .with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
-                .build()
-        } else {
-            let exporter = opentelemetry_otlp::SpanExporter::builder()
-                .with_tonic()
-                .with_endpoint(endpoint)
-                .build()
-                .unwrap();
-
-            let builder = if use_stdout {
-                builder
-                    .with_batch_exporter(exporter)
-                    .with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
+            if endpoint.is_empty() {
+                builder = builder.with_reader(create_periodic_reader(meter_interval));
             } else {
-                builder.with_batch_exporter(exporter)
-            };
+                let exporter = opentelemetry_otlp::MetricExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(endpoint)
+                    .with_temporality(opentelemetry_sdk::metrics::Temporality::default())
+                    .build()
+                    .unwrap();
+
+                builder = builder.with_reader(
+                    PeriodicReader::builder(exporter)
+                        .with_interval(std::time::Duration::from_secs(meter_interval))
+                        .build(),
+                );
+
+                if use_stdout {
+                    builder = builder.with_reader(create_periodic_reader(meter_interval));
+                }
+            }
+
+            let meter_provider = builder.build();
+            global::set_meter_provider(meter_provider.clone());
+            meter_provider
+        };
+
+        // initialize logger provider
+        let logger_provider = {
+            let mut builder = SdkLoggerProvider::builder().with_resource(res);
+
+            if endpoint.is_empty() {
+                builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
+            } else {
+                let exporter = opentelemetry_otlp::LogExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(endpoint)
+                    .build()
+                    .unwrap();
+
+                builder = builder.with_batch_exporter(exporter);
+
+                if use_stdout {
+                    builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
+                }
+            }
 
             builder.build()
         };
 
-        global::set_tracer_provider(tracer_provider.clone());
-        tracer_provider
-    };
+        // configuring tracing
+        {
+            // configure the formatting layer
+            let fmt_layer = {
+                let enable_color = std::io::stdout().is_terminal();
+                let mut layer = tracing_subscriber::fmt::layer()
+                    .with_target(true)
+                    .with_ansi(enable_color)
+                    .with_thread_names(true)
+                    .with_thread_ids(true)
+                    .with_file(true)
+                    .with_line_number(true);
 
-    // initialize meter provider
-    let meter_provider = {
-        let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
+                // Only add full span events tracking in the development environment
+                if environment != ENVIRONMENT {
+                    layer = layer.with_span_events(FmtSpan::FULL);
+                }
 
-        if endpoint.is_empty() {
-            builder = builder.with_reader(create_periodic_reader(meter_interval));
-        } else {
-            let exporter = opentelemetry_otlp::MetricExporter::builder()
-                .with_tonic()
-                .with_endpoint(endpoint)
-                .with_temporality(opentelemetry_sdk::metrics::Temporality::default())
-                .build()
-                .unwrap();
+                layer.with_filter(build_env_filter(logger_level, None))
+            };
 
-            builder = builder.with_reader(
-                PeriodicReader::builder(exporter)
-                    .with_interval(std::time::Duration::from_secs(meter_interval))
-                    .build(),
-            );
+            let filter = build_env_filter(logger_level, None);
+            let otel_filter = build_env_filter(logger_level, None);
+            let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
+            let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
 
-            if use_stdout {
-                builder = builder.with_reader(create_periodic_reader(meter_interval));
+            // Configure registry to avoid repeated calls to filter methods
+            tracing_subscriber::registry()
+                .with(filter)
+                .with(ErrorLayer::default())
+                .with(if config.local_logging_enabled.unwrap_or(false) {
+                    Some(fmt_layer)
+                } else {
+                    None
+                })
+                .with(OpenTelemetryLayer::new(tracer))
+                .with(otel_layer)
+                .with(MetricsLayer::new(meter_provider.clone()))
+                .init();
+
+            if !endpoint.is_empty() {
+                info!(
+                    "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}, RUST_LOG env: {}",
+                    endpoint,
+                    logger_level,
+                    std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
+                );
             }
         }
 
-        let meter_provider = builder.build();
-        global::set_meter_provider(meter_provider.clone());
-        meter_provider
-    };
-
-    // initialize logger provider
-    let logger_provider = {
-        let mut builder = SdkLoggerProvider::builder().with_resource(res);
-
-        if endpoint.is_empty() {
-            builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
-        } else {
-            let exporter = opentelemetry_otlp::LogExporter::builder()
-                .with_tonic()
-                .with_endpoint(endpoint)
-                .build()
-                .unwrap();
-
-            builder = builder.with_batch_exporter(exporter);
-
-            if use_stdout {
-                builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
-            }
+        OtelGuard {
+            tracer_provider: Some(tracer_provider),
+            meter_provider: Some(meter_provider),
+            logger_provider: Some(logger_provider),
+            _flexi_logger_handles: flexi_logger_handle,
         }
+    } else {
+        // Obtain the log directory and file name configuration
+        let log_directory = config.log_directory.as_deref().unwrap_or("logs");
+        let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
 
-        builder.build()
-    };
-
-    // configuring tracing
-    {
-        // configure the formatting layer
-        let fmt_layer = {
-            let enable_color = std::io::stdout().is_terminal();
-            let mut layer = tracing_subscriber::fmt::layer()
-                .with_target(true)
-                .with_ansi(enable_color)
-                .with_thread_names(true)
-                .with_thread_ids(true)
-                .with_file(true)
-                .with_line_number(true);
-
-            // Only add full span events tracking in the development environment
-            if environment != ENVIRONMENT {
-                layer = layer.with_span_events(FmtSpan::FULL);
+        // Build the log rotation criterion
+        let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
+            // Rotate by time or size, whichever is hit first
+            (Some(time), Some(size)) => {
+                let age = match time.to_lowercase().as_str() {
+                    "hour" => Age::Hour,
+                    "day" => Age::Day,
+                    "minute" => Age::Minute,
+                    "second" => Age::Second,
+                    _ => Age::Day, // Default to daily
+                };
+                Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
             }
-
-            layer.with_filter(build_env_filter(logger_level, None))
+            // Rotate by time only
+            (Some(time), None) => {
+                let age = match time.to_lowercase().as_str() {
+                    "hour" => Age::Hour,
+                    "day" => Age::Day,
+                    "minute" => Age::Minute,
+                    "second" => Age::Second,
+                    _ => Age::Day, // Default to daily
+                };
+                Criterion::Age(age)
+            }
+            // Rotate by size only
+            (None, Some(size)) => {
+                Criterion::Size(size * 1024 * 1024) // Convert to bytes
+            }
+            // Default: rotate daily
+            _ => Criterion::Age(Age::Day),
         };
 
-        let filter = build_env_filter(logger_level, None);
-        let otel_filter = build_env_filter(logger_level, None);
-        let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
-        let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
+        // The number of log files retained
+        let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
 
-        // Configure registry to avoid repeated calls to filter methods
-        tracing_subscriber::registry()
-            .with(filter)
-            .with(ErrorLayer::default())
-            .with(if config.local_logging_enabled.unwrap_or(false) {
-                Some(fmt_layer)
-            } else {
-                None
-            })
-            .with(OpenTelemetryLayer::new(tracer))
-            .with(otel_layer)
-            .with(MetricsLayer::new(meter_provider.clone()))
-            .init();
+        // Parse the log level
+        let log_spec = LogSpecification::parse(logger_level).unwrap_or(LogSpecification::info());
 
-        if !endpoint.is_empty() {
-            info!(
-                "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
-                endpoint,
-                logger_level,
-                std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
-            );
+        // Map the logger_level string to the corresponding flexi_logger::Duplicate level
+        let level_filter = match logger_level.to_lowercase().as_str() {
+            "trace" => flexi_logger::Duplicate::Trace,
+            "debug" => flexi_logger::Duplicate::Debug,
+            "info" => flexi_logger::Duplicate::Info,
+            "warn" | "warning" => flexi_logger::Duplicate::Warn,
+            "error" => flexi_logger::Duplicate::Error,
+            "off" => flexi_logger::Duplicate::None,
+            _ => flexi_logger::Duplicate::Info, // the default is info
+        };
+
+        // Configure the flexi_logger
+        let flexi_logger_result = flexi_logger::Logger::with(log_spec)
+            .log_to_file(
+                FileSpec::default()
+                    .directory(log_directory)
+                    .basename(log_filename)
+                    .suffix("log"),
+            )
+            .rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into()))
+            .format_for_files(format_for_file) // Custom formatting function for file output
+            .duplicate_to_stdout(level_filter) // Use dynamic levels
+            .format_for_stdout(format_with_color) // Custom formatting function for terminal output
+            .write_mode(WriteMode::Async)
+            .start();
+
+        if let Ok(logger) = flexi_logger_result {
+            // Save the logger handle to keep logging alive
+            flexi_logger_handle = Some(logger);
+
+            info!("Flexi logger initialized with file logging to {}/{}.log", log_directory, log_filename);
+
+            // Record the configured rotation policy
+            match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
+                (Some(time), Some(size)) => info!(
+                    "Log rotation configured for: every {} or when size exceeds {}MB, keeping {} files",
+                    time, size, keep_files
+                ),
+                (Some(time), None) => info!("Log rotation configured for: every {}, keeping {} files", time, keep_files),
+                (None, Some(size)) => {
+                    info!("Log rotation configured for: when size exceeds {}MB, keeping {} files", size, keep_files)
+                }
+                _ => info!("Log rotation configured for: daily, keeping {} files", keep_files),
+            }
+        } else {
+            eprintln!("Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
         }
-    }
 
-    OtelGuard {
-        tracer_provider,
-        meter_provider,
-        logger_provider,
+        OtelGuard {
+            tracer_provider: None,
+            meter_provider: None,
+            logger_provider: None,
+            _flexi_logger_handles: flexi_logger_handle,
+        }
     }
 }
@@ -271,3 +399,50 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
 
     filter
 }
+
+/// Custom log formatter - terminal output (with color)
+fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
+    let level = record.level();
+    let level_style = style(level);
+
+    // Get the current thread information
+    let binding = std::thread::current();
+    let thread_name = binding.name().unwrap_or("unnamed");
+    let thread_id = format!("{:?}", binding.id());
+
+    writeln!(
+        w,
+        "{} {} [{}] [{}:{}] [{}:{}] {}",
+        now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
+        level_style.paint(level.to_string()),
+        Color::Magenta.paint(record.target()),
+        Color::Blue.paint(record.file().unwrap_or("unknown")),
+        Color::Blue.paint(record.line().unwrap_or(0).to_string()),
+        Color::Green.paint(thread_name),
+        Color::Green.paint(thread_id),
+        record.args()
+    )
+}
+
+/// Custom log formatter - file output (no color)
+fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
+    let level = record.level();
+
+    // Get the current thread information
+    let binding = std::thread::current();
+    let thread_name = binding.name().unwrap_or("unnamed");
+    let thread_id = format!("{:?}", binding.id());
+
+    writeln!(
+        w,
+        "{} {} [{}] [{}:{}] [{}:{}] {}",
+        now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
+        level,
+        record.target(),
+        record.file().unwrap_or("unknown"),
+        record.line().unwrap_or(0),
+        thread_name,
+        thread_id,
+        record.args()
+    )
+}
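Given the format strings above, a line in the rotated log file would come out roughly like this (illustrative values only):

```rust
// timestamp                  level target            file:line              thread-name:thread-id message
// 2025-06-01 12:34:56.123456 INFO [rustfs::storage] [src/storage.rs:87] [main:ThreadId(1)] bucket created
```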
diff --git a/deploy/config/.example.obs.env b/deploy/config/.example.obs.env
deleted file mode 100644
index edbf3e88..00000000
--- a/deploy/config/.example.obs.env
+++ /dev/null
@@ -1,28 +0,0 @@
-#RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
-#RUSTFS__OBSERVABILITY__USE_STDOUT=true
-#RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
-#RUSTFS__OBSERVABILITY__METER_INTERVAL=30
-#RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
-#RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
-#RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
-#RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
-#
-#RUSTFS__SINKS_0__type=Kakfa
-#RUSTFS__SINKS_0__brokers=localhost:9092
-#RUSTFS__SINKS_0__topic=logs
-#RUSTFS__SINKS_0__batch_size=100
-#RUSTFS__SINKS_0__batch_timeout_ms=1000 -# -#RUSTFS__SINKS_1__type=Webhook -#RUSTFS__SINKS_1__endpoint=http://localhost:8080/webhook -#RUSTFS__SINKS_1__auth_token=you-auth-token -#RUSTFS__SINKS_1__batch_size=100 -#RUSTFS__SINKS_1__batch_timeout_ms=1000 -# -#RUSTFS__SINKS_2__type=File -#RUSTFS__SINKS_2__path=./deploy/logs/rustfs.log -#RUSTFS__SINKS_2__buffer_size=10 -#RUSTFS__SINKS_2__flush_interval_ms=1000 -#RUSTFS__SINKS_2__flush_threshold=100 -# -#RUSTFS__LOGGER__QUEUE_CAPACITY=10 \ No newline at end of file diff --git a/deploy/config/event.example.toml b/deploy/config/event.example.toml deleted file mode 100644 index 6deb67a2..00000000 --- a/deploy/config/event.example.toml +++ /dev/null @@ -1,29 +0,0 @@ -# config.toml -store_path = "./deploy/logs/event_store" -channel_capacity = 5000 - -[[adapters]] -type = "Webhook" -endpoint = "http://127.0.0.1:3020/webhook" -auth_token = "your-auth-token" -max_retries = 3 -timeout = 50 - -[adapters.custom_headers] -custom_server = "value_server" -custom_client = "value_client" - -#[[adapters]] -#type = "Kafka" -#brokers = "localhost:9092" -#topic = "notifications" -#max_retries = 3 -#timeout = 60 -# -#[[adapters]] -#type = "Mqtt" -#broker = "mqtt.example.com" -#port = 1883 -#client_id = "event-notifier" -#topic = "events" -#max_retries = 3 \ No newline at end of file diff --git a/deploy/config/obs-zh.example.toml b/deploy/config/obs-zh.example.toml deleted file mode 100644 index 0ada41af..00000000 --- a/deploy/config/obs-zh.example.toml +++ /dev/null @@ -1,34 +0,0 @@ -[observability] -endpoint = "http://localhost:4317" # 可观测性数据上报的终端地址,默认为"http://localhost:4317" -use_stdout = true # 是否将日志输出到标准输出 -sample_ratio = 2.0 # 采样率,表示每 2 条数据采样 1 条 -meter_interval = 30 # 指标收集间隔,单位为秒 -service_name = "rustfs" # 服务名称,用于标识当前服务 -service_version = "0.1.0" # 服务版本号 -environment = "develop" # 运行环境,如开发环境 (develop) -logger_level = "debug" # 日志级别,可选 debug/info/warn/error 等 -local_logging_enabled = true # 是否启用本地 stdout 日志记录,true 表示启用,false 表示禁用 - -#[[sinks]] # Kafka 接收器配置 -#type = "Kafka" # 是否启用 Kafka 接收器,默认禁用 -#brokers = "localhost:9092" # Kafka 服务器地址 -#topic = "logs" # Kafka 主题名称 -#batch_size = 100 # 批处理大小,每次发送的消息数量 -#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 - -#[[sinks]] # Webhook 接收器配置 -#type = "Webhook" # 是否启用 Webhook 接收器 -#endpoint = "http://localhost:8080/webhook" # Webhook 接收地址 -#auth_token = "" # 认证令牌 -#batch_size = 100 # 批处理大小 -#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 - -[[sinks]] # 文件接收器配置 -type = "File" # 是否启用文件接收器 -path = "./deploy/logs/rustfs.log" # 日志文件路径 -buffer_size = 10 # 缓冲区大小,表示每次写入的字节数 -flush_interval_ms = 100 # 批处理超时时间,单位为毫秒 -flush_threshold = 100 # 刷新阈值,表示在达到该数量时刷新日志 - -[logger] # 日志器配置 -queue_capacity = 10 # 日志队列容量,表示可以缓存的日志条数 \ No newline at end of file diff --git a/deploy/config/obs.example.toml b/deploy/config/obs.example.toml deleted file mode 100644 index 0733b229..00000000 --- a/deploy/config/obs.example.toml +++ /dev/null @@ -1,34 +0,0 @@ -[observability] -endpoint = "http://localhost:4317" # Default is "http://localhost:4317" if not specified -use_stdout = false # Output with stdout, true output, false no output -sample_ratio = 2.0 -meter_interval = 30 -service_name = "rustfs" -service_version = "0.0.1" -environment = "production" # Default is "production" if not specified -logger_level = "info" -local_logging_enabled = true - -#[[sinks]] -#type = "Kafka" -#brokers = "localhost:9092" -#topic = "logs" -#batch_size = 100 # Default is 100 if not specified -#batch_timeout_ms = 100 # Default is 1000ms if not specified -# -#[[sinks]] -#type = 
"Webhook" -#endpoint = "http://localhost:8080/webhook" -#auth_token = "" -#batch_size = 100 # Default is 3 if not specified -#batch_timeout_ms = 100 # Default is 100ms if not specified - -[[sinks]] -type = "File" -path = "deploy/logs/rustfs.log" -buffer_size = 101 # Default is 8192 bytes if not specified -flush_interval_ms = 1000 -flush_threshold = 100 - -[logger] -queue_capacity = 10000 diff --git a/deploy/config/rustfs-zh.env b/deploy/config/rustfs-zh.env index fd2a5178..37f20e5c 100644 --- a/deploy/config/rustfs-zh.env +++ b/deploy/config/rustfs-zh.env @@ -20,8 +20,8 @@ RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000" RUSTFS_SERVER_DOMAINS=127.0.0.1:9002 # RustFS 许可证内容 RUSTFS_LICENSE="license content" -# 可观测性配置文件路径:deploy/config/obs.example.toml -RUSTFS_OBS_CONFIG=/etc/default/obs.toml +# 可观测性配置Endpoint:http://localhost:4317 +RUSTFS_OBS_ENDPOINT=http://localhost:4317 # TLS 证书目录路径:deploy/certs RUSTFS_TLS_PATH=/etc/default/tls # 事件通知配置文件路径:deploy/config/event.example.toml diff --git a/deploy/config/rustfs.env b/deploy/config/rustfs.env index 3f50033f..a8a0d853 100644 --- a/deploy/config/rustfs.env +++ b/deploy/config/rustfs.env @@ -20,8 +20,8 @@ RUSTFS_SERVER_ENDPOINT="http://127.0.0.1:9000" RUSTFS_SERVER_DOMAINS=127.0.0.1:9002 # RustFS license content RUSTFS_LICENSE="license content" -# Observability configuration file path: deploy/config/obs.example.toml -RUSTFS_OBS_CONFIG=/etc/default/obs.toml +# Observability configuration endpoint: RUSTFS_OBS_ENDPOINT +RUSTFS_OBS_ENDPOINT=http://localhost:4317 # TLS certificates directory path: deploy/certs RUSTFS_TLS_PATH=/etc/default/tls # event notification configuration file path: deploy/config/event.example.toml diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 8607efbe..cac01316 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -2385,16 +2385,16 @@ impl DiskAPI for LocalDisk { } let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject); let mut res = HashMap::new(); - let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata).await; + let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata); let buf = match disk.read_metadata(item.path.clone()).await { Ok(buf) => buf, Err(err) => { res.insert("err".to_string(), err.to_string()); - stop_fn(&res).await; + stop_fn(&res); return Err(Error::from_string(ERR_SKIP_FILE)); } }; - done_sz(buf.len() as u64).await; + done_sz(buf.len() as u64); res.insert("metasize".to_string(), buf.len().to_string()); item.transform_meda_dir(); let meta_cache = MetaCacheEntry { @@ -2406,7 +2406,7 @@ impl DiskAPI for LocalDisk { Ok(fivs) => fivs, Err(err) => { res.insert("err".to_string(), err.to_string()); - stop_fn(&res).await; + stop_fn(&res); return Err(Error::from_string(ERR_SKIP_FILE)); } }; @@ -2416,7 +2416,7 @@ impl DiskAPI for LocalDisk { Ok(obj_infos) => obj_infos, Err(err) => { res.insert("err".to_string(), err.to_string()); - stop_fn(&res).await; + stop_fn(&res); return Err(Error::from_string(ERR_SKIP_FILE)); } }; @@ -2432,7 +2432,7 @@ impl DiskAPI for LocalDisk { let done = ScannerMetrics::time(ScannerMetric::ApplyVersion); let sz: usize; (obj_deleted, sz) = item.apply_actions(info, &size_s).await; - done().await; + done(); if obj_deleted { break; @@ -2462,14 +2462,14 @@ impl DiskAPI for LocalDisk { let _obj_info = frer_version.to_object_info(&item.bucket, &item.object_path().to_string_lossy(), versioned); let done = ScannerMetrics::time(ScannerMetric::TierObjSweep); - done().await; + done(); } // todo: global trace if obj_deleted { return 
diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs
index 627f044e..d8607ed1 100644
--- a/ecstore/src/heal/data_scanner.rs
+++ b/ecstore/src/heal/data_scanner.rs
@@ -143,92 +143,169 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
     }
 }

+/// Initialize and start the data scanner in the background
+///
+/// This function starts a background task that continuously runs the data scanner
+/// with randomized intervals between cycles to avoid resource contention.
+///
+/// # Features
+/// - Runs as a detached background task
+/// - Randomized sleep intervals to prevent synchronized scanning across nodes
+/// - Sleeps at least one full scanner cycle between runs
+/// - Proper error handling and logging
+///
+/// # Architecture
+/// 1. Run one scanner cycle
+/// 2. Sleep for a randomized multiple (1x to 10x) of the configured cycle duration
+///    to avoid a thundering herd
+/// 3. Repeat
 pub async fn init_data_scanner() {
+    info!("Initializing data scanner background task");
+
     tokio::spawn(async move {
         loop {
+            // Run the data scanner
             run_data_scanner().await;
-            let random = {
-                let mut r = rand::thread_rng();
-                r.gen_range(0.0..1.0)
-            };
-            let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(Ordering::SeqCst) as f64));
-            let sleep_duration = if duration < Duration::new(1, 0) {
-                Duration::new(1, 0)
-            } else {
-                duration
-            };
-            info!("data scanner will sleeping {sleep_duration:?}");
+            // Calculate randomized sleep duration:
+            // a random factor (1.0 to 10.0) multiplied by the scanner cycle duration
+            let random_factor: f64 = {
+                let mut rng = rand::thread_rng();
+                rng.gen_range(1.0..10.0)
+            };
+            let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
+            let sleep_duration_secs = random_factor * base_cycle_duration;
+
+            let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
+
+            info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle");
+
+            // Sleep with the calculated duration
             sleep(sleep_duration).await;
         }
     });
 }

+/// Run a single data scanner cycle
+///
+/// This function performs one complete scan cycle, including:
+/// - Loading and updating cycle information
+/// - Determining scan mode based on healing configuration
+/// - Running the namespace scanner
+/// - Saving cycle completion state
+///
+/// # Error Handling
+/// - Gracefully handles missing object layer
+/// - Continues operation even if individual steps fail
+/// - Logs errors appropriately without terminating the scanner
 async fn run_data_scanner() {
-    info!("run_data_scanner");
+    info!("Starting data scanner cycle");
+
+    // Get the object layer, return early if not available
     let Some(store) = new_object_layer_fn() else {
-        error!("errServerNotInitialized");
+        error!("Object layer not initialized, skipping scanner cycle");
         return;
     };

+    // Load current cycle information from persistent storage
     let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
         .await
-        .map_or(Vec::new(), |buf| buf);
-
-    let mut buf_t = Deserializer::new(Cursor::new(buf));
-
-    let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default();
-
-    loop {
-        let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
-        cycle_info.current = cycle_info.next;
-        cycle_info.started = Utc::now();
-        {
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - } - - let bg_heal_info = read_background_heal_info(store.clone()).await; - let scan_mode = - get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; - if bg_heal_info.current_scan_mode != scan_mode { - let mut new_heal_info = bg_heal_info; - new_heal_info.current_scan_mode = scan_mode; - if scan_mode == HEAL_DEEP_SCAN { - new_heal_info.bitrot_start_time = SystemTime::now(); - new_heal_info.bitrot_start_cycle = cycle_info.current; - } - save_background_heal_info(store.clone(), &new_heal_info).await; - } - // Wait before starting next cycle and wait on startup. - let (tx, rx) = mpsc::channel(100); - tokio::spawn(async { - store_data_usage_in_backend(rx).await; + .unwrap_or_else(|err| { + info!(error = %err, "Failed to read cycle info, starting fresh"); + Vec::new() }); - let mut res = HashMap::new(); - res.insert("cycle".to_string(), cycle_info.current.to_string()); - info!("start ns_scanner"); - match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { - Ok(_) => { - info!("ns_scanner completed"); - cycle_info.next += 1; - cycle_info.current = 0; - cycle_info.cycle_completed.push(Utc::now()); - if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { - let _ = cycle_info.cycle_completed.remove(0); - } - globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - let mut wr = Vec::new(); - cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, wr).await; + + let mut cycle_info = if buf.is_empty() { + CurrentScannerCycle::default() + } else { + let mut buf_cursor = Deserializer::new(Cursor::new(buf)); + Deserialize::deserialize(&mut buf_cursor).unwrap_or_else(|err| { + error!(error = %err, "Failed to deserialize cycle info, using default"); + CurrentScannerCycle::default() + }) + }; + + // Start metrics collection for this cycle + let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); + + // Update cycle information + cycle_info.current = cycle_info.next; + cycle_info.started = Utc::now(); + + // Update global scanner metrics + globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; + + // Read background healing information and determine scan mode + let bg_heal_info = read_background_heal_info(store.clone()).await; + let scan_mode = + get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await; + + // Update healing info if scan mode changed + if bg_heal_info.current_scan_mode != scan_mode { + let mut new_heal_info = bg_heal_info; + new_heal_info.current_scan_mode = scan_mode; + if scan_mode == HEAL_DEEP_SCAN { + new_heal_info.bitrot_start_time = SystemTime::now(); + new_heal_info.bitrot_start_cycle = cycle_info.current; + } + save_background_heal_info(store.clone(), &new_heal_info).await; + } + + // Set up data usage storage channel + let (tx, rx) = mpsc::channel(100); + tokio::spawn(async move { + let _ = store_data_usage_in_backend(rx).await; + }); + + // Prepare result tracking + let mut scan_result = HashMap::new(); + scan_result.insert("cycle".to_string(), cycle_info.current.to_string()); + + info!( + cycle = cycle_info.current, + scan_mode = ?scan_mode, + "Starting namespace scanner" + ); + + // Run the namespace scanner + match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { + Ok(_) => { + info!(cycle = 
cycle_info.current, "Namespace scanner completed successfully");
+
+            // Update cycle completion information
+            cycle_info.next += 1;
+            cycle_info.current = 0;
+            cycle_info.cycle_completed.push(Utc::now());
+
+            // Maintain cycle completion history (keep only recent cycles)
+            if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
+                let _ = cycle_info.cycle_completed.remove(0);
             }
-
+
+            // Update global metrics with completion info
+            globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
+
+            // Persist updated cycle information; on error, log and continue
+            let mut serialized_data = Vec::new();
+            if let Err(err) = cycle_info.serialize(&mut Serializer::new(&mut serialized_data)) {
+                error!(error = %err, "Failed to serialize cycle info");
+            } else if let Err(err) = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, serialized_data).await {
+                error!(error = %err, "Failed to save cycle info to storage");
+            }
         }
-        Err(err) => {
-            info!("ns_scanner failed: {:?}", err);
-            res.insert("error".to_string(), err.to_string());
+        Err(err) => {
+            error!(
+                cycle = cycle_info.current,
+                error = %err,
+                "Namespace scanner failed"
+            );
+            scan_result.insert("error".to_string(), err.to_string());
+        }
     }
-    stop_fn(&res).await;
-    sleep(Duration::from_secs(SCANNER_CYCLE.load(Ordering::SeqCst))).await;
+
+    // Complete metrics collection for this cycle
+    stop_fn(&scan_result);
 }

 #[derive(Debug, Serialize, Deserialize)]
@@ -476,7 +553,7 @@ impl ScannerItem {
     pub async fn apply_actions(&self, oi: &ObjectInfo, _size_s: &SizeSummary) -> (bool, usize) {
         let done = ScannerMetrics::time(ScannerMetric::Ilm);
         //todo: lifecycle
-        done().await;
+        done();

         (false, oi.size)
     }
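
The cycle bookkeeping above round-trips `CurrentScannerCycle` through MessagePack via `rmp-serde` (`Serializer::new` on save, `Deserializer::new(Cursor::new(..))` on load, defaulting when the stored bytes are missing or unreadable). A small sketch of the same round trip, using a stand-in struct rather than the real type:

```rust
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::io::Cursor;

// Stand-in for CurrentScannerCycle; the real type has more fields.
#[derive(Debug, Default, Serialize, Deserialize)]
struct Cycle {
    current: u64,
    next: u64,
}

fn main() {
    let cycle = Cycle { current: 3, next: 4 };

    // Encode to MessagePack bytes
    let mut buf = Vec::new();
    cycle.serialize(&mut Serializer::new(&mut buf)).expect("serialize");

    // Decode, falling back to Default on any error, as the scanner does
    let mut de = Deserializer::new(Cursor::new(buf));
    let restored: Cycle = Deserialize::deserialize(&mut de).unwrap_or_default();
    println!("{restored:?}");
}
```
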
diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs
index dedd4417..d5bf1688 100644
--- a/ecstore/src/heal/data_scanner_metric.rs
+++ b/ecstore/src/heal/data_scanner_metric.rs
@@ -1,30 +1,30 @@
 use chrono::Utc;
-use common::globals::GLOBAL_Local_Node_Name;
 use common::last_minute::{AccElem, LastMinuteLatency};
 use lazy_static::lazy_static;
 use madmin::metrics::ScannerMetrics as M_ScannerMetrics;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::AtomicU64;
-use std::sync::Once;
-use std::time::{Duration, UNIX_EPOCH};
 use std::{
     collections::HashMap,
-    sync::{atomic::Ordering, Arc},
-    time::SystemTime,
+    future::Future,
+    pin::Pin,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::{Duration, SystemTime},
 };
-use tokio::sync::RwLock;
-use tracing::{debug, info};
+use tokio::sync::{Mutex, RwLock};
+use tracing::debug;

-use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn};
+use super::data_scanner::CurrentScannerCycle;

 lazy_static! {
-    pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
+    pub static ref globalScannerMetrics: Arc<ScannerMetrics> = Arc::new(ScannerMetrics::new());
 }

-#[derive(Clone, Debug, PartialEq, PartialOrd)]
+/// Scanner metric types, matching the Go version exactly
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+#[repr(u8)]
 pub enum ScannerMetric {
-    // START Realtime metrics, that only to records
+    // START Realtime metrics, which only record
     // last minute latencies and total operation count.
     ReadMetadata = 0,
     CheckMissing,
@@ -58,78 +58,387 @@ pub enum ScannerMetric {
     Last,
 }

-static INIT: Once = Once::new();
+impl ScannerMetric {
+    /// Convert to string representation for metrics
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Self::ReadMetadata => "read_metadata",
+            Self::CheckMissing => "check_missing",
+            Self::SaveUsage => "save_usage",
+            Self::ApplyAll => "apply_all",
+            Self::ApplyVersion => "apply_version",
+            Self::TierObjSweep => "tier_obj_sweep",
+            Self::HealCheck => "heal_check",
+            Self::Ilm => "ilm",
+            Self::CheckReplication => "check_replication",
+            Self::Yield => "yield",
+            Self::CleanAbandoned => "clean_abandoned",
+            Self::ApplyNonCurrent => "apply_non_current",
+            Self::HealAbandonedVersion => "heal_abandoned_version",
+            Self::StartTrace => "start_trace",
+            Self::ScanObject => "scan_object",
+            Self::HealAbandonedObject => "heal_abandoned_object",
+            Self::LastRealtime => "last_realtime",
+            Self::ScanFolder => "scan_folder",
+            Self::ScanCycle => "scan_cycle",
+            Self::ScanBucketDrive => "scan_bucket_drive",
+            Self::CompactFolder => "compact_folder",
+            Self::Last => "last",
+        }
+    }
+
+    /// Convert from index back to enum (safe version)
+    pub fn from_index(index: usize) -> Option<Self> {
+        if index >= Self::Last as usize {
+            return None;
+        }
+        // Safe conversion using match instead of unsafe transmute
+        match index {
+            0 => Some(Self::ReadMetadata),
+            1 => Some(Self::CheckMissing),
+            2 => Some(Self::SaveUsage),
+            3 => Some(Self::ApplyAll),
+            4 => Some(Self::ApplyVersion),
+            5 => Some(Self::TierObjSweep),
+            6 => Some(Self::HealCheck),
+            7 => Some(Self::Ilm),
+            8 => Some(Self::CheckReplication),
+            9 => Some(Self::Yield),
+            10 => Some(Self::CleanAbandoned),
+            11 => Some(Self::ApplyNonCurrent),
+            12 => Some(Self::HealAbandonedVersion),
+            13 => Some(Self::StartTrace),
+            14 => Some(Self::ScanObject),
+            15 => Some(Self::HealAbandonedObject),
+            16 => Some(Self::LastRealtime),
+            17 => Some(Self::ScanFolder),
+            18 => Some(Self::ScanCycle),
+            19 => Some(Self::ScanBucketDrive),
+            20 => Some(Self::CompactFolder),
+            21 => Some(Self::Last),
+            _ => None,
+        }
+    }
+}
+
+/// Thread-safe wrapper for LastMinuteLatency with atomic operations
 #[derive(Default)]
 pub struct LockedLastMinuteLatency {
-    cached_sec: AtomicU64,
-    cached: AccElem,
-    mu: RwLock<bool>,
-    latency: LastMinuteLatency,
+    latency: Arc<Mutex<LastMinuteLatency>>,
 }

 impl Clone for LockedLastMinuteLatency {
     fn clone(&self) -> Self {
         Self {
-            cached_sec: AtomicU64::new(0),
-            cached: self.cached.clone(),
-            mu: RwLock::new(true),
-            latency: self.latency.clone(),
+            latency: Arc::clone(&self.latency),
         }
     }
 }

 impl LockedLastMinuteLatency {
-    pub async fn add(&mut self, value: &Duration) {
-        self.add_size(value, 0).await;
-    }
-
-    pub async fn add_size(&mut self, value: &Duration, sz: u64) {
-        let t = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards")
-            .as_secs();
-        INIT.call_once(|| {
-            self.cached = AccElem::default();
-            self.cached_sec.store(t, Ordering::SeqCst);
-        });
-        let last_t = self.cached_sec.load(Ordering::SeqCst);
-        if last_t != t
-            && self
-                .cached_sec
-                .compare_exchange(last_t, t, Ordering::SeqCst, Ordering::SeqCst)
-                .is_ok()
-        {
-            let old = self.cached.clone();
-            self.cached = AccElem::default();
-            let a = AccElem {
-                size: old.size,
-                total: old.total,
-                n: old.n,
-            };
-            let _ = self.mu.write().await;
-            self.latency.add_all(t - 1, &a);
+    pub fn new() -> Self {
+        Self {
+            latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
         }
-        self.cached.n += 1;
-        self.cached.total += value.as_secs();
-        self.cached.size += 
sz;
     }
-
-    pub async fn total(&mut self) -> AccElem {
-        let _ = self.mu.read().await;
-        self.latency.get_total()
+
+    /// Add a duration measurement
+    pub async fn add(&self, duration: Duration) {
+        self.add_size(duration, 0).await;
+    }
+
+    /// Add a duration measurement with size
+    pub async fn add_size(&self, duration: Duration, size: u64) {
+        let mut latency = self.latency.lock().await;
+        let now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
+        let elem = AccElem {
+            n: 1,
+            total: duration.as_secs(),
+            size,
+        };
+        latency.add_all(now, &elem);
+    }
+
+    /// Get total accumulated metrics for the last minute
+    pub async fn total(&self) -> AccElem {
+        let mut latency = self.latency.lock().await;
+        latency.get_total()
+    }
 }

-pub type LogFn = Arc<dyn Fn(&HashMap<String, String>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
-pub type TimeSizeFn = Arc<dyn Fn(u64) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
-pub type TimeFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
+/// Current path tracker for monitoring active scan paths
+struct CurrentPathTracker {
+    current_path: Arc<RwLock<String>>,
+}

+impl CurrentPathTracker {
+    fn new(initial_path: String) -> Self {
+        Self {
+            current_path: Arc::new(RwLock::new(initial_path)),
+        }
+    }
+
+    async fn update_path(&self, path: String) {
+        *self.current_path.write().await = path;
+    }
+
+    async fn get_path(&self) -> String {
+        self.current_path.read().await.clone()
+    }
+}
+
+/// Main scanner metrics structure
 pub struct ScannerMetrics {
+    // All fields must be accessed atomically and aligned.
     operations: Vec<AtomicU64>,
     latency: Vec<LockedLastMinuteLatency>,
-    cycle_info: RwLock<Option<CurrentScannerCycle>>,
-    current_paths: HashMap<String, String>,
+
+    // Current paths contains disk -> tracker mappings
+    current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
+
+    // Cycle information
+    cycle_info: Arc<RwLock<Option<CurrentScannerCycle>>>,
+}
+
+impl ScannerMetrics {
+    pub fn new() -> Self {
+        let operations = (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect();
+
+        let latency = (0..ScannerMetric::LastRealtime as usize)
+            .map(|_| LockedLastMinuteLatency::new())
+            .collect();
+
+        Self {
+            operations,
+            latency,
+            current_paths: Arc::new(RwLock::new(HashMap::new())),
+            cycle_info: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    /// Log scanner action with custom metadata - compatible with existing usage
+    pub fn log(metric: ScannerMetric) -> impl Fn(&HashMap<String, String>) {
+        let start_time = SystemTime::now();
+        move |_custom: &HashMap<String, String>| {
+            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
+
+            // Update operation count
+            globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
+
+            // Update latency for realtime metrics (spawn async task for this)
+            if (metric as usize) < ScannerMetric::LastRealtime as usize {
+                let metric_index = metric as usize;
+                tokio::spawn(async move {
+                    globalScannerMetrics.latency[metric_index].add(duration).await;
+                });
+            }
+
+            // Log trace metrics
+            if metric as u8 > ScannerMetric::StartTrace as u8 {
+                debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
+            }
+        }
+    }
+
+    /// Time scanner action with size - returns function that takes size
+    pub fn time_size(metric: ScannerMetric) -> impl Fn(u64) {
+        let start_time = SystemTime::now();
+        move |size: u64| {
+            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
+
+            // Update operation count
+            globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
+
+            // Update latency for realtime metrics with size (spawn async task)
+            if (metric as usize) < ScannerMetric::LastRealtime as usize {
+                let metric_index = 
metric as usize;
+                tokio::spawn(async move {
+                    globalScannerMetrics.latency[metric_index].add_size(duration, size).await;
+                });
+            }
+        }
+    }
+
+    /// Time a scanner action - returns a closure to call when done
+    pub fn time(metric: ScannerMetric) -> impl Fn() {
+        let start_time = SystemTime::now();
+        move || {
+            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
+
+            // Update operation count
+            globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
+
+            // Update latency for realtime metrics (spawn async task)
+            if (metric as usize) < ScannerMetric::LastRealtime as usize {
+                let metric_index = metric as usize;
+                tokio::spawn(async move {
+                    globalScannerMetrics.latency[metric_index].add(duration).await;
+                });
+            }
+        }
+    }
+
+    /// Time N scanner actions - returns function that takes count, then returns completion function
+    pub fn time_n(metric: ScannerMetric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
+        let start_time = SystemTime::now();
+        Box::new(move |count: usize| {
+            Box::new(move || {
+                let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
+
+                // Update operation count
+                globalScannerMetrics.operations[metric as usize].fetch_add(count as u64, Ordering::Relaxed);
+
+                // Update latency for realtime metrics (spawn async task)
+                if (metric as usize) < ScannerMetric::LastRealtime as usize {
+                    let metric_index = metric as usize;
+                    tokio::spawn(async move {
+                        globalScannerMetrics.latency[metric_index].add(duration).await;
+                    });
+                }
+            })
+        })
+    }
+
+    /// Increment time with specific duration
+    pub async fn inc_time(metric: ScannerMetric, duration: Duration) {
+        // Update operation count
+        globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
+
+        // Update latency for realtime metrics
+        if (metric as usize) < ScannerMetric::LastRealtime as usize {
+            globalScannerMetrics.latency[metric as usize].add(duration).await;
+        }
+    }
+
+    /// Get lifetime operation count for a metric
+    pub fn lifetime(&self, metric: ScannerMetric) -> u64 {
+        if (metric as usize) >= ScannerMetric::Last as usize {
+            return 0;
+        }
+        self.operations[metric as usize].load(Ordering::Relaxed)
+    }
+
+    /// Get last minute statistics for a metric
+    pub async fn last_minute(&self, metric: ScannerMetric) -> AccElem {
+        if (metric as usize) >= ScannerMetric::LastRealtime as usize {
+            return AccElem::default();
+        }
+        self.latency[metric as usize].total().await
+    }
+
+    /// Set current cycle information
+    pub async fn set_cycle(&self, cycle: Option<CurrentScannerCycle>) {
+        *self.cycle_info.write().await = cycle;
+    }
+
+    /// Get current cycle information
+    pub async fn get_cycle(&self) -> Option<CurrentScannerCycle> {
+        self.cycle_info.read().await.clone()
+    }
+
+    /// Get current active paths
+    pub async fn get_current_paths(&self) -> Vec<String> {
+        let mut result = Vec::new();
+        let paths = self.current_paths.read().await;
+
+        for (disk, tracker) in paths.iter() {
+            let path = tracker.get_path().await;
+            result.push(format!("{}/{}", disk, path));
+        }
+
+        result
+    }
+
+    /// Get number of active drives
+    pub async fn active_drives(&self) -> usize {
+        self.current_paths.read().await.len()
+    }
+
+    /// Generate metrics report
+    pub async fn report(&self) -> M_ScannerMetrics {
+        let mut metrics = M_ScannerMetrics::default();
+
+        // Set cycle information
+        if let Some(cycle) = self.get_cycle().await {
+            metrics.current_cycle = cycle.current;
+            metrics.cycles_completed_at = cycle.cycle_completed;
+            metrics.current_started = cycle.started;
+        }
+
+        metrics.collected_at = Utc::now();
+        metrics.active_paths = self.get_current_paths().await;
+
+        // Lifetime operations
+        for i in 0..ScannerMetric::Last as usize {
+            let count = self.operations[i].load(Ordering::Relaxed);
+            if count > 0 {
+                if let Some(metric) = ScannerMetric::from_index(i) {
+                    metrics.life_time_ops.insert(metric.as_str().to_string(), count);
+                }
+            }
+        }
+
+        // Last minute statistics for realtime metrics
+        for i in 0..ScannerMetric::LastRealtime as usize {
+            let last_min = self.latency[i].total().await;
+            if last_min.n > 0 {
+                if let Some(_metric) = ScannerMetric::from_index(i) {
+                    // Convert to madmin TimedAction format if needed
+                    // This would require implementing the conversion
+                }
+            }
+        }
+
+        metrics
+    }
+}
+
+// Type aliases for compatibility with existing code
+pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
+pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
+
+/// Create a current path updater for tracking scan progress
+pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
+    let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
+    let disk_name = disk.to_string();
+
+    // Store the tracker in global metrics
+    let tracker_clone = Arc::clone(&tracker);
+    let disk_clone = disk_name.clone();
+    tokio::spawn(async move {
+        globalScannerMetrics
+            .current_paths
+            .write()
+            .await
+            .insert(disk_clone, tracker_clone);
+    });
+
+    let update_fn = {
+        let tracker = Arc::clone(&tracker);
+        Arc::new(move |path: &str| -> Pin<Box<dyn Future<Output = ()> + Send>> {
+            let tracker = Arc::clone(&tracker);
+            let path = path.to_string();
+            Box::pin(async move {
+                tracker.update_path(path).await;
+            })
+        })
+    };
+
+    let done_fn = {
+        let disk_name = disk_name.clone();
+        Arc::new(move || -> Pin<Box<dyn Future<Output = ()> + Send>> {
+            let disk_name = disk_name.clone();
+            Box::pin(async move {
+                globalScannerMetrics.current_paths.write().await.remove(&disk_name);
+            })
+        })
+    };
+
+    (update_fn, done_fn)
+}

 impl Default for ScannerMetrics {
@@ -137,123 +446,3 @@ impl Default for ScannerMetrics {
         Self::new()
     }
 }
-
-impl ScannerMetrics {
-    pub fn new() -> Self {
-        Self {
-            operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect(),
-            latency: vec![LockedLastMinuteLatency::default(); ScannerMetric::LastRealtime as usize],
-            cycle_info: RwLock::new(None),
-            current_paths: HashMap::new(),
-        }
-    }
-
-    pub async fn set_cycle(&mut self, c: Option<CurrentScannerCycle>) {
-        debug!("ScannerMetrics set_cycle {c:?}");
-        *self.cycle_info.write().await = c;
-    }
-
-    pub fn log(s: ScannerMetric) -> LogFn {
-        let start = SystemTime::now();
-        let s_clone = s as usize;
-        Arc::new(move |_custom: &HashMap<String, String>| {
-            Box::pin(async move {
-                let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
-                let mut sm_w = globalScannerMetrics.write().await;
-                sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
-                if s_clone < ScannerMetric::LastRealtime as usize {
-                    sm_w.latency[s_clone].add(&duration).await;
-                }
-            })
-        })
-    }
-
-    pub async fn time_size(s: ScannerMetric) -> TimeSizeFn {
-        let start = SystemTime::now();
-        let s_clone = s as usize;
-        Arc::new(move |sz: u64| {
-            Box::pin(async move {
-                let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
-                let mut sm_w = globalScannerMetrics.write().await;
-                sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
-                if s_clone < ScannerMetric::LastRealtime as usize {
-                    sm_w.latency[s_clone].add_size(&duration, sz).await;
-                }
-            })
-        })
-    }
-
-    pub fn time(s: ScannerMetric) -> TimeFn {
-        let start = SystemTime::now();
-        let s_clone = s as usize;
-        Arc::new(move || {
-            Box::pin(async move {
-                let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
-                let mut sm_w = globalScannerMetrics.write().await;
-                sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
-                if s_clone < ScannerMetric::LastRealtime as usize {
-                    sm_w.latency[s_clone].add(&duration).await;
-                }
-            })
-        })
-    }
-
-    pub async fn get_cycle(&self) -> Option<CurrentScannerCycle> {
-        let r = self.cycle_info.read().await;
-        if let Some(c) = r.as_ref() {
-            return Some(c.clone());
-        }
-        None
-    }
-
-    pub async fn get_current_paths(&self) -> Vec<String> {
-        let mut res = Vec::new();
-        let prefix = format!("{}/", GLOBAL_Local_Node_Name.read().await);
-        self.current_paths.iter().for_each(|(k, v)| {
-            res.push(format!("{}/{}/{}", prefix, k, v));
-        });
-        res
-    }
-
-    pub async fn report(&self) -> M_ScannerMetrics {
-        let mut m = M_ScannerMetrics::default();
-        if let Some(cycle) = self.get_cycle().await {
-            info!("cycle: {cycle:?}");
-            m.current_cycle = cycle.current;
-            m.cycles_completed_at = cycle.cycle_completed;
-            m.current_started = cycle.started;
-        }
-        m.collected_at = Utc::now();
-        m.active_paths = self.get_current_paths().await;
-        for (i, v) in self.operations.iter().enumerate() {
-            m.life_time_ops.insert(i.to_string(), v.load(Ordering::SeqCst));
-        }
-
-        m
-    }
-}
-
-pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
-pub fn current_path_updater(disk: &str, _initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
-    let disk_1 = disk.to_string();
-    let disk_2 = disk.to_string();
-    (
-        Arc::new(move |path: &str| {
-            let disk_inner = disk_1.clone();
-            let path = path.to_string();
-            Box::pin(async move {
-                globalScannerMetrics
-                    .write()
-                    .await
-                    .current_paths
-                    .insert(disk_inner, path.to_string());
-            })
-        }),
-        Arc::new(move || {
-            let disk_inner = disk_2.clone();
-            Box::pin(async move {
-                globalScannerMetrics.write().await.current_paths.remove(&disk_inner);
-            })
-        }),
-    )
-}
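
The `(update, close)` pair returned by `current_path_updater` is easiest to see in a reduced form: the update closure captures a shared slot and returns a boxed future that records the latest path. A self-contained analog (the alias matches the one above; the slot and `main` are illustrative):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;

type UpdateFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

// Returns an update closure plus the shared slot it writes into.
fn path_updater() -> (UpdateFn, Arc<RwLock<String>>) {
    let slot = Arc::new(RwLock::new(String::new()));
    let slot_for_closure = Arc::clone(&slot);
    let update: UpdateFn = Arc::new(move |path: &str| -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let slot = Arc::clone(&slot_for_closure);
        let path = path.to_string();
        Box::pin(async move {
            *slot.write().await = path;
        })
    });
    (update, slot)
}

#[tokio::main]
async fn main() {
    let (update, slot) = path_updater();
    update("bucket/object-000").await;
    println!("current path: {}", slot.read().await);
}
```
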
diff --git a/ecstore/src/metrics_realtime.rs b/ecstore/src/metrics_realtime.rs
index 5bfc189b..509bc76b 100644
--- a/ecstore/src/metrics_realtime.rs
+++ b/ecstore/src/metrics_realtime.rs
@@ -93,7 +93,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
     if types.contains(&MetricType::SCANNER) {
         info!("start get scanner metrics");
-        let metrics = globalScannerMetrics.read().await.report().await;
+        let metrics = globalScannerMetrics.report().await;
         real_time_metrics.aggregated.scanner = Some(metrics);
     }

diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index b40a8abd..045cc676 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -2893,7 +2893,7 @@ impl SetDisks {
     }

     pub async fn ns_scanner(
-        &self,
+        self: Arc<Self>,
         buckets: &[BucketInfo],
         want_cycle: u32,
         updates: Sender<DataUsageInfo>,
@@ -2911,7 +2911,7 @@
             return Ok(());
         }

-        let old_cache = DataUsageCache::load(self, DATA_USAGE_CACHE_NAME).await?;
+        let old_cache = DataUsageCache::load(&self, DATA_USAGE_CACHE_NAME).await?;
         let mut cache = DataUsageCache {
             info: DataUsageCacheInfo {
                 name: DATA_USAGE_ROOT.to_string(),
@@ -2934,6 +2934,7 @@
             permutes.shuffle(&mut rng);
             permutes
         };
+
         // Add new buckets first
         for idx in permutes.iter() {
             let b = buckets[*idx].clone();
@@ -2954,6 +2955,7 @@
             Duration::from_secs(30) + Duration::from_secs_f64(10.0 * rng.gen_range(0.0..1.0))
         };
         let mut ticker = interval(update_time);
+
        let task = tokio::spawn(async move {
            let last_save = Some(SystemTime::now());
            let mut need_loop = true;
@@ -2983,8 
+2985,8 @@ impl SetDisks { } } }); + // Restrict parallelism for disk usage scanner - // upto GOMAXPROCS if GOMAXPROCS is < len(disks) let max_procs = num_cpus::get(); if max_procs < disks.len() { disks = disks[0..max_procs].to_vec(); @@ -2997,6 +2999,7 @@ impl SetDisks { Some(disk) => disk.clone(), None => continue, }; + let self_clone = Arc::clone(&self); let bucket_rx_clone = bucket_rx.clone(); let buckets_results_tx_clone = buckets_results_tx.clone(); futures.push(async move { @@ -3005,7 +3008,7 @@ impl SetDisks { Err(_) => return, Ok(bucket_info) => { let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME); - let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await { + let mut cache = match DataUsageCache::load(&self_clone, &cache_name.to_string_lossy()).await { Ok(cache) => cache, Err(_) => continue, }; @@ -3022,6 +3025,7 @@ impl SetDisks { ..Default::default() }; } + // Collect updates. let (tx, mut rx) = mpsc::channel(1); let buckets_results_tx_inner_clone = buckets_results_tx_clone.clone(); @@ -3042,9 +3046,10 @@ impl SetDisks { } } }); + // Calc usage let before = cache.info.last_update; - let mut cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await { + let mut cache = match disk.ns_scanner(&cache, tx, heal_scan_mode, None).await { Ok(cache) => cache, Err(_) => { if cache.info.last_update > before { @@ -3080,9 +3085,9 @@ impl SetDisks { } }); } + info!("ns_scanner start"); let _ = join_all(futures).await; - drop(buckets_results_tx); let _ = task.await; info!("ns_scanner completed"); Ok(()) diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index dabdae4d..608f06be 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -828,6 +828,7 @@ impl ECStore { } }); if let Err(err) = set + .clone() .ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode) .await { diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index a93b6dda..a720221f 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -64,8 +64,8 @@ pub struct Opt { /// Observability configuration file /// Default value: config/obs.toml - #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")] - pub obs_config: String, + #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_ENDPOINT.to_string(), env = "RUSTFS_OBS_ENDPOINT")] + pub obs_endpoint: String, /// tls path for rustfs api and console. 
#[arg(long, env = "RUSTFS_TLS_PATH")]
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index e6c4a44c..8c39c232 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -104,7 +104,7 @@ async fn main() -> Result<()> {
     init_license(opt.license.clone());

     // Load the configuration file
-    let config = load_config(Some(opt.clone().obs_config));
+    let config = load_config(Some(opt.clone().obs_endpoint));

     // Initialize Observability
     let (_logger, guard) = init_obs(config.clone()).await;
diff --git a/rustfs/static/.gitkeep b/rustfs/static/.gitkeep
new file mode 100644
index 00000000..e69de29b
diff --git a/scripts/run.sh b/scripts/run.sh
index 39bd4dc2..86f44e46 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -31,42 +31,42 @@ export RUSTFS_VOLUMES="./target/volume/test{0...4}"
 # export RUSTFS_VOLUMES="./target/volume/test"
 export RUSTFS_ADDRESS=":9000"
 export RUSTFS_CONSOLE_ENABLE=true
-export RUSTFS_CONSOLE_ADDRESS=":9002"
+export RUSTFS_CONSOLE_ADDRESS=":9001"
 # export RUSTFS_SERVER_DOMAINS="localhost:9000"
 # HTTPS 证书目录
 # export RUSTFS_TLS_PATH="./deploy/certs"
-# 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中 `RUSTFS_OBS_CONFIG` 和下面变量二选一
-# export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
+# 可观测性相关配置信息
+#export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector 的地址
+#export RUSTFS_OBS_USE_STDOUT=false # 是否使用标准输出
+#export RUSTFS_OBS_SAMPLE_RATIO=2.0 # 采样率,如 2.0 表示每 2 条数据采样 1 条
+#export RUSTFS_OBS_METER_INTERVAL=1 # 采样间隔,单位为秒
+#export RUSTFS_OBS_SERVICE_NAME=rustfs # 服务名称
+#export RUSTFS_OBS_SERVICE_VERSION=0.1.0 # 服务版本
+#export RUSTFS_OBS_ENVIRONMENT=develop # 环境名称
+export RUSTFS_OBS_LOGGER_LEVEL=debug # 日志级别,支持 trace, debug, info, warn, error
+export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # 是否启用本地日志记录
+export RUSTFS_OBS_LOG_DIRECTORY="./deploy/logs" # Log directory
+export RUSTFS_OBS_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day"
+export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB

-# 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值
-export RUSTFS_OBSERVABILITY_ENDPOINT=http://localhost:4317
-export RUSTFS_OBSERVABILITY_USE_STDOUT=false
-export RUSTFS_OBSERVABILITY_SAMPLE_RATIO=2.0
-export RUSTFS_OBSERVABILITY_METER_INTERVAL=31
-export RUSTFS_OBSERVABILITY_SERVICE_NAME=rustfs
-export RUSTFS_OBSERVABILITY_SERVICE_VERSION=0.1.0
-export RUSTFS_OBSERVABILITY_ENVIRONMENT=develop
-export RUSTFS_OBSERVABILITY_LOGGER_LEVEL=debug
-export RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED=true
 #
-#export RUSTFS_SINKS_type=File
-export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log
-#export RUSTFS_SINKS_buffer_size=12
-#export RUSTFS_SINKS_flush_interval_ms=1000
-#export RUSTFS_SINKS_flush_threshold=100
+#export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log
+#export RUSTFS_SINKS_FILE_BUFFER_SIZE=12
+#export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000
+#export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100
 #
-#export RUSTFS_SINKS_type=Kakfa
+# Kafka sink 配置
 #export RUSTFS_SINKS_KAFKA_BROKERS=localhost:9092
 #export RUSTFS_SINKS_KAFKA_TOPIC=logs
-#export RUSTFS_SINKS_batch_size=100
-#export RUSTFS_SINKS_batch_timeout_ms=1000
+#export RUSTFS_SINKS_KAFKA_BATCH_SIZE=100
+#export RUSTFS_SINKS_KAFKA_BATCH_TIMEOUT_MS=1000
 #
-#export RUSTFS_SINKS_type=Webhook
+# Webhook sink 配置
 #export RUSTFS_SINKS_WEBHOOK_ENDPOINT=http://localhost:8080/webhook
 #export RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN=you-auth-token
-#export RUSTFS_SINKS_batch_size=100
-#export RUSTFS_SINKS_batch_timeout_ms=1000
+#export RUSTFS_SINKS_WEBHOOK_BATCH_SIZE=100
+#export 
RUSTFS_SINKS_WEBHOOK_BATCH_TIMEOUT_MS=1000 # #export RUSTFS_LOGGER_QUEUE_CAPACITY=10
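
Taken together, these configuration changes drop the `RUSTFS_OBS_CONFIG` file in favor of flat `RUSTFS_OBS_*` variables. As an illustration only — this is not the actual rustfs loader — such variables could be read with the defaults documented above like so:

```rust
use std::env;

#[derive(Debug)]
struct ObsConfig {
    endpoint: String,
    logger_level: String,
    log_rotation_size_mb: Option<u64>,
}

fn load_from_env() -> ObsConfig {
    ObsConfig {
        // http://localhost:4317 is the documented default collector endpoint
        endpoint: env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "http://localhost:4317".into()),
        logger_level: env::var("RUSTFS_OBS_LOGGER_LEVEL").unwrap_or_else(|_| "info".into()),
        log_rotation_size_mb: env::var("RUSTFS_OBS_LOG_ROTATION_SIZE_MB").ok().and_then(|v| v.parse().ok()),
    }
}

fn main() {
    println!("{:?}", load_from_env());
}
```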