Refactor Telemetry Initialization and Environment Utilities (#811)

* improve code for metrics

* improve code for metrics

* fix

* fix

* Refactor telemetry initialization and environment functions ordering

- Reorder functions in envs.rs by type size (8-bit to 64-bit, signed before unsigned) and add missing variants like get_env_opt_u16.
- Optimize init_telemetry to support three modes: stdout logging (default error level with span tracing), file rolling logs (size-based with retention), and HTTP-based observability with sub-endpoints (trace, metric, log) falling back to unified endpoint.
- Fix stdout logging issue by retaining WorkerGuard in OtelGuard to prevent premature release of async writer threads.
- Enhance observability mode with HTTP protocol, compression, and proper resource management.
- Update OtelGuard to include tracing_guard for stdout and flexi_logger_handles for file logging.
- Improve error handling and configuration extraction in OtelConfig.

* fix

* up

* fix

* fix

* improve code for obs

* fix

* fix
This commit is contained in:
houseme
2025-11-07 20:01:54 +08:00
committed by GitHub
parent e823922654
commit 29056a767a
47 changed files with 1237 additions and 743 deletions

View File

@@ -16,7 +16,7 @@ services:
tempo-init:
image: busybox:latest
command: ["sh", "-c", "chown -R 10001:10001 /var/tempo"]
command: [ "sh", "-c", "chown -R 10001:10001 /var/tempo" ]
volumes:
- ./tempo-data:/var/tempo
user: root
@@ -39,7 +39,7 @@ services:
- otel-network
otel-collector:
image: otel/opentelemetry-collector-contrib:0.129.1
image: otel/opentelemetry-collector-contrib:latest
environment:
- TZ=Asia/Shanghai
volumes:
@@ -55,7 +55,7 @@ services:
networks:
- otel-network
jaeger:
image: jaegertracing/jaeger:2.8.0
image: jaegertracing/jaeger:latest
environment:
- TZ=Asia/Shanghai
ports:
@@ -65,17 +65,21 @@ services:
networks:
- otel-network
prometheus:
image: prom/prometheus:v3.4.2
image: prom/prometheus:latest
environment:
- TZ=Asia/Shanghai
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--web.enable-otlp-receiver' # Enable OTLP
- '--enable-feature=promql-experimental-functions' # Enable info()
networks:
- otel-network
loki:
image: grafana/loki:3.5.1
image: grafana/loki:latest
environment:
- TZ=Asia/Shanghai
volumes:
@@ -86,7 +90,7 @@ services:
networks:
- otel-network
grafana:
image: grafana/grafana:12.0.2
image: grafana/grafana:latest
ports:
- "3000:3000" # Web UI
volumes:

View File

@@ -29,4 +29,80 @@ datasources:
serviceMap:
datasourceUid: prometheus
streamingEnabled:
search: true
search: true
tracesToLogsV2:
# Field with an internal link pointing to a logs data source in Grafana.
# datasourceUid value must match the uid value of the logs data source.
datasourceUid: 'loki'
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
filterByTraceID: false
filterBySpanID: false
customQuery: true
query: 'method="$${__span.tags.method}"'
tracesToMetrics:
datasourceUid: 'prom'
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ]
queries:
- name: 'Sample query'
query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))'
tracesToProfiles:
datasourceUid: 'grafana-pyroscope-datasource'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
profileTypeId: 'process_cpu:cpu:nanoseconds:cpu:nanoseconds'
customQuery: true
query: 'method="$${__span.tags.method}"'
serviceMap:
datasourceUid: 'prometheus'
nodeGraph:
enabled: true
search:
hide: false
traceQuery:
timeShiftEnabled: true
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
spanBar:
type: 'Tag'
tag: 'http.path'
streamingEnabled:
search: true
- name: Jaeger
type: jaeger
uid: Jaeger
url: http://jaeger:16686
basicAuth: false
access: proxy
readOnly: false
isDefault: false
jsonData:
tracesToLogsV2:
# Field with an internal link pointing to a logs data source in Grafana.
# datasourceUid value must match the uid value of the logs data source.
datasourceUid: 'loki'
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
tags: [ 'job', 'instance', 'pod', 'namespace' ]
filterByTraceID: false
filterBySpanID: false
customQuery: true
query: 'method="$${__span.tags.method}"'
tracesToMetrics:
datasourceUid: 'prom'
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ]
queries:
- name: 'Sample query'
query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))'
nodeGraph:
enabled: true
traceQuery:
timeShiftEnabled: true
spanStartTimeShift: '1h'
spanEndTimeShift: '-1h'
spanBar:
type: 'None'

View File

@@ -63,6 +63,7 @@ ruler:
frontend:
encoding: protobuf
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#

View File

@@ -43,7 +43,6 @@ exporters:
send_timestamps: true # 发送时间戳
# enable_open_metrics: true
otlphttp/loki: # Loki 导出器,用于日志数据
# endpoint: "http://loki:3100/otlp/v1/logs"
endpoint: "http://loki:3100/otlp/v1/logs"
tls:
insecure: true

View File

@@ -13,16 +13,43 @@
# limitations under the License.
global:
scrape_interval: 5s # 刮取间隔
scrape_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
scrape_configs:
- job_name: 'otel-collector'
static_configs:
- targets: [ 'otel-collector:8888' ] # Collector 刮取指标
- targets: [ 'otel-collector:8888' ] # Scrape metrics from Collector
- job_name: 'otel-metrics'
static_configs:
- targets: [ 'otel-collector:8889' ] # 应用指标
- targets: [ 'otel-collector:8889' ] # Application indicators
- job_name: 'tempo'
static_configs:
- targets: [ 'tempo:3200' ]
- targets: [ 'tempo:3200' ] # Scrape metrics from Tempo
otlp:
# Recommended attributes to be promoted to labels.
promote_resource_attributes:
- service.instance.id
- service.name
- service.namespace
- cloud.availability_zone
- cloud.region
- container.name
- deployment.environment.name
- k8s.cluster.name
- k8s.container.name
- k8s.cronjob.name
- k8s.daemonset.name
- k8s.deployment.name
- k8s.job.name
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.statefulset.name
# Ingest OTLP data keeping all characters in metric/label names.
translation_strategy: NoUTF8EscapingWithSuffixes
storage:
# OTLP is a push-based protocol, Out of order samples is a common scenario.
tsdb:
out_of_order_time_window: 30m

215
Cargo.lock generated
View File

