diff --git a/.docker/observability/docker-compose.yml b/.docker/observability/docker-compose.yml index c86d3542..2fdeae10 100644 --- a/.docker/observability/docker-compose.yml +++ b/.docker/observability/docker-compose.yml @@ -16,7 +16,7 @@ services: tempo-init: image: busybox:latest - command: ["sh", "-c", "chown -R 10001:10001 /var/tempo"] + command: [ "sh", "-c", "chown -R 10001:10001 /var/tempo" ] volumes: - ./tempo-data:/var/tempo user: root @@ -39,7 +39,7 @@ services: - otel-network otel-collector: - image: otel/opentelemetry-collector-contrib:0.129.1 + image: otel/opentelemetry-collector-contrib:latest environment: - TZ=Asia/Shanghai volumes: @@ -55,7 +55,7 @@ services: networks: - otel-network jaeger: - image: jaegertracing/jaeger:2.8.0 + image: jaegertracing/jaeger:latest environment: - TZ=Asia/Shanghai ports: @@ -65,17 +65,21 @@ services: networks: - otel-network prometheus: - image: prom/prometheus:v3.4.2 + image: prom/prometheus:latest environment: - TZ=Asia/Shanghai volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml ports: - "9090:9090" + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--web.enable-otlp-receiver' # Enable OTLP + - '--enable-feature=promql-experimental-functions' # Enable info() networks: - otel-network loki: - image: grafana/loki:3.5.1 + image: grafana/loki:latest environment: - TZ=Asia/Shanghai volumes: @@ -86,7 +90,7 @@ services: networks: - otel-network grafana: - image: grafana/grafana:12.0.2 + image: grafana/grafana:latest ports: - "3000:3000" # Web UI volumes: diff --git a/.docker/observability/grafana-datasources.yaml b/.docker/observability/grafana-datasources.yaml index fca5cff4..cbd09a17 100644 --- a/.docker/observability/grafana-datasources.yaml +++ b/.docker/observability/grafana-datasources.yaml @@ -29,4 +29,80 @@ datasources: serviceMap: datasourceUid: prometheus streamingEnabled: - search: true \ No newline at end of file + search: true + tracesToLogsV2: + # Field with an internal link pointing 
to a logs data source in Grafana. + # datasourceUid value must match the uid value of the logs data source. + datasourceUid: 'loki' + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + tags: [ 'job', 'instance', 'pod', 'namespace' ] + filterByTraceID: false + filterBySpanID: false + customQuery: true + query: 'method="$${__span.tags.method}"' + tracesToMetrics: + datasourceUid: 'prom' + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ] + queries: + - name: 'Sample query' + query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))' + tracesToProfiles: + datasourceUid: 'grafana-pyroscope-datasource' + tags: [ 'job', 'instance', 'pod', 'namespace' ] + profileTypeId: 'process_cpu:cpu:nanoseconds:cpu:nanoseconds' + customQuery: true + query: 'method="$${__span.tags.method}"' + serviceMap: + datasourceUid: 'prometheus' + nodeGraph: + enabled: true + search: + hide: false + traceQuery: + timeShiftEnabled: true + spanStartTimeShift: '-1h' + spanEndTimeShift: '1h' + spanBar: + type: 'Tag' + tag: 'http.path' + streamingEnabled: + search: true + - name: Jaeger + type: jaeger + uid: Jaeger + url: http://jaeger:16686 + basicAuth: false + access: proxy + readOnly: false + isDefault: false + jsonData: + tracesToLogsV2: + # Field with an internal link pointing to a logs data source in Grafana. + # datasourceUid value must match the uid value of the logs data source. 
+ datasourceUid: 'loki' + spanStartTimeShift: '1h' + spanEndTimeShift: '-1h' + tags: [ 'job', 'instance', 'pod', 'namespace' ] + filterByTraceID: false + filterBySpanID: false + customQuery: true + query: 'method="$${__span.tags.method}"' + tracesToMetrics: + datasourceUid: 'prom' + spanStartTimeShift: '1h' + spanEndTimeShift: '-1h' + tags: [ { key: 'service.name', value: 'service' }, { key: 'job' } ] + queries: + - name: 'Sample query' + query: 'sum(rate(traces_spanmetrics_latency_bucket{$$__tags}[5m]))' + nodeGraph: + enabled: true + traceQuery: + timeShiftEnabled: true + spanStartTimeShift: '1h' + spanEndTimeShift: '-1h' + spanBar: + type: 'None' \ No newline at end of file diff --git a/.docker/observability/loki-config.yaml b/.docker/observability/loki-config.yaml index caf00ba6..f3991e04 100644 --- a/.docker/observability/loki-config.yaml +++ b/.docker/observability/loki-config.yaml @@ -63,6 +63,7 @@ ruler: frontend: encoding: protobuf + # By default, Loki will send anonymous, but uniquely-identifiable usage and configuration # analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ # diff --git a/.docker/observability/otel-collector-config.yaml b/.docker/observability/otel-collector-config.yaml index 217fa872..09af9dc9 100644 --- a/.docker/observability/otel-collector-config.yaml +++ b/.docker/observability/otel-collector-config.yaml @@ -43,7 +43,6 @@ exporters: send_timestamps: true # 发送时间戳 # enable_open_metrics: true otlphttp/loki: # Loki 导出器,用于日志数据 - # endpoint: "http://loki:3100/otlp/v1/logs" endpoint: "http://loki:3100/otlp/v1/logs" tls: insecure: true diff --git a/.docker/observability/prometheus.yml b/.docker/observability/prometheus.yml index 8d7526d7..5254087e 100644 --- a/.docker/observability/prometheus.yml +++ b/.docker/observability/prometheus.yml @@ -13,16 +13,43 @@ # limitations under the License. global: - scrape_interval: 5s # 刮取间隔 + scrape_interval: 15s # Scrape targets every 15 seconds. 
The default is every 1 minute. scrape_configs: - job_name: 'otel-collector' static_configs: - - targets: [ 'otel-collector:8888' ] # 从 Collector 刮取指标 + - targets: [ 'otel-collector:8888' ] # Scrape metrics from Collector - job_name: 'otel-metrics' static_configs: - - targets: [ 'otel-collector:8889' ] # 应用指标 + - targets: [ 'otel-collector:8889' ] # Application metrics - job_name: 'tempo' static_configs: - - targets: [ 'tempo:3200' ] - + - targets: [ 'tempo:3200' ] # Scrape metrics from Tempo + +otlp: + # Recommended attributes to be promoted to labels. + promote_resource_attributes: + - service.instance.id + - service.name + - service.namespace + - cloud.availability_zone + - cloud.region + - container.name + - deployment.environment.name + - k8s.cluster.name + - k8s.container.name + - k8s.cronjob.name + - k8s.daemonset.name + - k8s.deployment.name + - k8s.job.name + - k8s.namespace.name + - k8s.pod.name + - k8s.replicaset.name + - k8s.statefulset.name + # Ingest OTLP data keeping all characters in metric/label names. + translation_strategy: NoUTF8EscapingWithSuffixes + +storage: + # OTLP is a push-based protocol, so out-of-order samples are common. 
+ tsdb: + out_of_order_time_window: 30m \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 52e4c5e9..4d034b00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,7 +505,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -527,7 +527,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -538,7 +538,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -564,7 +564,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -605,9 +605,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.8" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf26925f4a5b59eb76722b63c2892b1d70d06fa053c72e4a100ec308c1d47bc" +checksum = "86590e57ea40121d47d3f2e131bfd873dea15d78dc2f4604f4734537ad9e56c4" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -640,9 +640,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.13" +version = "1.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2402da1a5e16868ba98725e5d73f26b8116eaa892e56f2cd0bf5eec7985f70" +checksum = "8fe0fd441565b0b318c76e7206c8d1d0b0166b3e986cf30e890b61feb6192045" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -665,9 +665,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.110.0" +version = "1.112.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a811ec867f77c01aa0f0abfaa9fedef647cc83608ad8e67949f95d30d04a7fd" +checksum = 
"eee73a27721035c46da0572b390a69fbdb333d0177c24f3d8f7ff952eeb96690" dependencies = [ "aws-credential-types", "aws-runtime", @@ -699,9 +699,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.88.0" +version = "1.89.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d05b276777560aa9a196dbba2e3aada4d8006d3d7eeb3ba7fe0c317227d933c4" +checksum = "a9c1b1af02288f729e95b72bd17988c009aa72e26dcb59b3200f86d7aea726c9" dependencies = [ "aws-credential-types", "aws-runtime", @@ -721,9 +721,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.90.0" +version = "1.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9be14d6d9cd761fac3fd234a0f47f7ed6c0df62d83c0eeb7012750e4732879b" +checksum = "4e8122301558dc7c6c68e878af918880b82ff41897a60c8c4e18e4dc4d93e9f1" dependencies = [ "aws-credential-types", "aws-runtime", @@ -743,9 +743,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.90.0" +version = "1.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98a862d704c817d865c8740b62d8bbeb5adcb30965e93b471df8a5bcefa20a80" +checksum = "8f8090151d4d1e971269957b10dbf287bba551ab812e591ce0516b1c73b75d27" dependencies = [ "aws-credential-types", "aws-runtime", @@ -805,9 +805,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.63.10" +version = "0.63.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb9a26b2831e728924ec0089e92697a78a2f9cdcf90d81e8cfcc6a6c85080369" +checksum = "95bd108f7b3563598e4dc7b62e1388c9982324a2abd622442167012690184591" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -1193,7 +1193,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -1273,7 +1273,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -1614,7 +1614,7 @@ dependencies = [ "heck", 
"proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -1707,7 +1707,7 @@ checksum = "a08a8aee16926ee1c4ad18868b8c3dfe5106359053f91e035861ec2a17116988" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -1738,9 +1738,9 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "convert_case" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +checksum = "db05ffb6856bf0ecdf6367558a76a0e8a77b1713044eb92845c692100ed50190" dependencies = [ "unicode-segmentation", ] @@ -1806,15 +1806,15 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] name = "crc-fast" -version = "1.3.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bf62af4cc77d8fe1c22dde4e721d87f2f54056139d8c412e1366b740305f56f" +checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" dependencies = [ "crc", "digest 0.10.7", - "libc", "rand 0.9.2", "regex", + "rustversion", ] [[package]] @@ -1963,9 +1963,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.2.0-rc.4" +version = "0.2.0-rc.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8235645834fbc6832939736ce2f2d08192652269e11010a6240f61b908a1c6" +checksum = "919bd05924682a5480aec713596b9e2aabed3a0a6022fab6847f85a99e5f190a" dependencies = [ "hybrid-array", ] @@ -2024,7 +2024,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2082,7 +2082,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2096,7 +2096,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.108", + "syn 
2.0.109", ] [[package]] @@ -2118,7 +2118,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core 0.20.11", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2129,7 +2129,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core 0.21.3", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2604,7 +2604,7 @@ checksum = "ec6f637bce95efac05cdfb9b6c19579ed4aa5f6b94d951cfa5bb054b7bb4f730" dependencies = [ "datafusion-expr", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2843,7 +2843,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2885,7 +2885,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2905,7 +2905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core 0.20.2", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -2925,7 +2925,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", "unicode-xid", ] @@ -2955,7 +2955,7 @@ checksum = "6c478574b20020306f98d61c8ca3322d762e1ff08117422ac6106438605ea516" dependencies = [ "block-buffer 0.11.0-rc.5", "const-oid 0.10.1", - "crypto-common 0.2.0-rc.4", + "crypto-common 0.2.0-rc.5", "subtle", ] @@ -2988,7 +2988,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -3159,7 +3159,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -3180,7 +3180,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", 
"quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -3219,7 +3219,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -3230,9 +3230,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "erased-serde" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b" +checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3" dependencies = [ "serde", "serde_core", @@ -3500,7 +3500,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -4566,9 +4566,9 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "10.1.0" +version = "10.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d119c6924272d16f0ab9ce41f7aa0bfef9340c00b0bb7ca3dd3b263d4a9150b" +checksum = "c76e1c7d7df3e34443b3621b459b066a7b79644f059fc8b2db7070c825fd417e" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -5490,6 +5490,7 @@ dependencies = [ "bytes", "http 1.3.1", "opentelemetry 0.31.0", + "reqwest", ] [[package]] @@ -5504,10 +5505,10 @@ dependencies = [ "opentelemetry-proto", "opentelemetry_sdk 0.31.0", "prost 0.14.1", + "reqwest", "thiserror 2.0.17", - "tokio", - "tonic", "tracing", + "zstd", ] [[package]] @@ -5839,7 +5840,7 @@ dependencies = [ "phf_shared 0.11.3", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -5877,7 +5878,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6072,7 +6073,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6131,7 +6132,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn 2.0.108", + "syn 2.0.109", "tempfile", ] @@ -6145,7 +6146,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6158,7 +6159,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6338,9 +6339,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -6453,7 +6454,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6505,7 +6506,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6635,9 +6636,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2c66318b30535ccd0d3b39afaa4240d6b5a35328fb7672a28e3386c97472805" +checksum = "e5947688160b56fb6c827e3c20a72c90392a1d7e9dec74749197aa1780ac42ca" dependencies = [ "base64 0.22.1", "chrono", @@ -6645,7 +6646,7 @@ dependencies = [ "paste", "pin-project-lite", "rmcp-macros", - "schemars 1.0.5", + "schemars 1.1.0", "serde", "serde_json", "thiserror 2.0.17", @@ -6656,15 +6657,15 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "0.8.4" +version = "0.8.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a19193e0d69bb1c96324b1b4daec078d4c8e76d734e53404c107437858a4d2" +checksum = "01263441d3f8635c628e33856c468b96ebbce1af2d3699ea712ca71432d4ee7a" dependencies = [ "darling 0.21.3", "proc-macro2", "quote", "serde_json", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -6751,7 +6752,7 @@ dependencies = [ "quote", "rust-embed-utils", "shellexpand", - "syn 2.0.108", + "syn 2.0.109", "walkdir", ] @@ -6918,7 +6919,9 @@ name = "rustfs-audit" version = "0.0.5" dependencies = [ "chrono", + "const-str", "futures", + "metrics", "rumqttc", "rustfs-config", "rustfs-ecstore", @@ -7188,7 +7191,7 @@ dependencies = [ "clap", "mime_guess", "rmcp", - "schemars 1.0.5", + "schemars 1.1.0", "serde", "serde_json", "tokio", @@ -7740,9 +7743,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" dependencies = [ "chrono", "dyn-clone", @@ -7754,14 +7757,14 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9" +checksum = "301858a4023d78debd2353c7426dc486001bddc91ae31a76fb1f55132f7e2633" dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -7915,7 +7918,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -7926,7 +7929,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -7995,7 
+7998,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.12.0", "schemars 0.9.0", - "schemars 1.0.5", + "schemars 1.1.0", "serde_core", "serde_json", "serde_with_macros", @@ -8011,7 +8014,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8036,7 +8039,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8225,7 +8228,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8311,7 +8314,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8405,7 +8408,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8417,7 +8420,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8540,9 +8543,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.108" +version = "2.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" +checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" dependencies = [ "proc-macro2", "quote", @@ -8587,7 +8590,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8686,7 +8689,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8697,7 +8700,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", "test-case-core", ] @@ -8727,7 +8730,7 @@ checksum = 
"4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8738,7 +8741,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -8894,7 +8897,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -9037,7 +9040,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -9062,7 +9065,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.108", + "syn 2.0.109", "tempfile", "tonic-build", ] @@ -9155,7 +9158,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -9397,7 +9400,7 @@ checksum = "d9384a660318abfbd7f8932c34d67e4d1ec511095f95972ddc01e19d7ba8413f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -9553,7 +9556,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", "wasm-bindgen-shared", ] @@ -9622,9 +9625,9 @@ dependencies = [ [[package]] name = "wildmatch" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39b7d07a236abaef6607536ccfaf19b396dbe3f5110ddb73d39f4562902ed382" +checksum = "2d654e41fe05169e03e27b97e0c23716535da037c1652a31fd99c6b2fad84059" dependencies = [ "serde", ] @@ -9727,7 +9730,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -9738,7 +9741,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", 
"quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -10008,7 +10011,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -10073,7 +10076,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", "synstructure 0.13.2", ] @@ -10094,7 +10097,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -10114,7 +10117,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", "synstructure 0.13.2", ] @@ -10135,7 +10138,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] @@ -10168,7 +10171,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn 2.0.109", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6350c2f4..4718eaf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,13 +129,13 @@ flatbuffers = "25.9.23" form_urlencoded = "1.2.2" prost = "0.14.1" quick-xml = "0.38.3" -rmcp = { version = "0.8.4" } +rmcp = { version = "0.8.5" } rmp = { version = "0.8.14" } rmp-serde = { version = "1.3.0" } serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.145", features = ["raw_value"] } serde_urlencoded = "0.7.1" -schemars = "1.0.5" +schemars = "1.1.0" # Cryptography and Security aes-gcm = { version = "0.10.3", features = ["std"] } @@ -147,7 +147,7 @@ crc32c = "0.6.8" crc32fast = "1.5.0" crc64fast-nvme = "1.2.0" hmac = "0.12.1" -jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] } +jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } 
pbkdf2 = "0.12.2" rsa = { version = "0.9.8" } rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false } @@ -169,8 +169,8 @@ astral-tokio-tar = "0.5.6" atoi = "2.0.0" atomic_enum = "0.3.0" aws-config = { version = "1.8.10" } -aws-credential-types = { version = "1.2.8" } -aws-sdk-s3 = { version = "1.110.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] } +aws-credential-types = { version = "1.2.9" } +aws-sdk-s3 = { version = "1.112.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] } aws-smithy-types = { version = "1.3.4" } base64 = "0.22.1" base64-simd = "0.8.0" @@ -178,7 +178,7 @@ brotli = "8.0.2" cfg-if = "1.0.4" clap = { version = "4.5.51", features = ["derive", "env"] } const-str = { version = "0.7.0", features = ["std", "proc"] } -convert_case = "0.8.0" +convert_case = "0.9.0" criterion = { version = "0.7", features = ["html_reports"] } crossbeam-queue = "0.3.12" datafusion = "50.3.0" @@ -253,7 +253,7 @@ urlencoding = "2.1.3" uuid = { version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics"] } vaultrs = { version = "0.7.4" } walkdir = "2.5.0" -wildmatch = { version = "2.5.0", features = ["serde"] } +wildmatch = { version = "2.6.0", features = ["serde"] } winapi = { version = "0.3.9" } xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] } zip = "6.0.0" @@ -262,7 +262,7 @@ zstd = "0.13.3" # Observability and Metrics opentelemetry = { version = "0.31.0" } opentelemetry-appender-tracing = { version = "0.31.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes", "spec_unstable_logs_enabled"] } -opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] } +opentelemetry-otlp = { version = "0.31.0", features = ["http-proto", "zstd-http"] } opentelemetry_sdk = { version = "0.31.0" } opentelemetry-semantic-conventions = 
{ version = "0.31.0", features = ["semconv_experimental"] } opentelemetry-stdout = { version = "0.31.0" } diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index e847bb2b..ea430d5b 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -30,7 +30,9 @@ rustfs-targets = { workspace = true } rustfs-config = { workspace = true, features = ["audit", "constants"] } rustfs-ecstore = { workspace = true } chrono = { workspace = true } +const-str = { workspace = true } futures = { workspace = true } +metrics = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } @@ -39,5 +41,6 @@ tracing = { workspace = true, features = ["std", "attributes"] } url = { workspace = true } rumqttc = { workspace = true } + [lints] workspace = true diff --git a/crates/audit/src/observability.rs b/crates/audit/src/observability.rs index abbcda21..03461f8e 100644 --- a/crates/audit/src/observability.rs +++ b/crates/audit/src/observability.rs @@ -21,12 +21,47 @@ //! - Error rate monitoring //! 
- Queue depth monitoring +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use tokio::sync::RwLock; use tracing::info; +const RUSTFS_AUDIT_METRICS_NAMESPACE: &str = "rustfs.audit."; + +const M_AUDIT_EVENTS_TOTAL: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.total"); +const M_AUDIT_EVENTS_FAILED: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "events.failed"); +const M_AUDIT_DISPATCH_NS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "dispatch.ns"); +const M_AUDIT_EPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "eps"); +const M_AUDIT_TARGET_OPS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "target.ops"); +const M_AUDIT_CONFIG_RELOADS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "config.reloads"); +const M_AUDIT_SYSTEM_STARTS: &str = const_str::concat!(RUSTFS_AUDIT_METRICS_NAMESPACE, "system.starts"); + +const L_RESULT: &str = "result"; +const L_STATUS: &str = "status"; + +const V_SUCCESS: &str = "success"; +const V_FAILURE: &str = "failure"; + +/// One-time registration of metric metadata (descriptions). +/// This function ensures that metric descriptors are registered only once. 
+pub fn init_observability_metrics() { + static METRICS_DESC_INIT: OnceLock<()> = OnceLock::new(); + METRICS_DESC_INIT.get_or_init(|| { + // Event counts and dispatch latency + describe_counter!(M_AUDIT_EVENTS_TOTAL, "Total audit events (labeled by result)."); + describe_counter!(M_AUDIT_EVENTS_FAILED, "Total failed audit events."); + describe_histogram!(M_AUDIT_DISPATCH_NS, "Dispatch time per event (ns)."); + describe_gauge!(M_AUDIT_EPS, "Events per second since last reset."); + + // Target operation/system event + describe_counter!(M_AUDIT_TARGET_OPS, "Total target operations (labeled by status)."); + describe_counter!(M_AUDIT_CONFIG_RELOADS, "Total configuration reloads."); + describe_counter!(M_AUDIT_SYSTEM_STARTS, "Total system starts."); + }); +} + /// Metrics collector for audit system observability #[derive(Debug)] pub struct AuditMetrics { @@ -56,6 +91,7 @@ impl Default for AuditMetrics { impl AuditMetrics { /// Creates a new metrics collector pub fn new() -> Self { + init_observability_metrics(); Self { total_events_processed: AtomicU64::new(0), total_events_failed: AtomicU64::new(0), @@ -68,11 +104,28 @@ impl AuditMetrics { } } + // Emits the per-event success counter and dispatch-time histogram; called by `record_event_success`. + #[inline] + fn emit_event_success_metrics(&self, dispatch_time: Duration) { + // count + histogram + counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_SUCCESS).increment(1); + histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64); + } + + // Emits the per-event failure counters and dispatch-time histogram; called by the failed-dispatch recording method. + #[inline] + fn emit_event_failure_metrics(&self, dispatch_time: Duration) { + counter!(M_AUDIT_EVENTS_TOTAL, L_RESULT => V_FAILURE).increment(1); + counter!(M_AUDIT_EVENTS_FAILED).increment(1); + histogram!(M_AUDIT_DISPATCH_NS).record(dispatch_time.as_nanos() as f64); + } + /// Records a successful event dispatch pub fn 
record_event_success(&self, dispatch_time: Duration) { self.total_events_processed.fetch_add(1, Ordering::Relaxed); self.total_dispatch_time_ns .fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed); + self.emit_event_success_metrics(dispatch_time); } /// Records a failed event dispatch @@ -80,27 +133,32 @@ impl AuditMetrics { self.total_events_failed.fetch_add(1, Ordering::Relaxed); self.total_dispatch_time_ns .fetch_add(dispatch_time.as_nanos() as u64, Ordering::Relaxed); + self.emit_event_failure_metrics(dispatch_time); } /// Records a successful target operation pub fn record_target_success(&self) { self.target_success_count.fetch_add(1, Ordering::Relaxed); + counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_SUCCESS).increment(1); } /// Records a failed target operation pub fn record_target_failure(&self) { self.target_failure_count.fetch_add(1, Ordering::Relaxed); + counter!(M_AUDIT_TARGET_OPS, L_STATUS => V_FAILURE).increment(1); } /// Records a configuration reload pub fn record_config_reload(&self) { self.config_reload_count.fetch_add(1, Ordering::Relaxed); + counter!(M_AUDIT_CONFIG_RELOADS).increment(1); info!("Audit configuration reloaded"); } /// Records a system start pub fn record_system_start(&self) { self.system_start_count.fetch_add(1, Ordering::Relaxed); + counter!(M_AUDIT_SYSTEM_STARTS).increment(1); info!("Audit system started"); } @@ -110,11 +168,14 @@ impl AuditMetrics { let elapsed = reset_time.elapsed(); let total_events = self.total_events_processed.load(Ordering::Relaxed) + self.total_events_failed.load(Ordering::Relaxed); - if elapsed.as_secs_f64() > 0.0 { + let eps = if elapsed.as_secs_f64() > 0.0 { total_events as f64 / elapsed.as_secs_f64() } else { 0.0 - } + }; + // EPS is reported in gauge + gauge!(M_AUDIT_EPS).set(eps); + eps } /// Gets the average dispatch latency in milliseconds @@ -166,6 +227,8 @@ impl AuditMetrics { let mut reset_time = self.last_reset_time.write().await; *reset_time = Instant::now(); + // Reset EPS to zero 
after reset + gauge!(M_AUDIT_EPS).set(0.0); info!("Audit metrics reset"); } diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index 5686e39d..f62c73ae 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -145,7 +145,7 @@ pub const DEFAULT_LOG_ROTATION_TIME: &str = "hour"; /// It is used to keep the logs of the application. /// Default value: 30 /// Environment variable: RUSTFS_OBS_LOG_KEEP_FILES -pub const DEFAULT_LOG_KEEP_FILES: u16 = 30; +pub const DEFAULT_LOG_KEEP_FILES: usize = 30; /// Default log local logging enabled for rustfs /// This is the default log local logging enabled for rustfs. diff --git a/crates/config/src/constants/profiler.rs b/crates/config/src/constants/profiler.rs index ce9ce200..282b5b5c 100644 --- a/crates/config/src/constants/profiler.rs +++ b/crates/config/src/constants/profiler.rs @@ -12,30 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+/// Profiler related environment variable names and default values pub const ENV_ENABLE_PROFILING: &str = "RUSTFS_ENABLE_PROFILING"; // CPU profiling pub const ENV_CPU_MODE: &str = "RUSTFS_PROF_CPU_MODE"; // off|continuous|periodic +/// Frequency of CPU profiling samples pub const ENV_CPU_FREQ: &str = "RUSTFS_PROF_CPU_FREQ"; +/// Interval between CPU profiling sessions (for periodic mode) pub const ENV_CPU_INTERVAL_SECS: &str = "RUSTFS_PROF_CPU_INTERVAL_SECS"; +/// Duration of each CPU profiling session (for periodic mode) pub const ENV_CPU_DURATION_SECS: &str = "RUSTFS_PROF_CPU_DURATION_SECS"; -// Memory profiling (jemalloc) +/// Memory profiling (jemalloc) pub const ENV_MEM_PERIODIC: &str = "RUSTFS_PROF_MEM_PERIODIC"; +/// Interval between memory profiling snapshots (for periodic mode) pub const ENV_MEM_INTERVAL_SECS: &str = "RUSTFS_PROF_MEM_INTERVAL_SECS"; -// Output directory +/// Output directory pub const ENV_OUTPUT_DIR: &str = "RUSTFS_PROF_OUTPUT_DIR"; -// Defaults +/// Defaults for profiler settings pub const DEFAULT_ENABLE_PROFILING: bool = false; - +/// CPU profiling pub const DEFAULT_CPU_MODE: &str = "off"; +/// Frequency of CPU profiling samples pub const DEFAULT_CPU_FREQ: usize = 100; +/// Interval between CPU profiling sessions (for periodic mode) pub const DEFAULT_CPU_INTERVAL_SECS: u64 = 300; +/// Duration of each CPU profiling session (for periodic mode) pub const DEFAULT_CPU_DURATION_SECS: u64 = 60; - +/// Memory profiling (jemalloc) pub const DEFAULT_MEM_PERIODIC: bool = false; +/// Interval between memory profiling snapshots (for periodic mode) pub const DEFAULT_MEM_INTERVAL_SECS: u64 = 300; - +/// Output directory pub const DEFAULT_OUTPUT_DIR: &str = "."; diff --git a/crates/config/src/observability/metrics.rs b/crates/config/src/observability/metrics.rs new file mode 100644 index 00000000..dc2053a7 --- /dev/null +++ b/crates/config/src/observability/metrics.rs @@ -0,0 +1,19 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache 
License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Metrics collection interval in milliseconds for system metrics (CPU, memory, disk, network). +pub const DEFAULT_METRICS_SYSTEM_INTERVAL_MS: u64 = 30000; + +/// Environment variable for setting the metrics collection interval for system metrics. +pub const ENV_OBS_METRICS_SYSTEM_INTERVAL_MS: &str = "RUSTFS_OBS_METRICS_SYSTEM_INTERVAL_MS"; diff --git a/crates/config/src/observability/mod.rs b/crates/config/src/observability/mod.rs index e723882c..ad1dc5bb 100644 --- a/crates/config/src/observability/mod.rs +++ b/crates/config/src/observability/mod.rs @@ -14,7 +14,13 @@ // Observability Keys +mod metrics; +pub use metrics::*; + pub const ENV_OBS_ENDPOINT: &str = "RUSTFS_OBS_ENDPOINT"; +pub const ENV_OBS_TRACE_ENDPOINT: &str = "RUSTFS_OBS_TRACE_ENDPOINT"; +pub const ENV_OBS_METRIC_ENDPOINT: &str = "RUSTFS_OBS_METRIC_ENDPOINT"; +pub const ENV_OBS_LOG_ENDPOINT: &str = "RUSTFS_OBS_LOG_ENDPOINT"; pub const ENV_OBS_USE_STDOUT: &str = "RUSTFS_OBS_USE_STDOUT"; pub const ENV_OBS_SAMPLE_RATIO: &str = "RUSTFS_OBS_SAMPLE_RATIO"; pub const ENV_OBS_METER_INTERVAL: &str = "RUSTFS_OBS_METER_INTERVAL"; @@ -65,6 +71,9 @@ mod tests { #[test] fn test_env_keys() { assert_eq!(ENV_OBS_ENDPOINT, "RUSTFS_OBS_ENDPOINT"); + assert_eq!(ENV_OBS_TRACE_ENDPOINT, "RUSTFS_OBS_TRACE_ENDPOINT"); + assert_eq!(ENV_OBS_METRIC_ENDPOINT, "RUSTFS_OBS_METRIC_ENDPOINT"); + assert_eq!(ENV_OBS_LOG_ENDPOINT, "RUSTFS_OBS_LOG_ENDPOINT"); assert_eq!(ENV_OBS_USE_STDOUT, 
"RUSTFS_OBS_USE_STDOUT"); assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO"); assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL"); diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 13781e20..b27ae42b 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -45,7 +45,7 @@ opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-stdout = { workspace = true } -opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] } +opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } serde = { workspace = true } smallvec = { workspace = true, features = ["serde"] } diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index ee225dc7..25e2c5e5 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -13,9 +13,10 @@ // limitations under the License. 
use rustfs_config::observability::{ - ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES, - ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL, - ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT, + ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_ENDPOINT, ENV_OBS_LOG_FILENAME, + ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, + ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, + ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_USE_STDOUT, }; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME, @@ -23,6 +24,7 @@ use rustfs_config::{ USE_STDOUT, }; use rustfs_utils::dirs::get_log_directory_to_string; +use rustfs_utils::{get_env_bool, get_env_f64, get_env_opt_str, get_env_str, get_env_u64, get_env_usize}; use serde::{Deserialize, Serialize}; use std::env; @@ -55,6 +57,9 @@ use std::env; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct OtelConfig { pub endpoint: String, // Endpoint for metric collection + pub trace_endpoint: Option, // Endpoint for trace collection + pub metric_endpoint: Option, // Endpoint for metric collection + pub log_endpoint: Option, // Endpoint for log collection pub use_stdout: Option, // Output to stdout pub sample_ratio: Option, // Trace sampling ratio pub meter_interval: Option, // Metric collection interval @@ -68,7 +73,7 @@ pub struct OtelConfig { pub log_filename: Option, // The name of the log file pub log_rotation_size_mb: Option, // Log file size cut threshold (MB) pub log_rotation_time: Option, // Logs are cut by time (Hour, Day,Minute, Second) - pub log_keep_files: Option, // Number of log files to be retained + pub 
log_keep_files: Option, // Number of log files to be retained } impl OtelConfig { @@ -83,62 +88,29 @@ impl OtelConfig { } else { env::var(ENV_OBS_ENDPOINT).unwrap_or_else(|_| "".to_string()) }; - let mut use_stdout = env::var(ENV_OBS_USE_STDOUT) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(USE_STDOUT)); + let mut use_stdout = get_env_bool(ENV_OBS_USE_STDOUT, USE_STDOUT); if endpoint.is_empty() { - use_stdout = Some(true); + use_stdout = true; } OtelConfig { endpoint, - use_stdout, - sample_ratio: env::var(ENV_OBS_SAMPLE_RATIO) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(SAMPLE_RATIO)), - meter_interval: env::var(ENV_OBS_METER_INTERVAL) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(METER_INTERVAL)), - service_name: env::var(ENV_OBS_SERVICE_NAME) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(APP_NAME.to_string())), - service_version: env::var(ENV_OBS_SERVICE_VERSION) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(SERVICE_VERSION.to_string())), - environment: env::var(ENV_OBS_ENVIRONMENT) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(ENVIRONMENT.to_string())), - logger_level: env::var(ENV_OBS_LOGGER_LEVEL) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_LOG_LEVEL.to_string())), - log_stdout_enabled: env::var(ENV_OBS_LOG_STDOUT_ENABLED) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_OBS_LOG_STDOUT_ENABLED)), + trace_endpoint: get_env_opt_str(ENV_OBS_TRACE_ENDPOINT), + metric_endpoint: get_env_opt_str(ENV_OBS_METRIC_ENDPOINT), + log_endpoint: get_env_opt_str(ENV_OBS_LOG_ENDPOINT), + use_stdout: Some(use_stdout), + sample_ratio: Some(get_env_f64(ENV_OBS_SAMPLE_RATIO, SAMPLE_RATIO)), + meter_interval: Some(get_env_u64(ENV_OBS_METER_INTERVAL, METER_INTERVAL)), + service_name: Some(get_env_str(ENV_OBS_SERVICE_NAME, APP_NAME)), + service_version: Some(get_env_str(ENV_OBS_SERVICE_VERSION, SERVICE_VERSION)), + environment: Some(get_env_str(ENV_OBS_ENVIRONMENT, ENVIRONMENT)), + logger_level: 
Some(get_env_str(ENV_OBS_LOGGER_LEVEL, DEFAULT_LOG_LEVEL)), + log_stdout_enabled: Some(get_env_bool(ENV_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOG_STDOUT_ENABLED)), log_directory: Some(get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY)), - log_filename: env::var(ENV_OBS_LOG_FILENAME) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_OBS_LOG_FILENAME.to_string())), - log_rotation_size_mb: env::var(ENV_OBS_LOG_ROTATION_SIZE_MB) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB - log_rotation_time: env::var(ENV_OBS_LOG_ROTATION_TIME) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day" - log_keep_files: env::var(ENV_OBS_LOG_KEEP_FILES) - .ok() - .and_then(|v| v.parse().ok()) - .or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files + log_filename: Some(get_env_str(ENV_OBS_LOG_FILENAME, DEFAULT_OBS_LOG_FILENAME)), + log_rotation_size_mb: Some(get_env_u64(ENV_OBS_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB + log_rotation_time: Some(get_env_str(ENV_OBS_LOG_ROTATION_TIME, DEFAULT_LOG_ROTATION_TIME)), // Default to "Hour" + log_keep_files: Some(get_env_usize(ENV_OBS_LOG_KEEP_FILES, DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files } } diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 04791982..7eade486 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::telemetry::{OtelGuard, init_telemetry}; -use crate::{AppConfig, SystemObserver}; +use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry}; use std::sync::{Arc, Mutex}; use tokio::sync::{OnceCell, SetError}; use tracing::{error, info}; @@ -52,6 +51,8 @@ pub enum GlobalError { SendFailed(&'static str), #[error("Operation timed out: {0}")] Timeout(&'static str), + #[error("Telemetry initialization failed: {0}")] + TelemetryError(#[from] TelemetryError), } /// Initialize the observability module @@ -68,14 +69,17 @@ pub enum GlobalError { /// /// # #[tokio::main] /// # async fn main() { -/// # let guard = init_obs(None).await; +/// # match init_obs(None).await { +/// # Ok(guard) => {} +/// # Err(e) => { eprintln!("Failed to initialize observability: {}", e); } +/// # } /// # } /// ``` -pub async fn init_obs(endpoint: Option) -> OtelGuard { +pub async fn init_obs(endpoint: Option) -> Result { // Load the configuration file let config = AppConfig::new_with_endpoint(endpoint); - let otel_guard = init_telemetry(&config.observability); + let otel_guard = init_telemetry(&config.observability)?; // Server will be created per connection - this ensures isolation tokio::spawn(async move { // Record the PID-related metrics of the current process @@ -90,7 +94,7 @@ pub async fn init_obs(endpoint: Option) -> OtelGuard { } }); - otel_guard + Ok(otel_guard) } /// Set the global guard for OpenTelemetry @@ -107,7 +111,10 @@ pub async fn init_obs(endpoint: Option) -> OtelGuard { /// # use rustfs_obs::{ init_obs, set_global_guard}; /// /// # async fn init() -> Result<(), Box> { -/// # let guard = init_obs(None).await; +/// # let guard = match init_obs(None).await{ +/// # Ok(g) => g, +/// # Err(e) => { return Err(Box::new(e)); } +/// # }; /// # set_global_guard(guard)?; /// # Ok(()) /// # } diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index 43bc0542..39f9afe8 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ 
-38,7 +38,19 @@ /// /// # #[tokio::main] /// # async fn main() { -/// # let guard = init_obs(None).await; +/// # let _guard = match init_obs(None).await { +/// # Ok(g) => g, +/// # Err(e) => { +/// # panic!("Failed to initialize observability: {:?}", e); +/// # } +/// # }; +/// # // Application logic here +/// # { +/// # // Simulate some work +/// # tokio::time::sleep(std::time::Duration::from_secs(2)).await; +/// # println!("Application is running..."); +/// # } +/// # // Guard will be dropped here, flushing telemetry data /// # } /// ``` mod config; @@ -49,4 +61,6 @@ mod telemetry; pub use config::{AppConfig, OtelConfig}; pub use global::*; +pub use metrics::*; pub use system::SystemObserver; +pub use telemetry::{OtelGuard, TelemetryError}; diff --git a/crates/obs/src/metrics/audit.rs b/crates/obs/src/metrics/audit.rs index ea192b22..54564c6b 100644 --- a/crates/obs/src/metrics/audit.rs +++ b/crates/obs/src/metrics/audit.rs @@ -17,10 +17,15 @@ /// audit related metric descriptors /// /// This module contains the metric descriptors for the audit subsystem. 
-use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; const TARGET_ID: &str = "target_id"; +pub const RESULT: &str = "result"; // success / failure +pub const STATUS: &str = "status"; // success / failure + +pub const SUCCESS: &str = "success"; +pub const FAILURE: &str = "failure"; pub static AUDIT_FAILED_MESSAGES_MD: LazyLock = LazyLock::new(|| { new_counter_md( diff --git a/crates/obs/src/metrics/bucket.rs b/crates/obs/src/metrics/bucket.rs index ab25944b..516662e3 100644 --- a/crates/obs/src/metrics/bucket.rs +++ b/crates/obs/src/metrics/bucket.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// bucket level s3 metric descriptor -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems}; use std::sync::LazyLock; pub static BUCKET_API_TRAFFIC_SENT_BYTES_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/bucket_replication.rs b/crates/obs/src/metrics/bucket_replication.rs index 8a2554ad..df26b757 100644 --- a/crates/obs/src/metrics/bucket_replication.rs +++ b/crates/obs/src/metrics/bucket_replication.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Bucket copy metric descriptor -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Bucket level replication metric descriptor diff --git a/crates/obs/src/metrics/cluster_config.rs b/crates/obs/src/metrics/cluster_config.rs index 912d8b0b..14379648 100644 --- a/crates/obs/src/metrics/cluster_config.rs +++ b/crates/obs/src/metrics/cluster_config.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Metric descriptors related to cluster configuration -use 
crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; diff --git a/crates/obs/src/metrics/cluster_erasure_set.rs b/crates/obs/src/metrics/cluster_erasure_set.rs index 64dd31a7..ecb23222 100644 --- a/crates/obs/src/metrics/cluster_erasure_set.rs +++ b/crates/obs/src/metrics/cluster_erasure_set.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Erasure code set related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; /// The label for the pool ID diff --git a/crates/obs/src/metrics/cluster_health.rs b/crates/obs/src/metrics/cluster_health.rs index 31ea7a40..b951c06e 100644 --- a/crates/obs/src/metrics/cluster_health.rs +++ b/crates/obs/src/metrics/cluster_health.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Cluster health-related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; pub static HEALTH_DRIVES_OFFLINE_COUNT_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/cluster_iam.rs b/crates/obs/src/metrics/cluster_iam.rs index 0dc4dee4..021fd567 100644 --- a/crates/obs/src/metrics/cluster_iam.rs +++ b/crates/obs/src/metrics/cluster_iam.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// IAM related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems}; use std::sync::LazyLock; pub static LAST_SYNC_DURATION_MILLIS_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/cluster_notification.rs b/crates/obs/src/metrics/cluster_notification.rs index 4d867d96..04d46faa 100644 --- a/crates/obs/src/metrics/cluster_notification.rs +++ 
b/crates/obs/src/metrics/cluster_notification.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Notify the relevant metric descriptor -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, subsystems}; use std::sync::LazyLock; pub static NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/cluster_usage.rs b/crates/obs/src/metrics/cluster_usage.rs index fb2695df..315b55ec 100644 --- a/crates/obs/src/metrics/cluster_usage.rs +++ b/crates/obs/src/metrics/cluster_usage.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Descriptors of metrics related to cluster object and bucket usage -use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Bucket labels diff --git a/crates/obs/src/metrics/entry/descriptor.rs b/crates/obs/src/metrics/entry/descriptor.rs index a80861f0..e8ddb699 100644 --- a/crates/obs/src/metrics/entry/descriptor.rs +++ b/crates/obs/src/metrics/entry/descriptor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::metrics::{MetricName, MetricNamespace, MetricSubsystem, MetricType}; +use crate::{MetricName, MetricNamespace, MetricSubsystem, MetricType}; use std::collections::HashSet; /// MetricDescriptor - Metric descriptors diff --git a/crates/obs/src/metrics/entry/mod.rs b/crates/obs/src/metrics/entry/mod.rs index b632769f..55ec5e5c 100644 --- a/crates/obs/src/metrics/entry/mod.rs +++ b/crates/obs/src/metrics/entry/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType}; +use crate::{MetricDescriptor, MetricName, MetricNamespace, MetricSubsystem, MetricType}; pub(crate) mod descriptor; pub(crate) mod metric_name; @@ -76,7 +76,7 @@ pub fn new_histogram_md( #[cfg(test)] mod tests { use super::*; - use crate::metrics::subsystems; + use crate::subsystems; #[test] fn test_new_histogram_md() { diff --git a/crates/obs/src/metrics/entry/subsystem.rs b/crates/obs/src/metrics/entry/subsystem.rs index 56413eb3..03529800 100644 --- a/crates/obs/src/metrics/entry/subsystem.rs +++ b/crates/obs/src/metrics/entry/subsystem.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::metrics::entry::path_utils::format_path_to_metric_name; +use crate::entry::path_utils::format_path_to_metric_name; /// The metrics subsystem is a subgroup of metrics within a namespace /// The metrics subsystem, which represents a subgroup of metrics within a namespace @@ -204,8 +204,8 @@ pub mod subsystems { #[cfg(test)] mod tests { use super::*; - use crate::metrics::MetricType; - use crate::metrics::{MetricDescriptor, MetricName, MetricNamespace}; + use crate::MetricType; + use crate::{MetricDescriptor, MetricName, MetricNamespace}; #[test] fn test_metric_subsystem_formatting() { diff --git a/crates/obs/src/metrics/ilm.rs b/crates/obs/src/metrics/ilm.rs index 06ef0372..a2db5178 100644 --- a/crates/obs/src/metrics/ilm.rs +++ b/crates/obs/src/metrics/ilm.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// ILM-related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; pub static ILM_EXPIRY_PENDING_TASKS_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/logger_webhook.rs b/crates/obs/src/metrics/logger_webhook.rs 
index 279cf37a..9981ea2b 100644 --- a/crates/obs/src/metrics/logger_webhook.rs +++ b/crates/obs/src/metrics/logger_webhook.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// A descriptor for metrics related to webhook logs -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Define label constants for webhook metrics diff --git a/crates/obs/src/metrics/replication.rs b/crates/obs/src/metrics/replication.rs index 8f820e98..69444368 100644 --- a/crates/obs/src/metrics/replication.rs +++ b/crates/obs/src/metrics/replication.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Metrics for replication subsystem -use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; pub static REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/request.rs b/crates/obs/src/metrics/request.rs index 15dcafe4..aaeb1f1b 100644 --- a/crates/obs/src/metrics/request.rs +++ b/crates/obs/src/metrics/request.rs @@ -14,7 +14,7 @@ #![allow(dead_code)] -use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; pub static API_REJECTED_AUTH_TOTAL_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/scanner.rs b/crates/obs/src/metrics/scanner.rs index 12f57daa..a8ab6b59 100644 --- a/crates/obs/src/metrics/scanner.rs +++ b/crates/obs/src/metrics/scanner.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Scanner-related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, 
subsystems}; use std::sync::LazyLock; pub static SCANNER_BUCKET_SCANS_FINISHED_MD: LazyLock = LazyLock::new(|| { diff --git a/crates/obs/src/metrics/system_cpu.rs b/crates/obs/src/metrics/system_cpu.rs index 53b2c345..6a11b152 100644 --- a/crates/obs/src/metrics/system_cpu.rs +++ b/crates/obs/src/metrics/system_cpu.rs @@ -14,7 +14,7 @@ #![allow(dead_code)] -use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; /// CPU system-related metric descriptors use std::sync::LazyLock; diff --git a/crates/obs/src/metrics/system_drive.rs b/crates/obs/src/metrics/system_drive.rs index 2e6258bc..8d406a2e 100644 --- a/crates/obs/src/metrics/system_drive.rs +++ b/crates/obs/src/metrics/system_drive.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] /// Drive-related metric descriptors -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; /// drive related labels diff --git a/crates/obs/src/metrics/system_memory.rs b/crates/obs/src/metrics/system_memory.rs index 5a2c3b2c..f0935290 100644 --- a/crates/obs/src/metrics/system_memory.rs +++ b/crates/obs/src/metrics/system_memory.rs @@ -20,7 +20,7 @@ /// These descriptors are initialized lazily using `std::sync::LazyLock` to ensure /// they are only created when actually needed, improving performance and reducing /// startup overhead. 
-use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Total memory available on the node diff --git a/crates/obs/src/metrics/system_network.rs b/crates/obs/src/metrics/system_network.rs index e988ed23..1b6888f0 100644 --- a/crates/obs/src/metrics/system_network.rs +++ b/crates/obs/src/metrics/system_network.rs @@ -20,7 +20,7 @@ /// - Error counts for connection and general internode calls /// - Network dial performance metrics /// - Data transfer volume in both directions -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Total number of failed internode calls counter diff --git a/crates/obs/src/metrics/system_process.rs b/crates/obs/src/metrics/system_process.rs index 239dc2ef..be327e98 100644 --- a/crates/obs/src/metrics/system_process.rs +++ b/crates/obs/src/metrics/system_process.rs @@ -19,7 +19,7 @@ /// This module defines various system process metrics used for monitoring /// the RustFS process performance, resource usage, and system integration. /// Metrics are implemented using std::sync::LazyLock for thread-safe lazy initialization. -use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; +use crate::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; use std::sync::LazyLock; /// Number of current READ locks on this peer diff --git a/crates/obs/src/system/mod.rs b/crates/obs/src/system/mod.rs index 4fdf6fcd..7174c11e 100644 --- a/crates/obs/src/system/mod.rs +++ b/crates/obs/src/system/mod.rs @@ -13,7 +13,8 @@ // limitations under the License. 
use crate::{GlobalError, is_observability_enabled}; -use opentelemetry::global::meter; +use opentelemetry::{global::meter, metrics::Meter}; +use sysinfo::Pid; mod attributes; mod collector; @@ -30,7 +31,8 @@ impl SystemObserver { pub async fn init_process_observer() -> Result<(), GlobalError> { if is_observability_enabled() { let meter = meter("system"); - return SystemObserver::init_process_observer_for_pid(meter, 30000).await; + let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?; + return SystemObserver::init_process_observer_for_pid(meter, pid).await; } Ok(()) } @@ -38,9 +40,12 @@ impl SystemObserver { /// Initialize the metric collector for the specified PID process /// This function will create a new `Collector` instance and start collecting metrics. /// It will run indefinitely until the process is terminated. - pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> { - let pid = sysinfo::Pid::from_u32(pid); - let mut collector = collector::Collector::new(pid, meter, 30000)?; + pub async fn init_process_observer_for_pid(meter: Meter, pid: Pid) -> Result<(), GlobalError> { + let interval_ms = rustfs_utils::get_env_u64( + rustfs_config::observability::ENV_OBS_METRICS_SYSTEM_INTERVAL_MS, + rustfs_config::observability::DEFAULT_METRICS_SYSTEM_INTERVAL_MS, + ); + let mut collector = collector::Collector::new(pid, meter, interval_ms)?; collector.run().await } } diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 42d22e55..40ad7201 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -14,17 +14,13 @@ use crate::config::OtelConfig; use crate::global::IS_OBSERVABILITY_ENABLED; -use flexi_logger::{ - Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, - WriteMode::{AsyncWith, BufferAndFlush}, - style, -}; +use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, 
style}; use metrics::counter; use nu_ansi_term::Color; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::{ Resource, logs::SdkLoggerProvider, @@ -35,22 +31,22 @@ use opentelemetry_semantic_conventions::{ SCHEMA_URL, attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, }; +use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA}; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL, - SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT, + SAMPLE_RATIO, SERVICE_VERSION, observability::{ DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA, ENV_OBS_LOG_DIRECTORY, }, }; -use rustfs_utils::get_local_ip_with_default; +use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default}; use smallvec::SmallVec; use std::borrow::Cow; use std::io::IsTerminal; use std::time::Duration; use std::{env, fs}; use tracing::info; -use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; use tracing_subscriber::fmt::format::FmtSpan; @@ -74,20 +70,19 @@ pub struct OtelGuard { meter_provider: Option, logger_provider: Option, // Add a flexi_logger handle to keep the logging alive - _flexi_logger_handles: Option, + flexi_logger_handles: Option, // WorkerGuard for writing tracing files - _tracing_guard: Option, + tracing_guard: Option, } -// Implement debug manually and avoid relying on all fields to implement debug impl std::fmt::Debug for OtelGuard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { f.debug_struct("OtelGuard") .field("tracer_provider", &self.tracer_provider.is_some()) .field("meter_provider", &self.meter_provider.is_some()) .field("logger_provider", &self.logger_provider.is_some()) - .field("_flexi_logger_handles", &self._flexi_logger_handles.is_some()) - .field("_tracing_guard", &self._tracing_guard.is_some()) + .field("flexi_logger_handles", &self.flexi_logger_handles.is_some()) + .field("tracing_guard", &self.tracing_guard.is_some()) .finish() } } @@ -110,9 +105,42 @@ impl Drop for OtelGuard { eprintln!("Logger shutdown error: {err:?}"); } } + + if let Some(handle) = self.flexi_logger_handles.take() { + handle.shutdown(); + println!("flexi_logger shutdown completed"); + } + + if let Some(guard) = self.tracing_guard.take() { + // The guard will be dropped here, flushing any remaining logs + drop(guard); + println!("Tracing guard dropped, flushing logs."); + } } } +#[derive(Debug)] +pub enum TelemetryError { + BuildSpanExporter(String), + BuildMetricExporter(String), + BuildLogExporter(String), + InstallMetricsRecorder(String), + SubscriberInit(String), +} + +impl std::fmt::Display for TelemetryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"), + TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"), + TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"), + TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"), + TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"), + } + } +} +impl std::error::Error for TelemetryError {} + /// create OpenTelemetry Resource fn resource(config: &OtelConfig) -> Resource { Resource::builder() @@ -141,456 +169,17 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader OtelGuard { - // avoid repeated access to configuration 
fields - let endpoint = &config.endpoint; - let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT); - - // Environment-aware stdout configuration - // Check for explicit environment control via RUSTFS_OBS_ENVIRONMENT - let is_production = environment.to_lowercase() == DEFAULT_OBS_ENVIRONMENT_PRODUCTION; - - // Default stdout behavior based on environment - let default_use_stdout = if is_production { - false // Disable stdout in production for security and log aggregation - } else { - USE_STDOUT // Use configured default for dev/test environments - }; - - let use_stdout = config.use_stdout.unwrap_or(default_use_stdout); - let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); - let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL); - let service_name = config.service_name.as_deref().unwrap_or(APP_NAME); - - // Configure flexi_logger to cut by time and size - let mut flexi_logger_handle = None; - if !endpoint.is_empty() { - // Pre-create resource objects to avoid repeated construction - let res = resource(config); - - // initialize tracer provider - let tracer_provider = { - let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO); - let sampler = if (0.0..1.0).contains(&sample_ratio) { - Sampler::TraceIdRatioBased(sample_ratio) - } else { - Sampler::AlwaysOn - }; - - let builder = SdkTracerProvider::builder() - .with_sampler(sampler) - .with_id_generator(RandomIdGenerator::default()) - .with_resource(res.clone()); - - let tracer_provider = if endpoint.is_empty() { - builder - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) - .build() - } else { - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .build() - .unwrap(); - - let builder = if use_stdout { - builder - .with_batch_exporter(exporter) - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) - } else { - builder.with_batch_exporter(exporter) - }; - - 
builder.build() - }; - - global::set_tracer_provider(tracer_provider.clone()); - tracer_provider - }; - - // initialize meter provider - let meter_provider = { - let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); - - if endpoint.is_empty() { - builder = builder.with_reader(create_periodic_reader(meter_interval)); - } else { - let exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) - .build() - .unwrap(); - - builder = builder.with_reader( - PeriodicReader::builder(exporter) - .with_interval(Duration::from_secs(meter_interval)) - .build(), - ); - - if use_stdout { - builder = builder.with_reader(create_periodic_reader(meter_interval)); - } - } - - let meter_provider = builder.build(); - global::set_meter_provider(meter_provider.clone()); - meter_provider - }; - - match metrics_exporter_opentelemetry::Recorder::builder("order-service").install_global() { - Ok(_) => {} - Err(e) => { - eprintln!("Failed to set global metrics recorder: {e:?}"); - } - } - - // initialize logger provider - let logger_provider = { - let mut builder = SdkLoggerProvider::builder().with_resource(res); - - if endpoint.is_empty() { - builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); - } else { - let exporter = opentelemetry_otlp::LogExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .build() - .unwrap(); - - builder = builder.with_batch_exporter(exporter); - - if use_stdout { - builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); - } - } - - builder.build() - }; - - // configuring tracing - { - // configure the formatting layer - let fmt_layer = { - let enable_color = std::io::stdout().is_terminal(); - let mut layer = tracing_subscriber::fmt::layer() - .with_timer(LocalTime::rfc_3339()) - .with_target(true) - .with_ansi(enable_color) - .with_thread_names(true) - 
.with_thread_ids(true) - .with_file(true) - .with_line_number(true) - .json() - .with_current_span(true) - .with_span_list(true); - let span_event = if is_production { - FmtSpan::CLOSE - } else { - // Only add full span events tracking in the development environment - FmtSpan::FULL - }; - - layer = layer.with_span_events(span_event); - layer.with_filter(build_env_filter(logger_level, None)) - }; - - let filter = build_env_filter(logger_level, None); - let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None)); - let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string()); - - // Configure registry to avoid repeated calls to filter methods - tracing_subscriber::registry() - .with(filter) - .with(ErrorLayer::default()) - .with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) { - Some(fmt_layer) - } else { - None - }) - .with(OpenTelemetryLayer::new(tracer)) - .with(otel_layer) - .with(MetricsLayer::new(meter_provider.clone())) - .init(); - - if !endpoint.is_empty() { - info!( - "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}", - endpoint, - logger_level, - env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string()) - ); - IS_OBSERVABILITY_ENABLED.set(true).ok(); - } - } - counter!("rustfs.start.total").increment(1); - return OtelGuard { - tracer_provider: Some(tracer_provider), - meter_provider: Some(meter_provider), - logger_provider: Some(logger_provider), - _flexi_logger_handles: flexi_logger_handle, - _tracing_guard: None, - }; - } - - // Obtain the log directory and file name configuration - let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY); - let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str()); - let log_filename = config.log_filename.as_deref().unwrap_or(service_name); - - // Enhanced error handling for directory creation - if 
let Err(e) = fs::create_dir_all(log_directory) { - eprintln!("ERROR: Failed to create log directory '{log_directory}': {e}"); - eprintln!("Ensure the parent directory exists and you have write permissions."); - eprintln!("Attempting to continue with logging, but file logging may fail."); - } else { - eprintln!("Log directory ready: {log_directory}"); - } - - #[cfg(unix)] - { - // Linux/macOS Setting Permissions with better error handling - use std::fs::Permissions; - use std::os::unix::fs::PermissionsExt; - match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) { - Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"), - Err(e) => { - eprintln!("WARNING: Failed to set log directory permissions for '{log_directory}': {e}"); - eprintln!("This may affect log file access. Consider checking directory ownership and permissions."); - } - } - } - - if endpoint.is_empty() && !is_production { - // Create a file appender (rolling by day), add the -tracing suffix to the file name to avoid conflicts - // let file_appender = tracing_appender::rolling::hourly(log_directory, format!("{log_filename}-tracing.log")); - let file_appender = RollingFileAppender::builder() - .rotation(Rotation::HOURLY) // rotate log files once every hour - .filename_prefix(format!("{log_filename}-tracing")) // log file names will be prefixed with `myapp.` - .filename_suffix("log") // log file names will be suffixed with `.log` - .build(log_directory) // try to build an appender that stores log files in `/var/log` - .expect("initializing rolling file appender failed"); - let (nb_writer, guard) = tracing_appender::non_blocking(file_appender); - - let enable_color = std::io::stdout().is_terminal(); - let fmt_layer = tracing_subscriber::fmt::layer() - .with_timer(LocalTime::rfc_3339()) - .with_target(true) - .with_ansi(enable_color) - .with_thread_names(true) - .with_thread_ids(true) - .with_file(true) - .with_line_number(true) - .with_writer(nb_writer) // Specify 
writing file - .json() - .with_current_span(true) - .with_span_list(true) - .with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id - - let env_filter = build_env_filter(logger_level, None); - - // Use registry() to register fmt_layer directly to ensure trace_id is output to the log - tracing_subscriber::registry() - .with(env_filter) - .with(ErrorLayer::default()) - .with(fmt_layer) - .with(if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) { - let stdout_fmt_layer = tracing_subscriber::fmt::layer() - .with_timer(LocalTime::rfc_3339()) - .with_target(true) - .with_thread_names(true) - .with_thread_ids(true) - .with_file(true) - .with_line_number(true) - .with_writer(std::io::stdout) // Specify writing file - .json() - .with_current_span(true) - .with_span_list(true) - .with_span_events(FmtSpan::CLOSE); // Log span lifecycle events, including trace_id; - Some(stdout_fmt_layer) - } else { - None - }) - .init(); - - info!("Tracing telemetry initialized for non-production with trace_id logging."); - IS_OBSERVABILITY_ENABLED.set(false).ok(); - - return OtelGuard { - tracer_provider: None, - meter_provider: None, - logger_provider: None, - _flexi_logger_handles: None, - _tracing_guard: Some(guard), - }; - } - - // Build log cutting conditions - let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) { - // Cut by time and size at the same time - (Some(time), Some(size)) => { - let age = match time.to_lowercase().as_str() { - "hour" => Age::Hour, - "day" => Age::Day, - "minute" => Age::Minute, - "second" => Age::Second, - _ => Age::Day, // The default is by day - }; - Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes - } - // Cut by time only - (Some(time), None) => { - let age = match time.to_lowercase().as_str() { - "hour" => Age::Hour, - "day" => Age::Day, - "minute" => Age::Minute, - "second" => Age::Second, - _ => Age::Day, // The default is by day - }; - 
Criterion::Age(age) - } - // Cut by size only - (None, Some(size)) => { - Criterion::Size(size * 1024 * 1024) // Convert to bytes - } - // By default, it is cut by the day - _ => Criterion::Age(Age::Day), - }; - - // The number of log files retained - let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES); - - // Parsing the log level - let log_spec = LogSpecification::parse(logger_level).unwrap_or_else(|e| { - eprintln!("WARNING: Invalid logger level '{logger_level}': {e}. Using default 'info' level."); - LogSpecification::info() - }); - - // Environment-aware stdout configuration - // In production: disable stdout completely (Duplicate::None) - // In development/test: use level-based filtering - let level_filter = if is_production { - flexi_logger::Duplicate::None // No stdout output in production - } else { - // Convert the logger_level string to the corresponding LevelFilter for dev/test - match logger_level.to_lowercase().as_str() { - "trace" => flexi_logger::Duplicate::Trace, - "debug" => flexi_logger::Duplicate::Debug, - "info" => flexi_logger::Duplicate::Info, - "warn" | "warning" => flexi_logger::Duplicate::Warn, - "error" => flexi_logger::Duplicate::Error, - "off" => flexi_logger::Duplicate::None, - _ => flexi_logger::Duplicate::Info, // the default is info - } - }; - - // Choose write mode based on environment - let write_mode = if is_production { - get_env_async_with().unwrap_or_else(|| { - eprintln!( - "Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables." 
- ); - AsyncWith { - pool_capa: DEFAULT_OBS_LOG_POOL_CAPA, - message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA, - flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS), - } - }) - } else { - BufferAndFlush - }; - - // Configure the flexi_logger with enhanced error handling - let mut flexi_logger_builder = flexi_logger::Logger::try_with_env_or_str(logger_level) - .unwrap_or_else(|e| { - eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}"); - eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}"); - flexi_logger::Logger::with(log_spec.clone()) - }) - .log_to_file( - FileSpec::default() - .directory(log_directory) - .basename(log_filename) - .suppress_timestamp(), - ) - .rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into())) - .format_for_files(format_for_file) // Add a custom formatting function for file output - .write_mode(write_mode) - .append(); // Avoid clearing existing logs at startup - - // Environment-aware stdout configuration - flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter); - // Only add stdout formatting and startup messages in non-production environments - if !is_production { - flexi_logger_builder = flexi_logger_builder - .format_for_stdout(format_with_color) // Add a custom formatting function for terminal output - .print_message(); // Startup information output to console - } - - let flexi_logger_result = flexi_logger_builder.start(); - - if let Ok(logger) = flexi_logger_result { - // Save the logger handle to keep the logging - flexi_logger_handle = Some(logger); - - // Environment-aware success messages - if is_production { - eprintln!("Production logging initialized: file-only mode to {log_directory}/{log_filename}.log"); - eprintln!("Stdout logging disabled in production environment for security and log aggregation."); - } else { - eprintln!("Development/Test logging initialized with file logging to 
{log_directory}/{log_filename}.log"); - eprintln!("Stdout logging enabled for debugging. Environment: {environment}"); - } - - // Log rotation configuration details - match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) { - (Some(time), Some(size)) => { - eprintln!("Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files") - } - (Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"), - (None, Some(size)) => { - eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files") - } - _ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"), - } - } else { - eprintln!("CRITICAL: Failed to initialize flexi_logger: {:?}", flexi_logger_result.err()); - eprintln!("Possible causes:"); - eprintln!(" 1. Insufficient permissions to write to log directory: {log_directory}"); - eprintln!(" 2. Log directory does not exist or is not accessible"); - eprintln!(" 3. Invalid log configuration parameters"); - eprintln!(" 4. 
Disk space issues"); - eprintln!("Application will continue but logging to files will not work properly."); - } - - OtelGuard { - tracer_provider: None, - meter_provider: None, - logger_provider: None, - _flexi_logger_handles: flexi_logger_handle, - _tracing_guard: None, - } -} - // Read the AsyncWith parameter from the environment variable fn get_env_async_with() -> Option { - let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA") - .ok() - .and_then(|v| v.parse::().ok()); - let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA") - .ok() - .and_then(|v| v.parse::().ok()); - let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::().ok()); + let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA); + let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA); + let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS); - match (pool_capa, message_capa, flush_ms) { - (Some(pool), Some(msg), Some(flush)) => Some(AsyncWith { - pool_capa: pool, - message_capa: msg, - flush_interval: Duration::from_millis(flush), - }), - _ => None, - } + Some(AsyncWith { + pool_capa, + message_capa, + flush_interval: Duration::from_millis(flush_ms), + }) } fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter { @@ -655,9 +244,339 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R ) } +/// stdout + span information (fix: retain WorkerGuard to avoid releasing after initialization) +fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard { + let env_filter = build_env_filter(logger_level, None); + + let (nb, guard) = tracing_appender::non_blocking(std::io::stdout()); + + let enable_color = std::io::stdout().is_terminal(); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_timer(LocalTime::rfc_3339()) + .with_target(true) + .with_ansi(enable_color) + .with_thread_names(true) + 
.with_thread_ids(true) + .with_file(true) + .with_line_number(true) + .with_writer(nb) + .json() + .with_current_span(true) + .with_span_list(true) + .with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL }); + + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with(fmt_layer) + .init(); + + IS_OBSERVABILITY_ENABLED.set(false).ok(); + counter!("rustfs.start.total").increment(1); + info!("Init stdout logging (level: {})", logger_level); + + OtelGuard { + tracer_provider: None, + meter_provider: None, + logger_provider: None, + flexi_logger_handles: None, + tracing_guard: Some(guard), + } +} + +/// File rolling log (size switching + number retained) +fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard { + use flexi_logger::{ + Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming, + WriteMode::{AsyncWith, BufferAndFlush}, + }; + + let service_name = config.service_name.as_deref().unwrap_or(APP_NAME); + let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY); + let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str()); + let log_filename = config.log_filename.as_deref().unwrap_or(service_name); + let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES); + + if let Err(e) = fs::create_dir_all(log_directory) { + eprintln!("ERROR: create log dir '{}': {e}", log_directory); + } + + #[cfg(unix)] + { + use std::fs::Permissions; + use std::os::unix::fs::PermissionsExt; + let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755)); + } + + // parsing level + let log_spec = LogSpecification::parse(logger_level) + .unwrap_or_else(|_| LogSpecification::parse(DEFAULT_LOG_LEVEL).unwrap_or(LogSpecification::error())); + + // Switch by size (MB), Build log cutting conditions + let rotation_criterion = match (config.log_rotation_time.as_deref(), 
config.log_rotation_size_mb) { + // Cut by time and size at the same time + (Some(time), Some(size)) => { + let age = match time.to_lowercase().as_str() { + "hour" => Age::Hour, + "day" => Age::Day, + "minute" => Age::Minute, + "second" => Age::Second, + _ => Age::Day, // The default is by day + }; + Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes + } + // Cut by time only + (Some(time), None) => { + let age = match time.to_lowercase().as_str() { + "hour" => Age::Hour, + "day" => Age::Day, + "minute" => Age::Minute, + "second" => Age::Second, + _ => Age::Day, // The default is by day + }; + Criterion::Age(age) + } + // Cut by size only + (None, Some(size)) => { + Criterion::Size(size * 1024 * 1024) // Convert to bytes + } + // By default, it is cut by the day + _ => Criterion::Age(Age::Day), + }; + + // write mode + let write_mode = get_env_async_with().unwrap_or(if is_production { + AsyncWith { + pool_capa: DEFAULT_OBS_LOG_POOL_CAPA, + message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA, + flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS), + } + } else { + BufferAndFlush + }); + + // Build + let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level) + .unwrap_or_else(|e| { + eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}"); + eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}"); + flexi_logger::Logger::with(log_spec.clone()) + }) + .format_for_stderr(format_with_color) + .format_for_files(format_for_file) + .log_to_file( + FileSpec::default() + .directory(log_directory) + .basename(log_filename) + .suppress_timestamp(), + ) + .rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files)) + .write_mode(write_mode); + + // Optional copy to stdout (for local observation) + if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) { + builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All); + } else { + builder = 
builder.duplicate_to_stdout(flexi_logger::Duplicate::None); + } + + let handle = match builder.start() { + Ok(h) => Some(h), + Err(e) => { + eprintln!("ERROR: start flexi_logger failed: {e}"); + None + } + }; + + IS_OBSERVABILITY_ENABLED.set(false).ok(); + counter!("rustfs.start.total").increment(1); + info!( + "Init file logging at '{}', roll size {:?}MB, keep {}", + log_directory, config.log_rotation_size_mb, keep_files + ); + + OtelGuard { + tracer_provider: None, + meter_provider: None, + logger_provider: None, + flexi_logger_handles: handle, + tracing_guard: None, + } +} + +/// Observability (HTTP export, supports three sub-endpoints; if not, fallback to unified endpoint) +fn init_observability_http(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result { + // Resources and sampling + let res = resource(config); + let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned(); + let use_stdout = config.use_stdout.unwrap_or(!is_production); + let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO); + let sampler = if (0.0..1.0).contains(&sample_ratio) { + Sampler::TraceIdRatioBased(sample_ratio) + } else { + Sampler::AlwaysOn + }; + + // Endpoint + let root_ep = config.endpoint.as_str(); + let trace_ep = config.trace_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep); + let metric_ep = config.metric_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep); + let log_ep = config.log_endpoint.as_deref().filter(|s| !s.is_empty()).unwrap_or(root_ep); + + // Tracer(HTTP) + let tracer_provider = { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_endpoint(trace_ep) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Zstd) + .build() + .map_err(|e| TelemetryError::BuildSpanExporter(e.to_string()))?; + + let mut builder = SdkTracerProvider::builder() + .with_sampler(sampler) + .with_id_generator(RandomIdGenerator::default()) + 
.with_resource(res.clone()) + .with_batch_exporter(exporter); + + if use_stdout { + builder = builder.with_batch_exporter(opentelemetry_stdout::SpanExporter::default()); + } + + let provider = builder.build(); + global::set_tracer_provider(provider.clone()); + provider + }; + + // Meter(HTTP) + let meter_provider = { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_endpoint(metric_ep) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Zstd) + .build() + .map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?; + let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); + let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); + builder = builder.with_reader( + PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(meter_interval)) + .build(), + ); + if use_stdout { + builder = builder.with_reader(create_periodic_reader(meter_interval)); + } + + let provider = builder.build(); + global::set_meter_provider(provider.clone()); + provider + }; + + // metrics crate -> OTel + let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global(); + + // Logger(HTTP) + let logger_provider = { + let exporter = opentelemetry_otlp::LogExporter::builder() + .with_http() + .with_endpoint(log_ep) + .with_protocol(Protocol::HttpBinary) + .with_compression(Compression::Zstd) + .build() + .map_err(|e| TelemetryError::BuildLogExporter(e.to_string()))?; + + let mut builder = SdkLoggerProvider::builder().with_resource(res); + builder = builder.with_batch_exporter(exporter); + if use_stdout { + builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); + } + builder.build() + }; + + // Tracing layer + let fmt_layer_opt = { + if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) { + let enable_color = std::io::stdout().is_terminal(); 
+ let mut layer = tracing_subscriber::fmt::layer() + .with_timer(LocalTime::rfc_3339()) + .with_target(true) + .with_ansi(enable_color) + .with_thread_names(true) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true) + .json() + .with_current_span(true) + .with_span_list(true); + let span_event = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL }; + layer = layer.with_span_events(span_event); + Some(layer.with_filter(build_env_filter(logger_level, None))) + } else { + None + } + }; + + let filter = build_env_filter(logger_level, None); + let otel_bridge = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None)); + let tracer = tracer_provider.tracer(service_name.to_string()); + + tracing_subscriber::registry() + .with(filter) + .with(ErrorLayer::default()) + .with(fmt_layer_opt) + .with(OpenTelemetryLayer::new(tracer)) + .with(otel_bridge) + .with(MetricsLayer::new(meter_provider.clone())) + .init(); + + IS_OBSERVABILITY_ENABLED.set(true).ok(); + counter!("rustfs.start.total").increment(1); + info!( + "Init observability (HTTP): trace='{}', metric='{}', log='{}'", + trace_ep, metric_ep, log_ep + ); + + Ok(OtelGuard { + tracer_provider: Some(tracer_provider), + meter_provider: Some(meter_provider), + logger_provider: Some(logger_provider), + flexi_logger_handles: None, + tracing_guard: None, + }) +} + +/// Initialize Telemetry,Entrance: three rules +pub(crate) fn init_telemetry(config: &OtelConfig) -> Result { + let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT); + let is_production = environment.eq_ignore_ascii_case(DEFAULT_OBS_ENVIRONMENT_PRODUCTION); + let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL); + + // Rule 3: Observability (any endpoint is enabled if it is not empty) + let has_obs = !config.endpoint.is_empty() + || config.trace_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false) + || config.metric_endpoint.as_deref().map(|s| 
!s.is_empty()).unwrap_or(false) + || config.log_endpoint.as_deref().map(|s| !s.is_empty()).unwrap_or(false); + + if has_obs { + return init_observability_http(config, logger_level, is_production); + } + + // Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set) + let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok(); + if user_set_log_dir { + return Ok(init_file_logging(config, logger_level, is_production)); + } + + // Rule 1: Default stdout (error level) + Ok(init_stdout_logging(config, DEFAULT_LOG_LEVEL, is_production)) +} + #[cfg(test)] mod tests { use super::*; + use rustfs_config::USE_STDOUT; #[test] fn test_production_environment_detection() { diff --git a/crates/utils/src/envs.rs b/crates/utils/src/envs.rs new file mode 100644 index 00000000..bc05bf42 --- /dev/null +++ b/crates/utils/src/envs.rs @@ -0,0 +1,390 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// 8-bit type: signed i8 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing fails. +/// +/// #Returns +/// - `i8`: The parsed value as i8 if successful, otherwise the default value. 
+/// +pub fn get_env_i8(key: &str, default: i8) -> i8 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 8-bit type: signed i8 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as i8 if successful, otherwise None +/// +pub fn get_env_opt_i8(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// 8-bit type: unsigned u8 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing fails. +/// +/// #Returns +/// - `u8`: The parsed value as u8 if successful, otherwise the default value. +/// +pub fn get_env_u8(key: &str, default: u8) -> u8 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 8-bit type: unsigned u8 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as u8 if successful, otherwise None +/// +pub fn get_env_opt_u8(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// 16-bit type: signed i16 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing fails. +/// +/// #Returns +/// - `i16`: The parsed value as i16 if successful, otherwise the default value. 
+/// +pub fn get_env_i16(key: &str, default: i16) -> i16 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 16-bit type: signed i16 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as i16 if successful, otherwise None +/// +pub fn get_env_opt_i16(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// 16-bit type: unsigned u16 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing fails. +/// +/// #Returns +/// - `u16`: The parsed value as u16 if successful, otherwise the default value. +/// +pub fn get_env_u16(key: &str, default: u16) -> u16 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 16-bit type: unsigned u16 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as u16 if successful, otherwise None +/// +pub fn get_env_u16_opt(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 16-bit type: unsigned u16 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as u16 if successful, otherwise None +/// +pub fn get_env_opt_u16(key: &str) -> Option { + get_env_u16_opt(key) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. 
+/// 32-bit type: signed i32 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `i32`: The parsed value as i32 if successful, otherwise the default value. +/// +pub fn get_env_i32(key: &str, default: i32) -> i32 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 32-bit type: signed i32 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as i32 if successful, otherwise None +/// +pub fn get_env_opt_i32(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// 32-bit type: unsigned u32 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing fails. +/// +/// #Returns +/// - `u32`: The parsed value as u32 if successful, otherwise the default value. +/// +pub fn get_env_u32(key: &str, default: u32) -> u32 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// 32-bit type: unsigned u32 +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as u32 if successful, otherwise None +/// +pub fn get_env_opt_u32(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. 
+/// - `default`: The default value to return if the environment variable is not set or parsing +/// +/// #Returns +/// - `f32`: The parsed value as f32 if successful, otherwise the default value +/// +pub fn get_env_f32(key: &str, default: f32) -> f32 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as f32 if successful, otherwise None +/// +pub fn get_env_opt_f32(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing +/// +/// #Returns +/// - `i64`: The parsed value as i64 if successful, otherwise the default value +/// +pub fn get_env_i64(key: &str, default: i64) -> i64 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as i64 if successful, otherwise None +/// +pub fn get_env_opt_i64(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, returning Option> if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. 
+/// +/// #Returns +/// - `Option>`: The parsed value as i64 if successful, otherwise None +/// +pub fn get_env_opt_opt_i64(key: &str) -> Option> { + env::var(key).ok().map(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing +/// +/// #Returns +/// - `u64`: The parsed value as u64 if successful, otherwise the default value. +/// +pub fn get_env_u64(key: &str, default: u64) -> u64 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as u64 if successful, otherwise None +/// +pub fn get_env_opt_u64(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing +/// +/// #Returns +/// - `f64`: The parsed value as f64 if successful, otherwise the default value. +/// +pub fn get_env_f64(key: &str, default: f64) -> f64 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. 
+/// +/// #Returns +/// - `Option`: The parsed value as f64 if successful, otherwise None +/// +pub fn get_env_opt_f64(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default value to return if the environment variable is not set or parsing +/// +/// #Returns +/// - `usize`: The parsed value as usize if successful, otherwise the default value. +/// +pub fn get_env_usize(key: &str, default: usize) -> usize { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as usize if successful, otherwise None +/// +pub fn get_env_usize_opt(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +/// Retrieve an environment variable as a specific type, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed value as usize if successful, otherwise None +/// +pub fn get_env_opt_usize(key: &str) -> Option { + get_env_usize_opt(key) +} + +/// Retrieve an environment variable as a String, with a default value if not set. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default string value to return if the environment variable is not set. +/// +/// #Returns +/// - `String`: The environment variable value if set, otherwise the default value. 
+/// +pub fn get_env_str(key: &str, default: &str) -> String { + env::var(key).unwrap_or_else(|_| default.to_string()) +} + +/// Retrieve an environment variable as a String, returning None if not set. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The environment variable value if set, otherwise None. +/// +pub fn get_env_opt_str(key: &str) -> Option { + env::var(key).ok() +} + +/// Retrieve an environment variable as a boolean, with a default value if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// - `default`: The default boolean value to return if the environment variable is not set or cannot be parsed. +/// +/// #Returns +/// - `bool`: The parsed boolean value if successful, otherwise the default value. +/// +pub fn get_env_bool(key: &str, default: bool) -> bool { + env::var(key) + .ok() + .and_then(|v| match v.to_lowercase().as_str() { + "1" | "true" | "yes" => Some(true), + "0" | "false" | "no" => Some(false), + _ => None, + }) + .unwrap_or(default) +} + +/// Retrieve an environment variable as a boolean, returning None if not set or parsing fails. +/// +/// #Parameters +/// - `key`: The environment variable key to look up. +/// +/// #Returns +/// - `Option`: The parsed boolean value if successful, otherwise None. 
+/// +pub fn get_env_opt_bool(key: &str) -> Option { + env::var(key).ok().and_then(|v| match v.to_lowercase().as_str() { + "1" | "true" | "yes" => Some(true), + "0" | "false" | "no" => Some(false), + _ => None, + }) +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 627c8aa1..ecf92c44 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -81,8 +81,8 @@ pub mod sys; #[cfg(feature = "sys")] pub use sys::user_agent::*; -#[cfg(feature = "sys")] -pub use sys::envs::*; - #[cfg(feature = "notify")] pub use notify::*; + +mod envs; +pub use envs::*; diff --git a/crates/utils/src/sys/envs.rs b/crates/utils/src/sys/envs.rs deleted file mode 100644 index 6f7fea5e..00000000 --- a/crates/utils/src/sys/envs.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::env; - -pub fn get_env_usize(key: &str, default: usize) -> usize { - env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) -} -pub fn get_env_u64(key: &str, default: u64) -> u64 { - env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) -} -pub fn get_env_u32(key: &str, default: u32) -> u32 { - env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) -} -pub fn get_env_str(key: &str, default: &str) -> String { - env::var(key).unwrap_or_else(|_| default.to_string()) -} -pub fn get_env_opt_u64(key: &str) -> Option { - env::var(key).ok().and_then(|v| v.parse().ok()) -} - -pub fn get_env_bool(key: &str, default: bool) -> bool { - env::var(key) - .ok() - .and_then(|v| match v.to_lowercase().as_str() { - "1" | "true" | "yes" => Some(true), - "0" | "false" | "no" => Some(false), - _ => None, - }) - .unwrap_or(default) -} diff --git a/crates/utils/src/sys/mod.rs b/crates/utils/src/sys/mod.rs index 23a3fe33..492617c2 100644 --- a/crates/utils/src/sys/mod.rs +++ b/crates/utils/src/sys/mod.rs @@ -12,5 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub(crate) mod envs; pub(crate) mod user_agent; diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index bc19148e..edcc61a9 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -89,7 +89,6 @@ const LOGO: &str = r#" #[instrument] fn print_server_info() { let current_year = chrono::Utc::now().year(); - // Use custom macros to print server information info!("RustFS Object Storage Server"); info!("Copyright: 2024-{} RustFS, Inc", current_year); @@ -112,10 +111,13 @@ async fn async_main() -> Result<()> { init_license(opt.license.clone()); // Initialize Observability - let guard = init_obs(Some(opt.clone().obs_endpoint)).await; - - // print startup logo - info!("{}", LOGO); + let guard = match init_obs(Some(opt.clone().obs_endpoint)).await { + Ok(g) => g, + Err(e) => { + println!("Failed to initialize observability: {}", e); + return Err(Error::other(e)); + } + }; // Store in global storage match set_global_guard(guard).map_err(Error::other) { @@ -126,6 +128,9 @@ async fn async_main() -> Result<()> { } } + // print startup logo + info!("{}", LOGO); + // Initialize performance profiling if enabled #[cfg(not(target_os = "windows"))] profiling::init_from_env().await; diff --git a/scripts/run.sh b/scripts/run.sh index 9d0ba5d4..e397b4f4 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -51,7 +51,14 @@ export RUSTFS_CONSOLE_ADDRESS=":9001" # export RUSTFS_TLS_PATH="./deploy/certs" # Observability related configuration -#export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector address +#export RUSTFS_OBS_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector address +# RustFS OR OTEL exporter configuration +#export RUSTFS_OBS_TRACE_ENDPOINT=http://localhost:4318 # OpenTelemetry Collector trace address http://localhost:4318/v1/traces +#export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces +#export RUSTFS_OBS_METRIC_ENDPOINT=http://localhost:9090/api/v1/otlp # OpenTelemetry Collector metric address +#export 
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://localhost:9090/api/v1/otlp/v1/metrics +#export RUSTFS_OBS_LOG_ENDPOINT=http://loki:3100/otlp # OTLP logs endpoint (Loki in this example, not the OpenTelemetry Collector); equivalent full URL: http://loki:3100/otlp/v1/logs +#export OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=http://loki:3100/otlp/v1/logs #export RUSTFS_OBS_USE_STDOUT=false # Whether to use standard output #export RUSTFS_OBS_SAMPLE_RATIO=2.0 # Sample ratio, between 0.0-1.0, 0.0 means no sampling, 1.0 means full sampling #export RUSTFS_OBS_METER_INTERVAL=1 # Sampling interval in seconds @@ -60,7 +67,7 @@ export RUSTFS_CONSOLE_ADDRESS=":9001" export RUSTFS_OBS_ENVIRONMENT=develop # Environment name export RUSTFS_OBS_LOGGER_LEVEL=info # Log level, supports trace, debug, info, warn, error export RUSTFS_OBS_LOG_STDOUT_ENABLED=true # Whether to enable local stdout logging -export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory +#export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day" export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB export RUSTFS_OBS_LOG_POOL_CAPA=10240