@@ -505,7 +505,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -527,7 +527,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -538,7 +538,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -564,7 +564,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -605,9 +605,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
version = "1.2.8"
version = "1.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf26925f4a5b59eb76722b63c2892b1d70d06fa053c72e4a100ec308c1d47bc"
checksum = "86590e57ea40121d47d3f2e131bfd873dea15d78dc2f4604f4734537ad9e56c4"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@@ -640,9 +640,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.5.13"
version = "1.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f2402da1a5e16868ba98725e5d73f26b8116eaa892e56f2cd0bf5eec7985f70"
checksum = "8fe0fd441565b0b318c76e7206c8d1d0b0166b3e986cf30e890b61feb6192045"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@@ -665,9 +665,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.110.0"
version = "1.112.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a811ec867f77c01aa0f0abfaa9fedef647cc83608ad8e67949f95d30d04a7fd"
checksum = "eee73a27721035c46da0572b390a69fbdb333d0177c24f3d8f7ff952eeb96690"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -699,9 +699,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "1.88.0"
version = "1.89.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05b276777560aa9a196dbba2e3aada4d8006d3d7eeb3ba7fe0c317227d933c4"
checksum = "a9c1b1af02288f729e95b72bd17988c009aa72e26dcb59b3200f86d7aea726c9"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -721,9 +721,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
version = "1.90.0"
version = "1.91.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9be14d6d9cd761fac3fd234a0f47f7ed6c0df62d83c0eeb7012750e4732879b"
checksum = "4e8122301558dc7c6c68e878af918880b82ff41897a60c8c4e18e4dc4d93e9f1"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -743,9 +743,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "1.90.0"
version = "1.91.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98a862d704c817d865c8740b62d8bbeb5adcb30965e93b471df8a5bcefa20a80"
checksum = "8f8090151d4d1e971269957b10dbf287bba551ab812e591ce0516b1c73b75d27"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -805,9 +805,9 @@ dependencies = [
[[package]]
name = "aws-smithy-checksums"
version = "0.63.10"
version = "0.63.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb9a26b2831e728924ec0089e92697a78a2f9cdcf90d81e8cfcc6a6c85080369"
checksum = "95bd108f7b3563598e4dc7b62e1388c9982324a2abd622442167012690184591"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
@@ -1193,7 +1193,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -1273,7 +1273,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -1614,7 +1614,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -1707,7 +1707,7 @@ checksum = "a08a8aee16926ee1c4ad18868b8c3dfe5106359053f91e035861ec2a17116988"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -1738,9 +1738,9 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "convert_case"
version = "0.8.0"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f"
checksum = "db05ffb6856bf0ecdf6367558a76a0e8a77b1713044eb92845c692100ed50190"
dependencies = [
"unicode-segmentation",
]
@@ -1806,15 +1806,15 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc-fast"
version = "1.3.0"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bf62af4cc77d8fe1c22dde4e721d87f2f54056139d8c412e1366b740305f56f"
checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3"
dependencies = [
"crc",
"digest 0.10.7",
"libc",
"rand 0.9.2",
"regex",
"rustversion",
]
[[package]]
@@ -1963,9 +1963,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.2.0-rc.4"
version = "0.2.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8235645834fbc6832939736ce2f2d08192652269e11010a6240f61b908a1c6"
checksum = "919bd05924682a5480aec713596b9e2aabed3a0a6022fab6847f85a99e5f190a"
dependencies = [
"hybrid-array",
]
@@ -2024,7 +2024,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2082,7 +2082,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2096,7 +2096,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2118,7 +2118,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2129,7 +2129,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core 0.21.3",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2604,7 +2604,7 @@ checksum = "ec6f637bce95efac05cdfb9b6c19579ed4aa5f6b94d951cfa5bb054b7bb4f730"
dependencies = [
"datafusion-expr",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2843,7 +2843,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2885,7 +2885,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2905,7 +2905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core 0.20.2",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -2925,7 +2925,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"unicode-xid",
]
@@ -2955,7 +2955,7 @@ checksum = "6c478574b20020306f98d61c8ca3322d762e1ff08117422ac6106438605ea516"
dependencies = [
"block-buffer 0.11.0-rc.5",
"const-oid 0.10.1",
"crypto-common 0.2.0-rc.4",
"crypto-common 0.2.0-rc.5",
"subtle",
]
@@ -2988,7 +2988,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -3159,7 +3159,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -3180,7 +3180,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -3219,7 +3219,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -3230,9 +3230,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "erased-serde"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b"
checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3"
dependencies = [
"serde",
"serde_core",
@@ -3500,7 +3500,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -4566,9 +4566,9 @@ dependencies = [
[[package]]
name = "jsonwebtoken"
version = "10.1.0"
version = "10.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d119c6924272d16f0ab9ce41f7aa0bfef9340c00b0bb7ca3dd3b263d4a9150b"
checksum = "c76e1c7d7df3e34443b3621b459b066a7b79644f059fc8b2db7070c825fd417e"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
@@ -5490,6 +5490,7 @@ dependencies = [
"bytes",
"http 1.3.1",
"opentelemetry 0.31.0",
"reqwest",
]
[[package]]
@@ -5504,10 +5505,10 @@ dependencies = [
"opentelemetry-proto",
"opentelemetry_sdk 0.31.0",
"prost 0.14.1",
"reqwest",
"thiserror 2.0.17",
"tokio",
"tonic",
"tracing",
"zstd",
]
[[package]]
@@ -5839,7 +5840,7 @@ dependencies = [
"phf_shared 0.11.3",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -5877,7 +5878,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6072,7 +6073,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6131,7 +6132,7 @@ dependencies = [
"pulldown-cmark",
"pulldown-cmark-to-cmark",
"regex",
"syn 2.0.108",
"syn 2.0.109",
"tempfile",
]
@@ -6145,7 +6146,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6158,7 +6159,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6338,9 +6339,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.41"
version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1"
checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
dependencies = [
"proc-macro2",
]
@@ -6453,7 +6454,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b"
dependencies = [
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6505,7 +6506,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6635,9 +6636,9 @@ dependencies = [
[[package]]
name = "rmcp"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2c66318b30535ccd0d3b39afaa4240d6b5a35328fb7672a28e3386c97472805"
checksum = "e5947688160b56fb6c827e3c20a72c90392a1d7e9dec74749197aa1780ac42ca"
dependencies = [
"base64 0.22.1",
"chrono",
@@ -6645,7 +6646,7 @@ dependencies = [
"paste",
"pin-project-lite",
"rmcp-macros",
"schemars 1.0.5",
"schemars 1.1.0",
"serde",
"serde_json",
"thiserror 2.0.17",
@@ -6656,15 +6657,15 @@ dependencies = [
[[package]]
name = "rmcp-macros"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a19193e0d69bb1c96324b1b4daec078d4c8e76d734e53404c107437858a4d2"
checksum = "01263441d3f8635c628e33856c468b96ebbce1af2d3699ea712ca71432d4ee7a"
dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -6751,7 +6752,7 @@ dependencies = [
"quote",
"rust-embed-utils",
"shellexpand",
"syn 2.0.108",
"syn 2.0.109",
"walkdir",
]
@@ -6918,7 +6919,9 @@ name = "rustfs-audit"
version = "0.0.5"
dependencies = [
"chrono",
"const-str",
"futures",
"metrics",
"rumqttc",
"rustfs-config",
"rustfs-ecstore",
@@ -7188,7 +7191,7 @@ dependencies = [
"clap",
"mime_guess",
"rmcp",
"schemars 1.0.5",
"schemars 1.1.0",
"serde",
"serde_json",
"tokio",
@@ -7740,9 +7743,9 @@ dependencies = [
[[package]]
name = "schemars"
version = "1.0.5"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce"
checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289"
dependencies = [
"chrono",
"dyn-clone",
@@ -7754,14 +7757,14 @@ dependencies = [
[[package]]
name = "schemars_derive"
version = "1.0.5"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9"
checksum = "301858a4023d78debd2353c7426dc486001bddc91ae31a76fb1f55132f7e2633"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -7915,7 +7918,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -7926,7 +7929,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -7995,7 +7998,7 @@ dependencies = [
"indexmap 1.9.3",
"indexmap 2.12.0",
"schemars 0.9.0",
"schemars 1.0.5",
"schemars 1.1.0",
"serde_core",
"serde_json",
"serde_with_macros",
@@ -8011,7 +8014,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8036,7 +8039,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8225,7 +8228,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8311,7 +8314,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8405,7 +8408,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8417,7 +8420,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8540,9 +8543,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.108"
version = "2.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f"
dependencies = [
"proc-macro2",
"quote",
@@ -8587,7 +8590,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8686,7 +8689,7 @@ dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8697,7 +8700,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"test-case-core",
]
@@ -8727,7 +8730,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8738,7 +8741,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -8894,7 +8897,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -9037,7 +9040,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -9062,7 +9065,7 @@ dependencies = [
"prost-build",
"prost-types",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"tempfile",
"tonic-build",
]
@@ -9155,7 +9158,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -9397,7 +9400,7 @@ checksum = "d9384a660318abfbd7f8932c34d67e4d1ec511095f95972ddc01e19d7ba8413f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -9553,7 +9556,7 @@ dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"wasm-bindgen-shared",
]
@@ -9622,9 +9625,9 @@ dependencies = [
[[package]]
name = "wildmatch"
version = "2.5.0"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39b7d07a236abaef6607536ccfaf19b396dbe3f5110ddb73d39f4562902ed382"
checksum = "2d654e41fe05169e03e27b97e0c23716535da037c1652a31fd99c6b2fad84059"
dependencies = [
"serde",
]
@@ -9727,7 +9730,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -9738,7 +9741,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -10008,7 +10011,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -10073,7 +10076,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"synstructure 0.13.2",
]
@@ -10094,7 +10097,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -10114,7 +10117,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
"synstructure 0.13.2",
]
@@ -10135,7 +10138,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]
@@ -10168,7 +10171,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"syn 2.0.109",
]
[[package]]

View File

@@ -129,13 +129,13 @@ flatbuffers = "25.9.23"
form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.3"
rmcp = { version = "0.8.4" }
rmcp = { version = "0.8.5" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.0.5"
schemars = "1.1.0"
# Cryptography and Security
aes-gcm = { version = "0.10.3", features = ["std"] }
@@ -147,7 +147,7 @@ crc32c = "0.6.8"
crc32fast = "1.5.0"
crc64fast-nvme = "1.2.0"
hmac = "0.12.1"
jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
pbkdf2 = "0.12.2"
rsa = { version = "0.9.8" }
rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false }
@@ -169,8 +169,8 @@ astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.10" }
aws-credential-types = { version = "1.2.8" }
aws-sdk-s3 = { version = "1.110.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-credential-types = { version = "1.2.9" }
aws-sdk-s3 = { version = "1.112.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-smithy-types = { version = "1.3.4" }
base64 = "0.22.1"
base64-simd = "0.8.0"
@@ -178,7 +178,7 @@ brotli = "8.0.2"
cfg-if = "1.0.4"
clap = { version = "4.5.51", features = ["derive", "env"] }
const-str = { version = "0.7.0", features = ["std", "proc"] }
convert_case = "0.8.0"
convert_case = "0.9.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "50.3.0"
@@ -253,7 +253,7 @@ urlencoding = "2.1.3"
uuid = { version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.5.0", features = ["serde"] }
wildmatch = { version = "2.6.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "6.0.0"
@@ -262,7 +262,7 @@ zstd = "0.13.3"
# Observability and Metrics
opentelemetry = { version = "0.31.0" }
opentelemetry-appender-tracing = { version = "0.31.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes", "spec_unstable_logs_enabled"] }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry-otlp = { version = "0.31.0", features = ["http-proto", "zstd-http"] }
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }

View File

@@ -30,7 +30,9 @@ rustfs-targets = { workspace = true }
rustfs-config = { workspace = true, features = ["audit", "constants"] }
rustfs-ecstore = { workspace = true }
chrono = { workspace = true }
const-str = { workspace = true }
futures = { workspace = true }
metrics = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -39,5 +41,6 @@ tracing = { workspace = true, features = ["std", "attributes"] }
url = { workspace = true }
rumqttc = { workspace = true }
[lints]
workspace = true

View File

@@ -21,12 +21,47 @@
//! - Error rate monitoring
//! - Queue depth monitoring
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::info;
const RUSTFS_AUDIT_METRICS_NAMESPACE: &str = "rustfs.audit.";
const M_AUDIT_EVENTS_TOTAL: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.total");
const M_AUDIT_EVENTS_FAILED: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.failed");
const M_AUDIT_DISPATCH_NS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "dispatch.ns");
const M_AUDIT_EPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "eps");
const M_AUDIT_TARGET_OPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "target.ops");
const M_AUDIT_CONFIG_RELOADS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "config.reloads");
const M_AUDIT_SYSTEM_STARTS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "system.starts");
const L_RESULT: &str = "result";
const L_STATUS: &str = "status";
const V_SUCCESS: &str = "success";
const V_FAILURE: &str = "failure";
/// One-time registration of indicator meta information
/// This function ensures that metric descriptors are registered only once.
pub fn init_observability_metrics() {
static METRICS_DESC_INIT: OnceLock<()> = OnceLock::new();
METRICS_DESC_INIT.get_or_init(|| {
// Event/Time-consuming
describe_counter!(M_AUDIT_EVENTS_TOTAL, "Total audit events (labeled by result).");
describe_counter!(M_AUDIT_EVENTS_FAILED, "Total failed audit events.");
describe_histogram!(M_AUDIT_DISPATCH_NS, "Dispatch time per event (ns).");
describe_gauge!(M_AUDIT_EPS, "Events per second since last reset.");
// Target operation/system event
describe_counter!(M_AUDIT_TARGET_OPS, "Total target operations (labeled by status).");
describe_counter!(M_AUDIT_CONFIG_RELOADS, "Total configuration reloads.");
describe_counter!(M_AUDIT_SYSTEM_STARTS, "Total system starts.");
});
}
/// Metrics collector for audit system observability
#[derive(Debug)]
pub struct AuditMetrics {
@@ -56,6 +91,7 @@ impl Default for AuditMetrics {
impl AuditMetrics {
/// Creates a new metrics collector
pub fn new() -> Self {
init_observability_metrics();
Self {
total_events_processed: AtomicU64::new(0),
total_events_failed: AtomicU64::new(0),
@@ -68,11 +104,28 @@ impl AuditMetrics {
}
}
// Suggestion: Call this auxiliary function in the existing "Successful Event Recording" method body to complete the instrumentation
#[inline]
fn emit_event_success_metrics(&self, dispatch_time: Duration) {
// count + histogram
counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_SUCCESS).increment(1);
histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64);
}
// Suggestion: Call this auxiliary function in the existing "Failure Event Recording" method body to complete the instrumentation
#[inline]
fn emit_event_failure_metrics(&self, dispatch_time: Duration) {
counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_FAILURE).increment(1);
counter!(M_AUDIT_EVENTS_FAILED).increment(1);
histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64);
}
/// Records a successful event dispatch
pub fn record_event_success(&self, dispatch_time: Duration) {
self.total_events_processed.fetch_add(1, Ordering::Relaxed);
self.total_dispatch_time_ns
.fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed);
self.emit_event_success_metrics(dispatch_time);
}
/// Records a failed event dispatch
@@ -80,27 +133,32 @@ impl AuditMetrics {
self.total_events_failed.fetch_add(1, Ordering::Relaxed);
self.total_dispatch_time_ns
.fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed);
self.emit_event_failure_metrics(dispatch_time);
}
/// Records a successful target operation
pub fn record_target_success(&self) {
self.target_success_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_SUCCESS).increment(1);
}
/// Records a failed target operation
pub fn record_target_failure(&self) {
self.target_failure_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_FAILURE).increment(1);
}
/// Records a configuration reload
pub fn record_config_reload(&self) {
self.config_reload_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_CONFIG_RELOADS).increment(1);
info!("Audit configuration reloaded");
}
/// Records a system start
pub fn record_system_start(&self) {
self.system_start_count.fetch_add(1, Ordering::Relaxed);
counter!(M_AUDIT_SYSTEM_STARTS).increment(1);
info!("Audit system started");
}
@@ -110,11 +168,14 @@ impl AuditMetrics {
let elapsed = reset_time.elapsed();
let total_events = self.total_events_processed.load(Ordering::Relaxed) + self.total_events_failed.load(Ordering::Relaxed);
if elapsed.as_secs_f64() > 0.0 {
let eps = if elapsed.as_secs_f64() > 0.0 {
total_events as f64 / elapsed.as_secs_f64()
} else {
0.0
}
};
// EPS is reported in gauge
gauge!(M_AUDIT_EPS).set(eps);
eps
}
/// Gets the average dispatch latency in milliseconds
@@ -166,6 +227,8 @@ impl AuditMetrics {
let mut reset_time = self.last_reset_time.write().await;
*reset_time = Instant::now();
// Reset EPS to zero after reset
gauge!(M_AUDIT_EPS).set(0.0);
info!("Audit metrics reset");
}

View File

@@ -145,7 +145,7 @@ pub const DEFAULT_LOG_ROTATION_TIME: &str = "hour";
/// It is used to keep the logs of the application.
/// Default value: 30
/// Environment variable: RUSTFS_OBS_LOG_KEEP_FILES
pub const DEFAULT_LOG_KEEP_FILES: u16 = 30;
pub const DEFAULT_LOG_KEEP_FILES: usize = 30;
/// Default log local logging enabled for rustfs
/// This is the default log local logging enabled for rustfs.

View File

@@ -12,30 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/// Profiler related environment variable names and default values
pub const ENV_ENABLE_PROFILING: &str = "RUSTFS_ENABLE_PROFILING";
// CPU profiling
pub const ENV_CPU_MODE: &str = "RUSTFS_PROF_CPU_MODE"; // off|continuous|periodic
/// Frequency of CPU profiling samples
pub const ENV_CPU_FREQ: &str = "RUSTFS_PROF_CPU_FREQ";
/// Interval between CPU profiling sessions (for periodic mode)
pub const ENV_CPU_INTERVAL_SECS: &str = "RUSTFS_PROF_CPU_INTERVAL_SECS";
/// Duration of each CPU profiling session (for periodic mode)
pub const ENV_CPU_DURATION_SECS: &str = "RUSTFS_PROF_CPU_DURATION_SECS";
// Memory profiling (jemalloc)
/// Memory profiling (jemalloc)
pub const ENV_MEM_PERIODIC: &str = "RUSTFS_PROF_MEM_PERIODIC";
/// Interval between memory profiling snapshots (for periodic mode)
pub const ENV_MEM_INTERVAL_SECS: &str = "RUSTFS_PROF_MEM_INTERVAL_SECS";
// Output directory
/// Output directory
pub const ENV_OUTPUT_DIR: &str = "RUSTFS_PROF_OUTPUT_DIR";
// Defaults
/// Defaults for profiler settings
pub const DEFAULT_ENABLE_PROFILING: bool = false;
/// CPU profiling
pub const DEFAULT_CPU_MODE: &str = "off";
/// Frequency of CPU profiling samples
pub const DEFAULT_CPU_FREQ: usize = 100;
/// Interval between CPU profiling sessions (for periodic mode)
pub const DEFAULT_CPU_INTERVAL_SECS: u64 = 300;
/// Duration of each CPU profiling session (for periodic mode)
pub const DEFAULT_CPU_DURATION_SECS: u64 = 60;
/// Memory profiling (jemalloc)
pub const DEFAULT_MEM_PERIODIC: bool = false;
/// Interval between memory profiling snapshots (for periodic mode)
pub const DEFAULT_MEM_INTERVAL_SECS: u64 = 300;
/// Output directory
pub const DEFAULT_OUTPUT_DIR: &str = ".";

View File

@@ -0,0 +1,19 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Metrics collection interval in milliseconds for system metrics (CPU, memory, disk, network).
pub const DEFAULT_METRICS_SYSTEM_INTERVAL_MS: u64 = 30000;
/// Environment variable for setting the metrics collection interval for system metrics.
pub const ENV_OBS_METRICS_SYSTEM_INTERVAL_MS: &str = "RUSTFS_OBS_METRICS_SYSTEM_INTERVAL_MS";

View File

@@ -14,7 +14,13 @@
// Observability Keys
mod metrics;
pub use metrics::*;
pub const ENV_OBS_ENDPOINT: &str = "RUSTFS_OBS_ENDPOINT";
pub const ENV_OBS_TRACE_ENDPOINT: &str = "RUSTFS_OBS_TRACE_ENDPOINT";
pub const ENV_OBS_METRIC_ENDPOINT: &str = "RUSTFS_OBS_METRIC_ENDPOINT";
pub const ENV_OBS_LOG_ENDPOINT: &str = "RUSTFS_OBS_LOG_ENDPOINT";
pub const ENV_OBS_USE_STDOUT: &str = "RUSTFS_OBS_USE_STDOUT";
pub const ENV_OBS_SAMPLE_RATIO: &str = "RUSTFS_OBS_SAMPLE_RATIO";
pub const ENV_OBS_METER_INTERVAL: &str = "RUSTFS_OBS_METER_INTERVAL";
@@ -65,6 +71,9 @@ mod tests {
#[test]
fn test_env_keys() {
assert_eq!(ENV_OBS_ENDPOINT, "RUSTFS_OBS_ENDPOINT");
assert_eq!(ENV_OBS_TRACE_ENDPOINT, "RUSTFS_OBS_TRACE_ENDPOINT");
assert_eq!(ENV_OBS_METRIC_ENDPOINT, "RUSTFS_OBS_METRIC_ENDPOINT");
assert_eq!(ENV_OBS_LOG_ENDPOINT, "RUSTFS_OBS_LOG_ENDPOINT");
assert_eq!(ENV_OBS_USE_STDOUT, "RUSTFS_OBS_USE_STDOUT");
assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO");
assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL");

View File

@@ -45,7 +45,7 @@ opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use rustfs_config::observability::{
ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL,
ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT,
ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_ENDPOINT, ENV_OBS_LOG_FILENAME,
ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED,
ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME,
ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_USE_STDOUT,
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
@@ -23,6 +24,7 @@ use rustfs_config::{
USE_STDOUT,
};
use rustfs_utils::dirs::get_log_directory_to_string;
use rustfs_utils::{get_env_bool, get_env_f64, get_env_opt_str, get_env_str, get_env_u64, get_env_usize};
use serde::{Deserialize, Serialize};
use std::env;
@@ -55,6 +57,9 @@ use std::env;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
pub endpoint: String, // Endpoint for metric collection
pub trace_endpoint: Option<String>, // Endpoint for trace collection
pub metric_endpoint: Option<String>, // Endpoint for metric collection
pub log_endpoint: Option<String>, // Endpoint for log collection
pub use_stdout: Option<bool>, // Output to stdout
pub sample_ratio: Option<f64>, // Trace sampling ratio
pub meter_interval: Option<u64>, // Metric collection interval
@@ -68,7 +73,7 @@ pub struct OtelConfig {
pub log_filename: Option<String>, // The name of the log file
pub log_rotation_size_mb: Option<u64>, // Log file size cut threshold (MB)
pub log_rotation_time: Option<String>, // Logs are cut by time (Hour DayMinute Second)
pub log_keep_files: Option<u16>, // Number of log files to be retained
pub log_keep_files: Option<usize>, // Number of log files to be retained
}
impl OtelConfig {
@@ -83,62 +88,29 @@ impl OtelConfig {
} else {
env::var(ENV_OBS_ENDPOINT).unwrap_or_else(|_| "".to_string())
};
let mut use_stdout = env::var(ENV_OBS_USE_STDOUT)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(USE_STDOUT));
let mut use_stdout = get_env_bool(ENV_OBS_USE_STDOUT, USE_STDOUT);
if endpoint.is_empty() {
use_stdout = Some(true);
use_stdout = true;
}
OtelConfig {
endpoint,
use_stdout,
sample_ratio: env::var(ENV_OBS_SAMPLE_RATIO)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SAMPLE_RATIO)),
meter_interval: env::var(ENV_OBS_METER_INTERVAL)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(METER_INTERVAL)),
service_name: env::var(ENV_OBS_SERVICE_NAME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(APP_NAME.to_string())),
service_version: env::var(ENV_OBS_SERVICE_VERSION)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_VERSION.to_string())),
environment: env::var(ENV_OBS_ENVIRONMENT)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(ENVIRONMENT.to_string())),
logger_level: env::var(ENV_OBS_LOGGER_LEVEL)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
log_stdout_enabled: env::var(ENV_OBS_LOG_STDOUT_ENABLED)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_OBS_LOG_STDOUT_ENABLED)),
trace_endpoint: get_env_opt_str(ENV_OBS_TRACE_ENDPOINT),
metric_endpoint: get_env_opt_str(ENV_OBS_METRIC_ENDPOINT),
log_endpoint: get_env_opt_str(ENV_OBS_LOG_ENDPOINT),
use_stdout: Some(use_stdout),
sample_ratio: Some(get_env_f64(ENV_OBS_SAMPLE_RATIO, SAMPLE_RATIO)),
meter_interval: Some(get_env_u64(ENV_OBS_METER_INTERVAL, METER_INTERVAL)),
service_name: Some(get_env_str(ENV_OBS_SERVICE_NAME, APP_NAME)),
service_version: Some(get_env_str(ENV_OBS_SERVICE_VERSION, SERVICE_VERSION)),
environment: Some(get_env_str(ENV_OBS_ENVIRONMENT, ENVIRONMENT)),
logger_level: Some(get_env_str(ENV_OBS_LOGGER_LEVEL, DEFAULT_LOG_LEVEL)),
log_stdout_enabled: Some(get_env_bool(ENV_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOG_STDOUT_ENABLED)),
log_directory: Some(get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY)),
log_filename: env::var(ENV_OBS_LOG_FILENAME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_OBS_LOG_FILENAME.to_string())),
log_rotation_size_mb: env::var(ENV_OBS_LOG_ROTATION_SIZE_MB)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
log_rotation_time: env::var(ENV_OBS_LOG_ROTATION_TIME)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day"
log_keep_files: env::var(ENV_OBS_LOG_KEEP_FILES)
.ok()
.and_then(|v| v.parse().ok())
.or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
log_filename: Some(get_env_str(ENV_OBS_LOG_FILENAME, DEFAULT_OBS_LOG_FILENAME)),
log_rotation_size_mb: Some(get_env_u64(ENV_OBS_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
log_rotation_time: Some(get_env_str(ENV_OBS_LOG_ROTATION_TIME, DEFAULT_LOG_ROTATION_TIME)), // Default to "Hour"
log_keep_files: Some(get_env_usize(ENV_OBS_LOG_KEEP_FILES, DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
}
}

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::telemetry::{OtelGuard, init_telemetry};
use crate::{AppConfig, SystemObserver};
use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry};
use std::sync::{Arc, Mutex};
use tokio::sync::{OnceCell, SetError};
use tracing::{error, info};
@@ -52,6 +51,8 @@ pub enum GlobalError {
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
#[error("Telemetry initialization failed: {0}")]
TelemetryError(#[from] TelemetryError),
}
/// Initialize the observability module
@@ -68,14 +69,17 @@ pub enum GlobalError {
///
/// # #[tokio::main]
/// # async fn main() {
/// # let guard = init_obs(None).await;
/// # match init_obs(None).await {
/// # Ok(guard) => {}
/// # Err(e) => { eprintln!("Failed to initialize observability: {}", e); }
/// # }
/// # }
/// ```
pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
pub async fn init_obs(endpoint: Option<String>) -> Result<OtelGuard, GlobalError> {
// Load the configuration file
let config = AppConfig::new_with_endpoint(endpoint);
let otel_guard = init_telemetry(&config.observability);
let otel_guard = init_telemetry(&config.observability)?;
// Server will be created per connection - this ensures isolation
tokio::spawn(async move {
// Record the PID-related metrics of the current process
@@ -90,7 +94,7 @@ pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
}
});
otel_guard
Ok(otel_guard)
}
/// Set the global guard for OpenTelemetry
@@ -107,7 +111,10 @@ pub async fn init_obs(endpoint: Option<String>) -> OtelGuard {
/// # use rustfs_obs::{ init_obs, set_global_guard};
///
/// # async fn init() -> Result<(), Box<dyn std::error::Error>> {
/// # let guard = init_obs(None).await;
/// # let guard = match init_obs(None).await{
/// # Ok(g) => g,
/// # Err(e) => { return Err(Box::new(e)); }
/// # };
/// # set_global_guard(guard)?;
/// # Ok(())
/// # }

View File

@@ -38,7 +38,19 @@
///
/// # #[tokio::main]
/// # async fn main() {
/// # let guard = init_obs(None).await;
/// # let _guard = match init_obs(None).await {
/// # Ok(g) => g,
/// # Err(e) => {
/// # panic!("Failed to initialize observability: {:?}", e);
/// # }
/// # };
/// # // Application logic here
/// # {
/// # // Simulate some work
/// # tokio::time::sleep(std::time::Duration::from_secs(2)).await;
/// # println!("Application is running...");
/// # }
/// # // Guard will be dropped here, flushing telemetry data
/// # }
/// ```
mod config;
@@ -49,4 +61,6 @@ mod telemetry;
pub use config::{AppConfig, OtelConfig};
pub use global::*;
pub use metrics::*;
pub use system::SystemObserver;
pub use telemetry::{OtelGuard, TelemetryError};

View File

@@ -17,10 +17,15 @@
/// audit related metric descriptors
///
/// This module contains the metric descriptors for the audit subsystem.
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
const TARGET_ID: &str = "target_id";
pub const RESULT: &str = "result"; // success / failure
pub const STATUS: &str = "status"; // success / failure
pub const SUCCESS: &str = "success";
pub const FAILURE: &str = "failure";
pub static AUDIT_FAILED_MESSAGES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {
new_counter_md(

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// bucket level s3 metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
use std::sync::LazyLock;
pub static BUCKET_API_TRAFFIC_SENT_BYTES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Bucket copy metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Bucket level replication metric descriptor

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Metric descriptors related to cluster configuration
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Erasure code set related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// The label for the pool ID

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Cluster health-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static HEALTH_DRIVES_OFFLINE_COUNT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// IAM related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use std::sync::LazyLock;
pub static LAST_SYNC_DURATION_MILLIS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Notify the relevant metric descriptor
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems};
use std::sync::LazyLock;
pub static NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Descriptors of metrics related to cluster object and bucket usage
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Bucket labels

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::{MetricName, MetricNamespace, MetricSubsystem, MetricType};
use crate::{MetricName, MetricNamespace, MetricSubsystem, MetricType};
use std::collections::HashSet;
/// MetricDescriptor - Metric descriptors

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType};
use crate::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType};
pub(crate) mod descriptor;
pub(crate) mod metric_name;
@@ -76,7 +76,7 @@ pub fn new_histogram_md(
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::subsystems;
use crate::subsystems;
#[test]
fn test_new_histogram_md() {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::metrics::entry::path_utils::format_path_to_metric_name;
use crate::entry::path_utils::format_path_to_metric_name;
/// The metrics subsystem is a subgroup of metrics within a namespace
/// The metrics subsystem, which represents a subgroup of metrics within a namespace
@@ -204,8 +204,8 @@ pub mod subsystems {
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::MetricType;
use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace};
use crate::MetricType;
use crate::{MetricDescriptor, MetricName, MetricNamespace};
#[test]
fn test_metric_subsystem_formatting() {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// ILM-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static ILM_EXPIRY_PENDING_TASKS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// A descriptor for metrics related to webhook logs
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Define label constants for webhook metrics

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Metrics for replication subsystem
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -14,7 +14,7 @@
#![allow(dead_code)]
use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static API_REJECTED_AUTH_TOTAL_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Scanner-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
pub static SCANNER_BUCKET_SCANS_FINISHED_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {

View File

@@ -14,7 +14,7 @@
#![allow(dead_code)]
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// CPU system-related metric descriptors
use std::sync::LazyLock;

View File

@@ -15,7 +15,7 @@
#![allow(dead_code)]
/// Drive-related metric descriptors
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// drive related labels

View File

@@ -20,7 +20,7 @@
/// These descriptors are initialized lazily using `std::sync::LazyLock` to ensure
/// they are only created when actually needed, improving performance and reducing
/// startup overhead.
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Total memory available on the node

View File

@@ -20,7 +20,7 @@
/// - Error counts for connection and general internode calls
/// - Network dial performance metrics
/// - Data transfer volume in both directions
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Total number of failed internode calls counter

View File

@@ -19,7 +19,7 @@
/// This module defines various system process metrics used for monitoring
/// the RustFS process performance, resource usage, and system integration.
/// Metrics are implemented using std::sync::LazyLock for thread-safe lazy initialization.
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
use std::sync::LazyLock;
/// Number of current READ locks on this peer

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use crate::{GlobalError, is_observability_enabled};
use opentelemetry::global::meter;
use opentelemetry::{global::meter, metrics::Meter};
use sysinfo::Pid;
mod attributes;
mod collector;
@@ -30,7 +31,8 @@ impl SystemObserver {
pub async fn init_process_observer() -> Result<(), GlobalError> {
if is_observability_enabled() {
let meter = meter("system");
return SystemObserver::init_process_observer_for_pid(meter, 30000).await;
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
return SystemObserver::init_process_observer_for_pid(meter, pid).await;
}
Ok(())
}
@@ -38,9 +40,12 @@ impl SystemObserver {
/// Initialize the metric collector for the specified PID process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
let pid = sysinfo::Pid::from_u32(pid);
let mut collector = collector::Collector::new(pid, meter, 30000)?;
pub async fn init_process_observer_for_pid(meter: Meter, pid: Pid) -> Result<(), GlobalError> {
let interval_ms = rustfs_utils::get_env_u64(
rustfs_config::observability::ENV_OBS_METRICS_SYSTEM_INTERVAL_MS,
rustfs_config::observability::DEFAULT_METRICS_SYSTEM_INTERVAL_MS,
);
let mut collector = collector::Collector::new(pid, meter, interval_ms)?;
collector.run().await
}
}

View File

@@ -14,17 +14,13 @@
use crate::config::OtelConfig;
use crate::global::IS_OBSERVABILITY_ENABLED;
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode,
WriteMode::{AsyncWith, BufferAndFlush},
style,
};
use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, style};
use metrics::counter;
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
@@ -35,22 +31,22 @@ use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL,
SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
SAMPLE_RATIO, SERVICE_VERSION,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
ENV_OBS_LOG_DIRECTORY,
},
};
use rustfs_utils::get_local_ip_with_default;
use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use tracing::info;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
@@ -74,20 +70,19 @@ pub struct OtelGuard {
meter_provider: Option<SdkMeterProvider>,
logger_provider: Option<SdkLoggerProvider>,
// Add a flexi_logger handle to keep the logging alive
_flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
// WorkerGuard for writing tracing files
_tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}
// Implement debug manually and avoid relying on all fields to implement debug
impl std::fmt::Debug for OtelGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OtelGuard")
.field("tracer_provider", &self.tracer_provider.is_some())
.field("meter_provider", &self.meter_provider.is_some())
.field("logger_provider", &self.logger_provider.is_some())
.field("_flexi_logger_handles", &self._flexi_logger_handles.is_some())
.field("_tracing_guard", &self._tracing_guard.is_some())
.field("flexi_logger_handles", &self.flexi_logger_handles.is_some())
.field("tracing_guard", &self.tracing_guard.is_some())
.finish()
}
}
@@ -110,9 +105,42 @@ impl Drop for OtelGuard {
eprintln!("Logger shutdown error: {err:?}");
}
}
if let Some(handle) = self.flexi_logger_handles.take() {
handle.shutdown();
println!("flexi_logger shutdown completed");
}
if let Some(guard) = self.tracing_guard.take() {
// The guard will be dropped here, flushing any remaining logs
drop(guard);
println!("Tracing guard dropped, flushing logs.");
}
}
}
#[derive(Debug)]
pub enum TelemetryError {
BuildSpanExporter(String),
BuildMetricExporter(String),
BuildLogExporter(String),
InstallMetricsRecorder(String),
SubscriberInit(String),
}
impl std::fmt::Display for TelemetryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"),
TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"),
TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"),
TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"),
TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"),
}
}
}
impl std::error::Error for TelemetryError {}
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
Resource::builder()
@@ -141,456 +169,17 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
.build()
}
/// Initialize Telemetry
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Environment-aware stdout configuration
// Check for explicit environment control via RUSTFS_OBS_ENVIRONMENT
let is_production = environment.to_lowercase() == DEFAULT_OBS_ENVIRONMENT_PRODUCTION;
// Default stdout behavior based on environment
let default_use_stdout = if is_production {
false // Disable stdout in production for security and log aggregation
} else {
USE_STDOUT // Use configured default for dev/test environments
};
let use_stdout = config.use_stdout.unwrap_or(default_use_stdout);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
// Configure flexi_logger to cut by time and size
let mut flexi_logger_handle = None;
if !endpoint.is_empty() {
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
};
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
match metrics_exporter_opentelemetry::Recorder::builder("order-service").install_global() {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to set global metrics recorder: {e:?}");
}
}
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
}
builder.build()
};
// configuring tracing
{
// configure the formatting layer
let fmt_layer = {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.with_current_span(true)
.with_span_list(true);
let span_event = if is_production {
FmtSpan::CLOSE
} else {
// Only add full span events tracking in the development environment
FmtSpan::FULL
};
layer = layer.with_span_events(span_event);
layer.with_filter(build_env_filter(logger_level, None))
};
let filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure registry to avoid repeated calls to filter methods
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
Some(fmt_layer)
} else {
None
})
.with(OpenTelemetryLayer::new(tracer))
.with(otel_layer)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
IS_OBSERVABILITY_ENABLED.set(true).ok();
}
}
counter!("rustfs.start.total").increment(1);
return OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
};
}
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
// Enhanced error handling for directory creation
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: Failed to create log directory '{log_directory}': {e}");
eprintln!("Ensure the parent directory exists and you have write permissions.");
eprintln!("Attempting to continue with logging, but file logging may fail.");
} else {
eprintln!("Log directory ready: {log_directory}");
}
#[cfg(unix)]
{
// Linux/macOS Setting Permissions with better error handling
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => {
eprintln!("WARNING: Failed to set log directory permissions for '{log_directory}': {e}");
eprintln!("This may affect log file access. Consider checking directory ownership and permissions.");
}
}
}
if endpoint.is_empty() && !is_production {
// Create a file appender (rolling by day), add the -tracing suffix to the file name to avoid conflicts
// let file_appender = tracing_appender::rolling::hourly(log_directory, format!("{log_filename}-tracing.log"));
let file_appender = RollingFileAppender::builder()
.rotation(Rotation::HOURLY) // rotate log files once every hour
.filename_prefix(format!("{log_filename}-tracing")) // log file names will be prefixed with `myapp.`
.filename_suffix("log") // log file names will be suffixed with `.log`
.build(log_directory) // try to build an appender that stores log files in `/var/log`
.expect("initializing rolling file appender failed");
let (nb_writer, guard) = tracing_appender::non_blocking(file_appender);
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(nb_writer) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id
let env_filter = build_env_filter(logger_level, None);
// Use registry() to register fmt_layer directly to ensure trace_id is output to the log
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
let stdout_fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(std::io::stdout) // Specify writing file
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id;
Some(stdout_fmt_layer)
} else {
None
})
.init();
info!("Tracing telemetry initialized for non-production with trace_id logging.");
IS_OBSERVABILITY_ENABLED.set(false).ok();
return OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: None,
_tracing_guard: Some(guard),
};
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger level '{logger_level}': {e}. Using default 'info' level.");
LogSpecification::info()
});
// Environment-aware stdout configuration
// In production: disable stdout completely (Duplicate::None)
// In development/test: use level-based filtering
let level_filter = if is_production {
flexi_logger::Duplicate::None // No stdout output in production
} else {
// Convert the logger_level string to the corresponding LevelFilter for dev/test
match logger_level.to_lowercase().as_str() {
"trace" => flexi_logger::Duplicate::Trace,
"debug" => flexi_logger::Duplicate::Debug,
"info" => flexi_logger::Duplicate::Info,
"warn" | "warning" => flexi_logger::Duplicate::Warn,
"error" => flexi_logger::Duplicate::Error,
"off" => flexi_logger::Duplicate::None,
_ => flexi_logger::Duplicate::Info, // the default is info
}
};
// Choose write mode based on environment
let write_mode = if is_production {
get_env_async_with().unwrap_or_else(|| {
eprintln!(
"Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables."
);
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
})
} else {
BufferAndFlush
};
// Configure the flexi_logger with enhanced error handling
let mut flexi_logger_builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.write_mode(write_mode)
.append(); // Avoid clearing existing logs at startup
// Environment-aware stdout configuration
flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter);
// Only add stdout formatting and startup messages in non-production environments
if !is_production {
flexi_logger_builder = flexi_logger_builder
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.print_message(); // Startup information output to console
}
let flexi_logger_result = flexi_logger_builder.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
// Environment-aware success messages
if is_production {
eprintln!("Production logging initialized: file-only mode to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging disabled in production environment for security and log aggregation.");
} else {
eprintln!("Development/Test logging initialized with file logging to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging enabled for debugging. Environment: {environment}");
}
// Log rotation configuration details
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => {
eprintln!("Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files")
}
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
}
} else {
eprintln!("CRITICAL: Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Possible causes:");
eprintln!(" 1. Insufficient permissions to write to log directory: {log_directory}");
eprintln!(" 2. Log directory does not exist or is not accessible");
eprintln!(" 3. Invalid log configuration parameters");
eprintln!(" 4. Disk space issues");
eprintln!("Application will continue but logging to files will not work properly.");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
_tracing_guard: None,
}
}
// Read the AsyncWith parameter from the environment variable
fn get_env_async_with() -> Option<WriteMode> {
let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::<u64>().ok());
let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA);
let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA);
let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS);
match (pool_capa, message_capa, flush_ms) {
(Some(pool), Some(msg), Some(flush)) => Some(AsyncWith {
pool_capa: pool,
message_capa: msg,
flush_interval: Duration::from_millis(flush),
}),
_ => None,
}
Some(AsyncWith {
pool_capa,
message_capa,
flush_interval: Duration::from_millis(flush_ms),
})
}
fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
@@ -655,9 +244,339 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
)
}
/// stdout + span information (fix: retain WorkerGuard to avoid releasing after initialization)
fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
let env_filter = build_env_filter(logger_level, None);
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_writer(nb)
.json()
.with_current_span(true)
.with_span_list(true)
.with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL });
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.init();
IS_OBSERVABILITY_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!("Init stdout logging (level: {})", logger_level);
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
flexi_logger_handles: None,
tracing_guard: Some(guard),
}
}
/// File rolling log (size switching + number retained)
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
use flexi_logger::{
Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming,
WriteMode::{AsyncWith, BufferAndFlush},
};
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: create log dir '{}': {e}", log_directory);
}
#[cfg(unix)]
{
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755));
}
// parsing level
let log_spec = LogSpecification::parse(logger_level)
.unwrap_or_else(|_| LogSpecification::parse(DEFAULT_LOG_LEVEL).unwrap_or(LogSpecification::error()));
// Switch by size (MB), Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// write mode
let write_mode = get_env_async_with().unwrap_or(if is_production {
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
} else {
BufferAndFlush
});
// Build
let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
flexi_logger::Logger::with(log_spec.clone())
})
.format_for_stderr(format_with_color)
.format_for_files(format_for_file)
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files))
.write_mode(write_mode);
// Optional copy to stdout (for local observation)
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All);
} else {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::None);
}
let handle = match builder.start() {
Ok(h) => Some(h),
Err(e) => {
eprintln!("ERROR: start flexi_logger failed: {e}");
None
}
};
IS_OBSERVABILITY_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init file logging at '{}', roll size {:?}MB, keep {}",
log_directory, config.log_rotation_size_mb, keep_files
);
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
flexi_logger_handles: handle,
tracing_guard: None,
}
}
/// Observability (HTTP export, supports three sub-endpoints; if not, fallback to unified endpoint)
fn init_observability_http(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result<OtelGuard, TelemetryError> {
// Resources and sampling
let res = resource(config);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned();
let use_stdout = config.use_stdout.unwrap_or(!is_production);
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
// Endpoint
let root_ep = config.endpoint.as_str();
let trace_ep = config.trace_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
let metric_ep = config.metric_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
let log_ep = config.log_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep);
// TracerHTTP
let tracer_provider = {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(trace_ep)
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?;
let mut builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone())
.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default());
}
let provider = builder.build();
global::set_tracer_provider(provider.clone());
provider
};
// MeterHTTP
let meter_provider = {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(metric_ep)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
let provider = builder.build();
global::set_meter_provider(provider.clone());
provider
};
// metrics crate -> OTel
let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global();
// LoggerHTTP
let logger_provider = {
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_endpoint(log_ep)
.with_protocol(Protocol::HttpBinary)
.with_compression(Compression::Zstd)
.build()
.map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?;
let mut builder = SdkLoggerProvider::builder().with_resource(res);
builder = builder.with_batch_exporter(exporter);
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
builder.build()
};
// Tracing layer
let fmt_layer_opt = {
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
let enable_color = std::io::stdout().is_terminal();
let mut layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
.with_ansi(enable_color)
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.with_current_span(true)
.with_span_list(true);
let span_event = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
layer = layer.with_span_events(span_event);
Some(layer.with_filter(build_env_filter(logger_level, None)))
} else {
None
}
};
let filter = build_env_filter(logger_level, None);
let otel_bridge = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(service_name.to_string());
tracing_subscriber::registry()
.with(filter)
.with(ErrorLayer::default())
.with(fmt_layer_opt)
.with(OpenTelemetryLayer::new(tracer))
.with(otel_bridge)
.with(MetricsLayer::new(meter_provider.clone()))
.init();
IS_OBSERVABILITY_ENABLED.set(true).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init observability (HTTP): trace='{}', metric='{}', log='{}'",
trace_ep, metric_ep, log_ep
);
Ok(OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
flexi_logger_handles: None,
tracing_guard: None,
})
}
/// Initialize Telemetry,Entrance: three rules
pub(crate) fn init_telemetry(config: &OtelConfig) -> Result<OtelGuard, TelemetryError> {
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
let is_production = environment.eq_ignore_ascii_case(DEFAULT_OBS_ENVIRONMENT_PRODUCTION);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
// Rule 3: Observability (any endpoint is enabled if it is not empty)
let has_obs = !config.endpoint.is_empty()
|| config.trace_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false)
|| config.metric_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false)
|| config.log_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false);
if has_obs {
return init_observability_http(config, logger_level, is_production);
}
// Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set)
let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok();
if user_set_log_dir {
return Ok(init_file_logging(config, logger_level, is_production));
}
// Rule 1: Default stdout (error level)
Ok(init_stdout_logging(config, DEFAULT_LOG_LEVEL, is_production))
}
#[cfg(test)]
mod tests {
use super::*;
use rustfs_config::USE_STDOUT;
#[test]
fn test_production_environment_detection() {

390
crates/utils/src/envs.rs Normal file
View File

@@ -0,0 +1,390 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 8-bit type: signed i8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i8`: The parsed value as i8 if successful, otherwise the default value.
///
pub fn get_env_i8(key: &str, default: i8) -> i8 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 8-bit type: signed i8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i8>`: The parsed value as i8 if successful, otherwise None
///
pub fn get_env_opt_i8(key: &str) -> Option<i8> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 8-bit type: unsigned u8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u8`: The parsed value as u8 if successful, otherwise the default value.
///
pub fn get_env_u8(key: &str, default: u8) -> u8 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 8-bit type: unsigned u8
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u8>`: The parsed value as u8 if successful, otherwise None
///
pub fn get_env_opt_u8(key: &str) -> Option<u8> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 16-bit type: signed i16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `i16`: The parsed value as i16 if successful, otherwise the default value.
///
pub fn get_env_i16(key: &str, default: i16) -> i16 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: signed i16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i16>`: The parsed value as i16 if successful, otherwise None
///
pub fn get_env_opt_i16(key: &str) -> Option<i16> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u16`: The parsed value as u16 if successful, otherwise the default value.
///
pub fn get_env_u16(key: &str, default: u16) -> u16 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u16>`: The parsed value as u16 if successful, otherwise None
///
pub fn get_env_u16_opt(key: &str) -> Option<u16> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 16-bit type: unsigned u16
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u16>`: The parsed value as u16 if successful, otherwise None
///
pub fn get_env_opt_u16(key: &str) -> Option<u16> {
get_env_u16_opt(key)
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 32-bit type: signed i32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `i32`: The parsed value as i32 if successful, otherwise the default value.
///
pub fn get_env_i32(key: &str, default: i32) -> i32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 32-bit type: signed i32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i32>`: The parsed value as i32 if successful, otherwise None
///
pub fn get_env_opt_i32(key: &str) -> Option<i32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
/// 32-bit type: unsigned u32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing fails.
///
/// #Returns
/// - `u32`: The parsed value as u32 if successful, otherwise the default value.
///
pub fn get_env_u32(key: &str, default: u32) -> u32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
/// 32-bit type: unsigned u32
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u32>`: The parsed value as u32 if successful, otherwise None
///
pub fn get_env_opt_u32(key: &str) -> Option<u32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing
///
/// #Returns
/// - `f32`: The parsed value as f32 if successful, otherwise the default value
///
pub fn get_env_f32(key: &str, default: f32) -> f32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<f32>`: The parsed value as f32 if successful, otherwise None
///
pub fn get_env_opt_f32(key: &str) -> Option<f32> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing
///
/// #Returns
/// - `i64`: The parsed value as i64 if successful, otherwise the default value
///
pub fn get_env_i64(key: &str, default: i64) -> i64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<i64>`: The parsed value as i64 if successful, otherwise None
///
pub fn get_env_opt_i64(key: &str) -> Option<i64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, returning Option<Option<i64>> if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<Option<i64>>`: The parsed value as i64 if successful, otherwise None
///
pub fn get_env_opt_opt_i64(key: &str) -> Option<Option<i64>> {
env::var(key).ok().map(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing
///
/// #Returns
/// - `u64`: The parsed value as u64 if successful, otherwise the default value.
///
pub fn get_env_u64(key: &str, default: u64) -> u64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<u64>`: The parsed value as u64 if successful, otherwise None
///
pub fn get_env_opt_u64(key: &str) -> Option<u64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing
///
/// #Returns
/// - `f64`: The parsed value as f64 if successful, otherwise the default value.
///
pub fn get_env_f64(key: &str, default: f64) -> f64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<f64>`: The parsed value as f64 if successful, otherwise None
///
pub fn get_env_opt_f64(key: &str) -> Option<f64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default value to return if the environment variable is not set or parsing
///
/// #Returns
/// - `usize`: The parsed value as usize if successful, otherwise the default value.
///
pub fn get_env_usize(key: &str, default: usize) -> usize {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<usize>`: The parsed value as usize if successful, otherwise None
///
pub fn get_env_usize_opt(key: &str) -> Option<usize> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<usize>`: The parsed value as usize if successful, otherwise None
///
pub fn get_env_opt_usize(key: &str) -> Option<usize> {
get_env_usize_opt(key)
}
/// Retrieve an environment variable as a String, with a default value if not set.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default string value to return if the environment variable is not set.
///
/// #Returns
/// - `String`: The environment variable value if set, otherwise the default value.
///
pub fn get_env_str(key: &str, default: &str) -> String {
env::var(key).unwrap_or_else(|_| default.to_string())
}
/// Retrieve an environment variable as a String, returning None if not set.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<String>`: The environment variable value if set, otherwise None.
///
pub fn get_env_opt_str(key: &str) -> Option<String> {
env::var(key).ok()
}
/// Retrieve an environment variable as a boolean, with a default value if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
/// - `default`: The default boolean value to return if the environment variable is not set or cannot be parsed.
///
/// #Returns
/// - `bool`: The parsed boolean value if successful, otherwise the default value.
///
pub fn get_env_bool(key: &str, default: bool) -> bool {
env::var(key)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
.unwrap_or(default)
}
/// Retrieve an environment variable as a boolean, returning None if not set or parsing fails.
///
/// #Parameters
/// - `key`: The environment variable key to look up.
///
/// #Returns
/// - `Option<bool>`: The parsed boolean value if successful, otherwise None.
///
pub fn get_env_opt_bool(key: &str) -> Option<bool> {
env::var(key).ok().and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
}

View File

@@ -81,8 +81,8 @@ pub mod sys;
#[cfg(feature = "sys")]
pub use sys::user_agent::*;
#[cfg(feature = "sys")]
pub use sys::envs::*;
#[cfg(feature = "notify")]
pub use notify::*;
mod envs;
pub use envs::*;

View File

@@ -1,42 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
pub fn get_env_usize(key: &str, default: usize) -> usize {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_u64(key: &str, default: u64) -> u64 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_u32(key: &str, default: u32) -> u32 {
env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub fn get_env_str(key: &str, default: &str) -> String {
env::var(key).unwrap_or_else(|_| default.to_string())
}
pub fn get_env_opt_u64(key: &str) -> Option<u64> {
env::var(key).ok().and_then(|v| v.parse().ok())
}
pub fn get_env_bool(key: &str, default: bool) -> bool {
env::var(key)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"0" | "false" | "no" => Some(false),
_ => None,
})
.unwrap_or(default)
}

View File

@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod envs;
pub(crate) mod user_agent;

View File

@@ -89,7 +89,6 @@ const LOGO: &str = r#"
#[instrument]
fn print_server_info() {
let current_year = chrono::Utc::now().year();
// Use custom macros to print server information
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
@@ -112,10 +111,13 @@ async fn async_main() -> Result<()> {
init_license(opt.license.clone());
// Initialize Observability
let guard = init_obs(Some(opt.clone().obs_endpoint)).await;
// print startup logo
info!("{}", LOGO);
let guard = match init_obs(Some(opt.clone().obs_endpoint)).await {
Ok(g) => g,
Err(e) => {
println!("Failed to initialize observability: {}", e);
return Err(Error::other(e));
}
};
// Store in global storage
match set_global_guard(guard).map_err(Error::other) {
@@ -126,6 +128,9 @@ async fn async_main() -> Result<()> {
}
}
// print startup logo
info!("{}", LOGO);
// Initialize performance profiling if enabled
#[cfg(not(target_os = "windows"))]
profiling::init_from_env().await;

View File

@@ -51,7 +51,14 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
# export RUSTFS_TLS_PATH="./deploy/certs"
# Observability related configuration
#export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector address
#export RUSTFS_OBS_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector address
# RustFS OR OTEL exporter configuration
#export RUSTFS_OBS_TRACE_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector trace address http://localhost:4318/v1/traces
#export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
#export RUSTFS_OBS_METRIC_ENDPOINT=http://localhost:9090/api/v1/otlp # OpenTelemetry Collector metric address
#export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://localhost:9090/api/v1/otlp/v1/metrics
#export RUSTFS_OBS_LOG_ENDPOINT=http://loki:3100/otlp # OpenTelemetry Collector logs address http://loki:3100/otlp/v1/logs
#export OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=http://loki:3100/otlp/v1/logs
#export RUSTFS_OBS_USE_STDOUT=false # Whether to use standard output
#export RUSTFS_OBS_SAMPLE_RATIO=2.0 # Sample ratio, between 0.0-1.0, 0.0 means no sampling, 1.0 means full sampling
#export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds
@@ -60,7 +67,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9001"
export RUSTFS_OBS_ENVIRONMENT=develop # Environment name
export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error
export RUSTFS_OBS_LOG_STDOUT_ENABLED=true # Whether to enable local stdout logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
#export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB
export RUSTFS_OBS_LOG_POOL_CAPA=10240