improve code for observability

houseme
2025-03-12 00:46:01 +08:00
parent 6727e15055
commit 3b2df514a7
14 changed files with 1480 additions and 690 deletions

Cargo.lock generated

@@ -176,6 +176,12 @@ dependencies = [
"password-hash",
]
[[package]]
name = "arraydeque"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
[[package]]
name = "arrayvec"
version = "0.7.6"
@@ -352,9 +358,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
[[package]]
name = "async-trait"
version = "0.1.86"
version = "0.1.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d"
checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97"
dependencies = [
"proc-macro2",
"quote",
@@ -486,6 +492,12 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
@@ -907,6 +919,25 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "config"
version = "0.15.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb07d21d12f9f0bc5e7c3e97ccc78b2341b9b4a4604eac3ed7c1d0d6e2c3b23e"
dependencies = [
"async-trait",
"convert_case 0.6.0",
"json5",
"pathdiff",
"ron",
"rust-ini",
"serde",
"serde_json",
"toml",
"winnow 0.7.3",
"yaml-rust2",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
@@ -923,6 +954,26 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cb3c4a0d3776f7535c32793be81d6d5fec0d48ac70955d9834e643aa249a52f"
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom 0.2.15",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "const-serialize"
version = "0.6.2"
@@ -1135,7 +1186,7 @@ dependencies = [
"serde_json",
"sha2 0.10.8",
"test-case",
"thiserror 2.0.11",
"thiserror 2.0.12",
"time",
]
@@ -1415,7 +1466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b0cca3e7a10a4a3df37ea52c4cc7a53e5c9233489e03ee3f2829471fc3099a"
dependencies = [
"async-trait",
"base64",
"base64 0.22.1",
"cocoa 0.25.0",
"core-foundation 0.9.4",
"dioxus-cli-config",
@@ -1509,7 +1560,7 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe99b48a1348eec385b5c4bd3e80fd863b0d3b47257d34e2ddc58754dec5d128"
dependencies = [
"base64",
"base64 0.22.1",
"bytes",
"ciborium",
"dioxus-desktop",
@@ -1850,6 +1901,15 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "dlv-list"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
dependencies = [
"const-random",
]
[[package]]
name = "downcast-rs"
version = "1.2.1"
@@ -1948,7 +2008,7 @@ dependencies = [
"sha2 0.11.0-pre.4",
"siphasher 1.0.1",
"tempfile",
"thiserror 2.0.11",
"thiserror 2.0.12",
"time",
"tokio",
"tokio-stream",
@@ -2132,6 +2192,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f"
[[package]]
name = "foreign-types"
version = "0.5.0"
@@ -2720,6 +2786,18 @@ name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
"foldhash",
]
[[package]]
name = "hashlink"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
"hashbrown 0.15.2",
]
[[package]]
name = "heck"
@@ -2962,7 +3040,7 @@ dependencies = [
"serde_json",
"strum",
"test-case",
"thiserror 2.0.11",
"thiserror 2.0.12",
"time",
"tokio",
"tracing",
@@ -3287,13 +3365,24 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json5"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
dependencies = [
"pest",
"pest_derive",
"serde",
]
[[package]]
name = "jsonwebtoken"
version = "9.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
dependencies = [
"base64",
"base64 0.22.1",
"js-sys",
"pem",
"ring",
@@ -4218,7 +4307,7 @@ dependencies = [
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tracing",
]
@@ -4265,7 +4354,7 @@ dependencies = [
"opentelemetry_sdk",
"prost",
"reqwest",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tonic",
"tracing",
@@ -4301,7 +4390,7 @@ dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"serde",
"thiserror 2.0.11",
"thiserror 2.0.12",
]
[[package]]
@@ -4319,7 +4408,7 @@ dependencies = [
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tracing",
@@ -4331,6 +4420,16 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "ordered-multimap"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79"
dependencies = [
"dlv-list",
"hashbrown 0.14.5",
]
[[package]]
name = "ordered-stream"
version = "0.2.0"
@@ -4473,6 +4572,12 @@ dependencies = [
"once_cell",
]
[[package]]
name = "pathdiff"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -4489,7 +4594,7 @@ version = "3.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3"
dependencies = [
"base64",
"base64 0.22.1",
"serde",
]
@@ -4499,6 +4604,51 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pest"
version = "2.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc"
dependencies = [
"memchr",
"thiserror 2.0.12",
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "816518421cfc6887a0d62bf441b6ffb4536fcc926395a69e1a85852d4363f57e"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d1396fd3a870fc7838768d171b4616d5c91f6cc25e377b673d714567d99377b"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.98",
]
[[package]]
name = "pest_meta"
version = "2.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1e58089ea25d717bfd31fb534e4f3afcc2cc569c70de3e239778991ea3b7dea"
dependencies = [
"once_cell",
"pest",
"sha2 0.10.8",
]
[[package]]
name = "petgraph"
version = "0.7.1"
@@ -4952,7 +5102,7 @@ dependencies = [
"rustc-hash 2.1.1",
"rustls",
"socket2",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tracing",
]
@@ -4971,7 +5121,7 @@ dependencies = [
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tinyvec",
"tracing",
"web-time",
@@ -4988,7 +5138,7 @@ dependencies = [
"once_cell",
"socket2",
"tracing",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -5164,7 +5314,7 @@ dependencies = [
"pin-project-lite",
"s3s",
"sha2 0.11.0-pre.4",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tracing",
]
@@ -5206,7 +5356,7 @@ checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.15",
"libredox",
"thiserror 2.0.11",
"thiserror 2.0.12",
]
[[package]]
@@ -5274,7 +5424,7 @@ version = "0.12.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
dependencies = [
"base64",
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-channel",
@@ -5401,6 +5551,18 @@ dependencies = [
"serde",
]
[[package]]
name = "ron"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
dependencies = [
"base64 0.21.7",
"bitflags 2.9.0",
"serde",
"serde_derive",
]
[[package]]
name = "rust-embed"
version = "8.6.0"
@@ -5436,6 +5598,17 @@ dependencies = [
"walkdir",
]
[[package]]
name = "rust-ini"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f"
dependencies = [
"cfg-if",
"ordered-multimap",
"trim-in-place",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@@ -5553,7 +5726,10 @@ dependencies = [
name = "rustfs-obs"
version = "0.0.1"
dependencies = [
"async-trait",
"chrono",
"config",
"local-ip-address",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
@@ -5564,8 +5740,10 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"thiserror 2.0.12",
"tokio",
"tracing",
"tracing-core",
"tracing-opentelemetry",
"tracing-subscriber",
]
@@ -5676,7 +5854,7 @@ dependencies = [
"smallvec",
"std-next",
"sync_wrapper",
"thiserror 2.0.11",
"thiserror 2.0.12",
"time",
"tokio",
"tower 0.5.2",
@@ -5694,7 +5872,7 @@ dependencies = [
"indexmap 2.7.1",
"serde",
"serde_json",
"thiserror 2.0.11",
"thiserror 2.0.12",
]
[[package]]
@@ -6066,7 +6244,7 @@ checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
dependencies = [
"num-bigint",
"num-traits",
"thiserror 2.0.11",
"thiserror 2.0.12",
"time",
]
@@ -6197,7 +6375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bafdb55260d9b29c04fa52351e0db2a4aaeadc462cd884ccd7771c5a31aaf1aa"
dependencies = [
"simdutf8",
"thiserror 2.0.11",
"thiserror 2.0.12",
]
[[package]]
@@ -6467,11 +6645,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.11"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl 2.0.11",
"thiserror-impl 2.0.12",
]
[[package]]
@@ -6487,9 +6665,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.11"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
dependencies = [
"proc-macro2",
"quote",
@@ -6539,6 +6717,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@@ -6566,9 +6753,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.43.0"
version = "1.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a"
dependencies = [
"backtrace",
"bytes",
@@ -6694,7 +6881,7 @@ dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"base64 0.22.1",
"bytes",
"flate2",
"h2",
@@ -6960,10 +7147,16 @@ dependencies = [
"objc2-foundation 0.3.0",
"once_cell",
"png",
"thiserror 2.0.11",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]
[[package]]
name = "trim-in-place"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
[[package]]
name = "try-lock"
version = "0.2.5"
@@ -6994,6 +7187,12 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "ucd-trie"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
[[package]]
name = "uds_windows"
version = "1.1.0"
@@ -7870,7 +8069,7 @@ version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac0099a336829fbf54c26b5f620c68980ebbe37196772aeaf6118df4931b5cb0"
dependencies = [
"base64",
"base64 0.22.1",
"block",
"cocoa 0.26.0",
"core-graphics 0.24.0",
@@ -7941,6 +8140,17 @@ version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
[[package]]
name = "yaml-rust2"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232bdb534d65520716bef0bbb205ff8f2db72d807b19c0bc3020853b92a0cd4b"
dependencies = [
"arraydeque",
"encoding_rs",
"hashlink",
]
[[package]]
name = "yoke"
version = "0.7.5"

Cargo.toml

@@ -32,12 +32,13 @@ all = "warn"
[workspace.dependencies]
madmin = { path = "./madmin" }
async-trait = "0.1.86"
async-trait = "0.1.87"
backon = "1.3.0"
bytes = "1.9.0"
bytesize = "1.3.0"
chrono = { version = "0.4.40", features = ["serde"] }
clap = { version = "4.5.31", features = ["derive", "env"] }
config = "0.15.9"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
ecstore = { path = "./ecstore" }
@@ -63,7 +64,7 @@ local-ip-address = "0.6.3"
mime = "0.3.17"
netif = "0.1.6"
opentelemetry = { version = "0.28" }
opentelemetry-appender-tracing = { version = "0.28.1" }
opentelemetry-appender-tracing = { version = "0.28.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { version = "0.28" }
opentelemetry-stdout = { version = "0.28.0" }
opentelemetry-otlp = { version = "0.28" }
@@ -76,7 +77,7 @@ prost-types = "0.13.4"
protobuf = "3.7"
protos = { path = "./common/protos" }
rand = "0.8.5"
reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream"] }
reqwest = { version = "0.12.12", default-features = false, features = ["json", "rustls-tls", "charset", "http2", "macos-system-configuration", "stream"] }
rdkafka = { version = "0.37", features = ["tokio"] }
rfd = { version = "0.15.2", default-features = false, features = ["xdg-portal", "tokio"] }
rmp = "0.8.14"
@@ -92,7 +93,7 @@ serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"
sha2 = "0.10.8"
tempfile = "3.16.0"
thiserror = "2.0.11"
thiserror = "2.0.12"
time = { version = "0.3.37", features = [
"std",
"parsing",
@@ -100,13 +101,14 @@ time = { version = "0.3.37", features = [
"macros",
"serde",
] }
tokio = { version = "1.43.0", features = ["fs", "rt-multi-thread"] }
tokio = { version = "1.44.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.3", features = ["gzip"] }
tonic-build = "0.12.3"
tonic-reflection = "0.12"
tokio-stream = "0.1.17"
tower = { version = "0.5.2", features = ["timeout"] }
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
tracing-appender = "0.2.3"

packages/obs/Cargo.toml

@@ -10,11 +10,15 @@ version.workspace = true
workspace = true
[features]
default = []
audit-kafka = ["dep:rdkafka", "dep:serde_json"]
audit-webhook = ["dep:reqwest"]
default = ["file"]
kafka = ["dep:rdkafka", "dep:serde_json"]
webhook = ["dep:reqwest"]
file = []
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
@@ -23,13 +27,15 @@ opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics"] }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
serde = { workspace = true }
tracing = { workspace = true }
tracing-core = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["sync", "fs"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, features = ["json"], optional = true }
reqwest = { workspace = true, optional = true, default-features = false }
serde_json = { workspace = true, optional = true }
thiserror = { workspace = true }
local-ip-address = { workspace = true }
[dev-dependencies]

packages/obs/examples/config.toml Normal file

@@ -0,0 +1,31 @@
[observability]
endpoint = "http://localhost:4317"
use_stdout = true
sample_ratio = 0.5
meter_interval = 30
service_name = "logging_service"
service_version = "0.1.0"
deployment_environment = "develop"
[sinks]
[sinks.kafka]
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
[sinks.webhook]
enabled = false
url = "http://localhost:8080/webhook"
max_retries = 3 # Default is 3 if not specified
retry_delay_ms = 100 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "app.log"
buffer_size = 8192 # Default is 8192 bytes if not specified
flush_interval_ms = 1000 # Default is 1000ms if not specified
flush_threshold = 100 # Default is 100 entries if not specified
[logger]
queue_capacity = 10000


@@ -0,0 +1,78 @@
use opentelemetry::global;
use opentelemetry::trace::TraceContextExt;
use rustfs_obs::{init_logging, load_config, LogEntry};
use std::time::{Duration, SystemTime};
use tracing::{info, instrument, Span};
use tracing_core::Level;
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[tokio::main]
async fn main() {
let start_time = SystemTime::now();
let config = load_config(Some("packages/obs/examples/config".to_string()));
info!("Configuration file loading is complete {:?}", config.clone());
let (logger, _guard) = init_logging(config);
info!("Log module initialization is completed");
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
// Record Metrics
let meter = global::meter("rustfs.rs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "put_object")],
);
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
let trace_id = span_context.span().span_context().trace_id().to_string(); // Get the TraceId
let result = logger
.log(LogEntry::new(
Level::INFO,
"Process user requests".to_string(),
"api_handler".to_string(),
Some("req-12345".to_string()),
Some("user-6789".to_string()),
vec![
("endpoint".to_string(), "/api/v1/data".to_string()),
("method".to_string(), "GET".to_string()),
("span_id".to_string(), span_id),
("trace_id".to_string(), trace_id),
],
))
.await;
info!("Logging is completed {:?}", result);
put_object("bucket".to_string(), "object".to_string(), "user".to_string()).await;
info!("Logging is completed");
tokio::time::sleep(Duration::from_secs(2)).await;
info!("Program ends");
}
#[instrument(fields(bucket, object, user))]
async fn put_object(bucket: String, object: String, user: String) {
let start_time = SystemTime::now();
info!("Starting PUT operation");
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
let trace_id = span_context.span().span_context().trace_id().to_string(); // Get the TraceId
info!(
"Starting PUT operation content: bucket = {}, object = {}, user = {},span_id = {},trace_id = {},start_time = {}",
bucket,
object,
user,
span_id,
trace_id,
start_time.elapsed().unwrap().as_secs_f64()
);
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
info!("PUT operation completed");
}

packages/obs/src/audit.rs

@@ -1,319 +0,0 @@
#[cfg(feature = "audit-kafka")]
use rdkafka::{
producer::{FutureProducer, FutureRecord},
ClientConfig,
};
#[cfg(feature = "audit-webhook")]
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
/// AuditEntry is a struct that represents an audit entry
/// that can be logged
/// # Fields
/// * `version` - The version of the audit entry
/// * `event_type` - The type of event that occurred
/// * `bucket` - The bucket that was accessed
/// * `object` - The object that was accessed
/// * `user` - The user that accessed the object
/// * `time` - The time the event occurred
/// * `user_agent` - The user agent that accessed the object
/// * `span_id` - The span ID of the event
/// # Example
/// ```
/// use rustfs_obs::AuditEntry;
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AuditEntry {
pub version: String,
pub event_type: String,
pub bucket: String,
pub object: String,
pub user: String,
pub time: String,
pub user_agent: String,
pub span_id: String,
}
/// AuditTarget is a trait that defines the interface for audit targets
/// that can receive audit entries
pub trait AuditTarget: Send + Sync {
fn send(&self, entry: AuditEntry);
}
/// FileAuditTarget is an audit target that logs audit entries to a file
pub struct FileAuditTarget;
impl AuditTarget for FileAuditTarget {
/// Send an audit entry to a file
/// # Arguments
/// * `entry` - The audit entry to send
///
/// # Example
/// ```
/// use rustfs_obs::{AuditEntry, AuditTarget, FileAuditTarget};
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// FileAuditTarget.send(entry);
/// ```
fn send(&self, entry: AuditEntry) {
println!("File audit: {:?}", entry);
}
}
#[cfg(feature = "audit-webhook")]
/// Webhook audit target
/// # Fields
/// * `client` - The reqwest client
/// * `url` - The URL of the webhook
/// # Example
/// ```
/// use rustfs_obs::WebhookAuditTarget;
/// let target = WebhookAuditTarget::new("http://localhost:8080");
/// ```
pub struct WebhookAuditTarget {
client: Client,
url: String,
}
#[cfg(feature = "audit-webhook")]
impl WebhookAuditTarget {
pub fn new(url: &str) -> Self {
Self {
client: Client::new(),
url: url.to_string(),
}
}
}
#[cfg(feature = "audit-webhook")]
impl AuditTarget for WebhookAuditTarget {
fn send(&self, entry: AuditEntry) {
let client = self.client.clone();
let url = self.url.clone();
tokio::spawn(async move {
if let Err(e) = client.post(&url).json(&entry).send().await {
eprintln!("Failed to send to Webhook: {:?}", e);
}
});
}
}
#[cfg(feature = "audit-kafka")]
/// Kafka audit target
/// # Fields
/// * `producer` - The Kafka producer
/// * `topic` - The Kafka topic
/// # Example
/// ```
/// use rustfs_obs::KafkaAuditTarget;
/// let target = KafkaAuditTarget::new("localhost:9092", "minio-audit");
/// ```
/// # Note
/// This target needs the `rdkafka` crate, pulled in by the `audit-kafka` feature:
/// ```toml
/// [dependencies]
/// rdkafka = { version = "0.37", features = ["tokio"] }
/// rustfs_obs = { version = "0.1.0", features = ["audit-kafka"] }
/// ```
/// `rdkafka` in turn requires the native `librdkafka` library and its build
/// dependencies, e.g. on Debian/Ubuntu:
/// ```sh
/// sudo apt-get install librdkafka-dev libssl-dev pkg-config zlib1g-dev zstd lz4
/// ```
pub struct KafkaAuditTarget {
producer: FutureProducer,
topic: String,
}
#[cfg(feature = "audit-kafka")]
impl KafkaAuditTarget {
pub fn new(brokers: &str, topic: &str) -> Self {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Kafka producer creation failed");
Self {
producer,
topic: topic.to_string(),
}
}
}
#[cfg(feature = "audit-kafka")]
impl AuditTarget for KafkaAuditTarget {
fn send(&self, entry: AuditEntry) {
let topic = self.topic.clone();
let span_id = entry.span_id.clone();
let payload = serde_json::to_string(&entry).unwrap();
// let record = FutureRecord::to(&topic).payload(&payload).key(&span_id);
tokio::spawn({
// Create the record inside the async block
let topic = topic;
let payload = payload;
let span_id = span_id;
let producer = self.producer.clone();
async move {
let record = FutureRecord::to(&topic).payload(&payload).key(&span_id);
if let Err(e) = producer.send(record, std::time::Duration::from_secs(0)).await {
eprintln!("Failed to send to Kafka: {:?}", e);
}
}
});
}
}
/// AuditLogger is a logger that logs audit entries
/// to multiple targets
///
/// # Example
/// ```
/// use rustfs_obs::{AuditEntry, AuditLogger, FileAuditTarget};
///
/// #[tokio::main]
/// async fn main() {
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// logger.log(entry).await;
/// }
/// ```
#[derive(Debug)]
pub struct AuditLogger {
tx: mpsc::Sender<AuditEntry>,
}
impl AuditLogger {
/// Create a new AuditLogger with the given targets
/// that will receive audit entries
/// # Arguments
/// * `targets` - A vector of audit targets
/// # Returns
/// * An AuditLogger
/// # Example
/// ```
/// use rustfs_obs::{AuditLogger, AuditEntry, FileAuditTarget};
///
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// ```
pub fn new(targets: Vec<Box<dyn AuditTarget>>) -> Self {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(1000);
tokio::spawn(async move {
while let Some(entry) = rx.recv().await {
for target in &targets {
target.send(entry.clone());
}
}
});
Self { tx }
}
/// Log an audit entry
/// # Arguments
/// * `entry` - The audit entry to log
/// # Example
/// ```
/// use rustfs_obs::{AuditEntry, AuditLogger, FileAuditTarget};
///
/// #[tokio::main]
/// async fn main() {
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// logger.log(entry).await;
/// }
/// ```
pub async fn log(&self, entry: AuditEntry) {
// Record the log message on the current span
tracing::Span::current()
.record("log_message", &entry.bucket)
.record("source", &entry.event_type);
let _ = self.tx.send(entry).await;
}
}

packages/obs/src/config.rs

@@ -0,0 +1,90 @@
use config::{Config, File, FileFormat};
use serde::Deserialize;
use std::env;
/// OpenTelemetry Configuration
#[derive(Debug, Deserialize, Clone, Default)]
pub struct OtelConfig {
pub endpoint: String,
pub use_stdout: bool,
pub sample_ratio: f64,
pub meter_interval: u64,
pub service_name: String,
pub service_version: String,
pub deployment_environment: String,
}
/// Kafka Sink Configuration - Add batch parameters
#[derive(Debug, Deserialize, Clone)]
pub struct KafkaSinkConfig {
pub enabled: bool,
pub bootstrap_servers: String,
pub topic: String,
pub batch_size: Option<usize>, // Batch size, default 100
pub batch_timeout_ms: Option<u64>, // Batch timeout time, default 1000ms
}
/// Webhook Sink Configuration - Add Retry Parameters
#[derive(Debug, Deserialize, Clone)]
pub struct WebhookSinkConfig {
pub enabled: bool,
pub url: String,
pub max_retries: Option<usize>, // Maximum number of retry times, default 3
pub retry_delay_ms: Option<u64>, // Retry the delay cardinality, default 100ms
}
/// File Sink Configuration - Add buffering parameters
#[derive(Debug, Deserialize, Clone)]
pub struct FileSinkConfig {
pub enabled: bool,
pub path: String,
pub buffer_size: Option<usize>, // Write buffer size, default 8192
pub flush_interval_ms: Option<u64>, // Refresh interval time, default 1000ms
pub flush_threshold: Option<usize>, // Refresh threshold, default 100 logs
}
/// Sink configuration collection
#[derive(Debug, Deserialize, Clone)]
pub struct SinkConfig {
pub kafka: KafkaSinkConfig,
pub webhook: WebhookSinkConfig,
pub file: FileSinkConfig,
}
/// Logger Configuration
#[derive(Debug, Deserialize, Clone)]
pub struct LoggerConfig {
pub queue_capacity: Option<usize>,
}
/// Overall application configuration
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub observability: OtelConfig,
pub sinks: SinkConfig,
pub logger: LoggerConfig,
}
/// Load the configuration file
/// Supports TOML and YAML files plus environment-variable overrides, applied in that order
pub fn load_config(config_dir: Option<String>) -> AppConfig {
let config_dir = config_dir.unwrap_or_else(|| {
env::current_dir()
.map(|path| path.to_string_lossy().to_string())
.unwrap_or_else(|_| {
eprintln!("Warning: Failed to get current directory, using empty path");
String::new()
})
});
println!("config_dir: {}", config_dir);
let config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml))
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false))
.add_source(config::Environment::with_prefix(""))
.build()
.unwrap();
config.try_deserialize().unwrap()
}
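
A quick usage sketch for load_config (sketch only, not part of this commit; the assertion values come from the example config above, and File::with_name resolves the extension-less path to config.toml / config.yaml):

// Hypothetical usage of load_config.
let cfg: AppConfig = load_config(Some("packages/obs/examples/config".to_string()));
assert_eq!(cfg.observability.service_name, "logging_service");
assert_eq!(cfg.logger.queue_capacity, Some(10000));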

packages/obs/src/entry.rs Normal file

@@ -0,0 +1,80 @@
use chrono::{DateTime, Utc};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
#[derive(Debug, Clone)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}
/// Server log entry structure
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LogEntry {
pub timestamp: DateTime<Utc>, // Log timestamp
pub level: SerializableLevel, // Log Level
pub message: String, // Log messages
pub source: String, // Log source (such as module name)
pub request_id: Option<String>, // Request ID (Common Server Fields)
pub user_id: Option<String>, // User ID (Common Server Fields)
pub fields: Vec<(String, String)>, // Attached fields (key value pairs)
}
impl LogEntry {
/// Create a new LogEntry
pub fn new(
level: Level,
message: String,
source: String,
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Self {
LogEntry {
timestamp: Utc::now(),
level: SerializableLevel::from(level),
message,
source,
request_id,
user_id,
fields,
}
}
}
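
Because LogEntry derives Serialize/Deserialize through SerializableLevel, entries round-trip through JSON with the level rendered as its string form; a small sketch (assumes serde_json is in scope, e.g. via the kafka feature):

// Sketch only, not part of this commit.
use tracing_core::Level;

let entry = LogEntry::new(
Level::INFO,
"object stored".to_string(),
"api_handler".to_string(),
Some("req-1".to_string()),
None,
vec![("bucket".to_string(), "demo".to_string())],
);
let json = serde_json::to_string(&entry).expect("serialize");
let back: LogEntry = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back.message, entry.message);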

packages/obs/src/lib.rs

@@ -1,190 +1,32 @@
//! Logging utilities
/// # obs
///
/// This crate provides utilities for logging.
///
/// # Examples
/// ```
/// use rustfs_obs::{log_info, log_error};
///
/// log_info("This is an informational message");
/// log_error("This is an error message");
/// ```
#[cfg(feature = "audit-kafka")]
pub use audit::KafkaAuditTarget;
#[cfg(feature = "audit-webhook")]
pub use audit::WebhookAuditTarget;
pub use audit::{AuditEntry, AuditLogger, AuditTarget, FileAuditTarget};
pub use logger::{log_debug, log_error, log_info};
pub use telemetry::Telemetry;
mod audit;
/// `obs` is a logging and observability library for Rust.
/// It provides a simple and easy-to-use interface for logging and observability.
/// It is built on top of the `log` crate and `opentelemetry` crate.
mod config;
mod entry;
mod logger;
mod sink;
mod telemetry;
mod utils;
mod worker;
#[cfg(test)]
mod tests {
use crate::{log_info, AuditEntry, AuditLogger, AuditTarget, FileAuditTarget, Telemetry};
use chrono::Utc;
use opentelemetry::global;
use opentelemetry::trace::{TraceContextExt, Tracer};
use std::time::{Duration, SystemTime};
use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub use config::load_config;
pub use config::{AppConfig, OtelConfig};
pub use entry::{LogEntry, SerializableLevel};
pub use logger::start_logger;
pub use logger::{LogError, Logger};
pub use sink::Sink;
pub use telemetry::init_telemetry;
pub use utils::{get_local_ip, get_local_ip_with_default};
pub use worker::start_worker;
#[instrument(fields(bucket, object, user))]
async fn put_object(audit_logger: &AuditLogger, bucket: String, object: String, user: String) {
let start_time = SystemTime::now();
log_info("Starting PUT operation");
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
// Record Metrics
let meter = global::meter("rustfs.rs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "put_object")],
);
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
// Audit events are logged
let audit_entry = AuditEntry {
version: "1.0".to_string(),
event_type: "s3_put_object".to_string(),
bucket,
object,
user,
time: Utc::now().to_rfc3339(),
user_agent: "rustfs.rs-client".to_string(),
span_id,
};
audit_logger.log(audit_entry).await;
log_info("PUT operation completed");
}
#[tokio::test]
// #[cfg(feature = "audit-webhook")]
// #[cfg(feature = "audit-kafka")]
async fn test_main() {
let telemetry = Telemetry::init();
// Initialize multiple audit objectives
let audit_targets: Vec<Box<dyn AuditTarget>> = vec![
Box::new(FileAuditTarget),
// Box::new(KafkaAuditTarget::new("localhost:9092", "minio-audit")),
// Box::new(WebhookAuditTarget::new("http://localhost:8080/audit")),
];
let audit_logger = AuditLogger::new(audit_targets);
// Create the root span and run the operations
// let tracer = global::tracer("main");
// tracer.in_span("main_operation", |cx| {
// Span::current().set_parent(cx);
// log_info("Starting test async");
// tokio::runtime::Runtime::new().unwrap().block_on(async {
log_info("Starting test");
// Test the PUT operation
put_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
query_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..100 {
put_object(
&audit_logger,
format!("my-bucket-{}", i),
format!("my-object-{}", i),
"user123".to_string(),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
query_object(
&audit_logger,
format!("my-bucket-{}", i),
format!("my-object-{}", i),
"user123".to_string(),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Wait for the export to complete
tokio::time::sleep(Duration::from_secs(2)).await;
log_info("Test completed");
// });
// });
drop(telemetry); // Make sure to clean up
}
#[instrument(fields(bucket, object, user))]
async fn query_object(audit_logger: &AuditLogger, bucket: String, object: String, user: String) {
let start_time = SystemTime::now();
log_info("Starting query operation");
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
// Record Metrics
let meter = global::meter("rustfs.rs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "query_object")],
);
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
query_one(user.clone());
query_two(user.clone());
query_three(user.clone());
// Audit events are logged
let audit_entry = AuditEntry {
version: "1.0".to_string(),
event_type: "s3_query_object".to_string(),
bucket,
object,
user,
time: Utc::now().to_rfc3339(),
user_agent: "rustfs.rs-client".to_string(),
span_id,
};
audit_logger.log(audit_entry).await;
log_info("query operation completed");
}
#[instrument(fields(user))]
fn query_one(user: String) {
// Initialize the OpenTelemetry tracer
let tracer = global::tracer("query_one");
tracer.in_span("doing_work", |cx| {
// Traced app logic here...
Span::current().set_parent(cx);
log_info("Doing work...");
let current_span = Span::current();
let span_context = current_span.context();
let trace_id = span_context.clone().span().span_context().trace_id().to_string();
let span_id = span_context.clone().span().span_context().span_id().to_string();
log_info(format!("trace_id: {}, span_id: {}", trace_id, span_id).as_str());
});
log_info(format!("Starting query_one operation user:{}", user).as_str());
}
#[instrument(fields(user))]
fn query_two(user: String) {
log_info(format!("Starting query_two operation user:{}", user).as_str());
}
#[instrument(fields(user))]
fn query_three(user: String) {
log_info(format!("Starting query_three operation user: {}", user).as_str());
}
/// Log module initialization function
///
/// Return to Logger and Clean Guard
pub fn init_logging(config: AppConfig) -> (Logger, telemetry::OtelGuard) {
let guard = init_telemetry(&config.observability);
let sinks = sink::create_sinks(&config);
let logger = start_logger(&config, sinks);
(logger, guard)
}
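
Tying the re-exports together, the intended end-to-end wiring looks roughly like this (a sketch built from the APIs above, not code from this commit):

// Hypothetical wiring of the crate's public API.
use rustfs_obs::{init_logging, load_config};

#[tokio::main]
async fn main() {
let config = load_config(None); // falls back to the current working directory
let (logger, _guard) = init_logging(config); // keep the guard alive for the process lifetime
logger.info("service started", "main").await.expect("log");
logger.shutdown().await.expect("shutdown"); // close the channel so the worker drains and exits
}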

packages/obs/src/logger.rs

@@ -1,46 +1,238 @@
use tracing::{debug, error, info};
use crate::{AppConfig, LogEntry, SerializableLevel, Sink};
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, Sender};
/// Log an info message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_obs::log_info;
///
/// log_info("This is an info message");
/// ```
pub fn log_info(msg: &str) {
info!("{}", msg);
/// Server log processor
pub struct Logger {
sender: Sender<LogEntry>, // Log sending channel
queue_capacity: usize,
}
/// Log an error message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_obs::log_error;
///
/// log_error("This is an error message");
/// ```
pub fn log_error(msg: &str) {
error!("{}", msg);
impl Logger {
/// Create a new Logger instance
/// Returns Logger and corresponding Receiver
pub fn new(config: &AppConfig) -> (Self, Receiver<LogEntry>) {
// Get queue capacity from configuration, or use default values 10000
let queue_capacity = config.logger.queue_capacity.unwrap_or(10000);
let (sender, receiver) = mpsc::channel(queue_capacity);
(
Logger {
sender,
queue_capacity,
},
receiver,
)
}
/// Returns the configured queue capacity
pub fn queue_capacity(&self) -> usize {
self.queue_capacity
}
/// Asynchronous logging of server logs
/// Attach the log to the current Span and generate a separate Tracing Event
#[tracing::instrument(skip(self), fields(log_source = "logger"))]
pub async fn log(&self, entry: LogEntry) -> Result<(), LogError> {
// Log messages to the current Span
tracing::Span::current()
.record("log_message", &entry.message)
.record("source", &entry.source);
// Warn when the queue is filling up.
// Note: Sender::capacity() returns the *remaining* slots, so utilization is 1 - remaining/total.
let available = self.sender.capacity();
let utilization = 1.0 - (available as f64 / self.queue_capacity as f64);
if utilization > 0.8 {
tracing::warn!("Log queue utilization high: {:.1}%", utilization * 100.0);
}
// Emit an independent tracing event carrying the full LogEntry, dispatched by level
match entry.level {
SerializableLevel(tracing::Level::ERROR) => {
tracing::error!(
target: "server_logs",
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(tracing::Level::WARN) => {
tracing::warn!(
target: "server_logs",
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(tracing::Level::INFO) => {
tracing::info!(
target: "server_logs",
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(tracing::Level::DEBUG) => {
tracing::debug!(
target: "server_logs",
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(tracing::Level::TRACE) => {
tracing::trace!(
target: "server_logs",
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
}
// Send the entry to the async queue, with explicit handling for full/closed channels
match self.sender.try_send(entry) {
Ok(_) => Ok(()),
Err(mpsc::error::TrySendError::Full(entry)) => {
// Processing strategy when queue is full
tracing::warn!("Log queue full, applying backpressure");
match tokio::time::timeout(
std::time::Duration::from_millis(500),
self.sender.send(entry),
)
.await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err(LogError::SendFailed("Channel closed")),
Err(_) => Err(LogError::Timeout("Queue backpressure timeout")),
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Err(LogError::SendFailed("Logger channel closed"))
}
}
}
/// Convenience method: log an INFO message with no request/user context
pub async fn info(&self, message: &str, source: &str) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::INFO,
message.to_string(),
source.to_string(),
None,
None,
Vec::new(), // Use an empty vector instead of None
))
.await
}
/// Convenience method: log an ERROR message
pub async fn error(&self, message: &str, source: &str) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::ERROR,
message.to_string(),
source.to_string(),
None,
None,
Vec::new(),
))
.await
}
/// Convenience method: log a WARN message
pub async fn warn(&self, message: &str, source: &str) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::WARN,
message.to_string(),
source.to_string(),
None,
None,
Vec::new(),
))
.await
}
/// Convenience method: log a DEBUG message
pub async fn debug(&self, message: &str, source: &str) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::DEBUG,
message.to_string(),
source.to_string(),
None,
None,
Vec::new(),
))
.await
}
/// Convenience method: log a TRACE message
pub async fn trace(&self, message: &str, source: &str) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::TRACE,
message.to_string(),
source.to_string(),
None,
None,
Vec::new(),
))
.await
}
/// Log an INFO message with request/user context and extra fields
pub async fn info_with_context(
&self,
message: &str,
source: &str,
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Result<(), LogError> {
self.log(LogEntry::new(
tracing::Level::INFO,
message.to_string(),
source.to_string(),
request_id,
user_id,
fields,
))
.await
}
/// Gracefully shut down the logger by closing its sending channel
pub async fn shutdown(self) -> Result<(), LogError> {
drop(self.sender); // Close the sending end so the worker knows no new messages will arrive
Ok(())
}
}
/// Log a debug message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_obs::log_debug;
///
/// log_debug("This is a debug message");
/// ```
pub fn log_debug(msg: &str) {
debug!("{}", msg);
/// Errors returned by the Logger
#[derive(Debug, thiserror::Error)]
pub enum LogError {
#[error("Failed to send log: {0}")]
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
}
/// Start the log module
pub fn start_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Logger {
let (logger, receiver) = Logger::new(config);
tokio::spawn(crate::worker::start_worker(receiver, sinks));
logger
}
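
start_logger hands the receiver to crate::worker::start_worker, which this commit view does not show; a plausible reconstruction, inferred from the call site here and the pub use worker::start_worker in lib.rs:

// Hypothetical worker.rs: drain the queue and fan each entry out to every sink.
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;

pub async fn start_worker(mut receiver: Receiver<LogEntry>, sinks: Vec<Arc<dyn Sink>>) {
while let Some(entry) = receiver.recv().await {
for sink in &sinks {
sink.write(&entry).await; // each sink does its own batching/retries
}
}
// Channel closed (Logger::shutdown dropped the sender): return and let sinks flush on Drop.
}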

packages/obs/src/sink.rs Normal file

@@ -0,0 +1,450 @@
use crate::{AppConfig, LogEntry};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncWriteExt;
/// Sink Trait definition, asynchronously write logs
#[async_trait]
pub trait Sink: Send + Sync {
async fn write(&self, entry: &LogEntry);
}
#[cfg(feature = "kafka")]
/// Kafka Sink Implementation
pub struct KafkaSink {
producer: rdkafka::producer::FutureProducer,
topic: String,
batch_size: usize,
batch_timeout_ms: u64,
entries: Arc<tokio::sync::Mutex<Vec<LogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
}
#[cfg(feature = "kafka")]
impl KafkaSink {
/// Create a new KafkaSink instance
pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self {
// Create Arc-wrapped values first
let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size)));
let last_flush = Arc::new(std::sync::atomic::AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
));
let sink = KafkaSink {
producer: producer.clone(),
topic: topic.clone(),
batch_size,
batch_timeout_ms,
entries: entries.clone(),
last_flush: last_flush.clone(),
};
// Start background flusher
tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms));
sink
}
/// Returns the configured batch timeout in milliseconds
#[allow(dead_code)]
pub fn batch_timeout(&self) -> u64 {
self.batch_timeout_ms
}
/// Dynamically adjust the batch timeout
#[allow(dead_code)]
pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) {
self.batch_timeout_ms = new_timeout_ms;
}
async fn periodic_flush(
producer: rdkafka::producer::FutureProducer,
topic: String,
entries: Arc<tokio::sync::Mutex<Vec<LogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
timeout_ms: u64,
) {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = last_flush.load(std::sync::atomic::Ordering::Relaxed);
if now - last >= timeout_ms {
let mut batch = entries.lock().await;
if !batch.is_empty() {
Self::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<LogEntry>) {
for entry in entries {
let payload = match serde_json::to_string(&entry) {
Ok(p) => p,
Err(e) => {
eprintln!("Failed to serialize log entry: {}", e);
continue;
}
};
let key = entry.timestamp.to_rfc3339(); // LogEntry carries no span id, so key messages by timestamp
let _ = producer
.send(
rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&key),
std::time::Duration::from_secs(5),
)
.await;
}
}
}
#[cfg(feature = "kafka")]
#[async_trait]
impl Sink for KafkaSink {
async fn write(&self, entry: &LogEntry) {
let mut batch = self.entries.lock().await;
batch.push(entry.clone());
let should_flush_by_size = batch.len() >= self.batch_size;
let should_flush_by_time = {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.batch_timeout_ms
};
if should_flush_by_size || should_flush_by_time {
// Existing flush logic
let entries_to_send: Vec<LogEntry> = batch.drain(..).collect();
let producer = self.producer.clone();
let topic = self.topic.clone();
self.last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
tokio::spawn(async move {
KafkaSink::send_batch(&producer, &topic, entries_to_send).await;
});
}
}
}
#[cfg(feature = "kafka")]
impl Drop for KafkaSink {
fn drop(&mut self) {
// Flush any remaining buffered entries; note this spawn requires an active Tokio runtime
let producer = self.producer.clone();
let topic = self.topic.clone();
let entries = self.entries.clone();
let last_flush = self.last_flush.clone();
tokio::spawn(async move {
let mut batch = entries.lock().await;
if !batch.is_empty() {
KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await;
last_flush.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
}
});
eprintln!("Dropping KafkaSink with topic: {}", self.topic);
}
}
#[cfg(feature = "webhook")]
/// Webhook Sink Implementation
pub struct WebhookSink {
url: String,
client: reqwest::Client,
max_retries: usize,
retry_delay_ms: u64,
}
#[cfg(feature = "webhook")]
impl WebhookSink {
pub fn new(url: String, max_retries: usize, retry_delay_ms: u64) -> Self {
WebhookSink {
url,
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_else(|_| reqwest::Client::new()),
max_retries,
retry_delay_ms,
}
}
}
#[cfg(feature = "webhook")]
#[async_trait]
impl Sink for WebhookSink {
async fn write(&self, entry: &LogEntry) {
let mut retries = 0;
let url = self.url.clone();
let entry_clone = entry.clone();
while retries < self.max_retries {
match self.client.post(&url).json(&entry_clone).send().await {
Ok(response) if response.status().is_success() => {
return;
}
_ => {
retries += 1;
if retries < self.max_retries {
tokio::time::sleep(tokio::time::Duration::from_millis(
self.retry_delay_ms * (1 << retries), // Exponential backoff
))
.await;
}
}
}
}
eprintln!("Failed to send log to webhook after {} retries", self.max_retries);
}
}
#[cfg(feature = "webhook")]
impl Drop for WebhookSink {
fn drop(&mut self) {
// No buffered state to flush; just log the teardown
eprintln!("Dropping WebhookSink with URL: {}", self.url);
}
}
#[cfg(feature = "file")]
/// File Sink Implementation
pub struct FileSink {
path: String,
buffer_size: usize,
writer: Arc<tokio::sync::Mutex<io::BufWriter<tokio::fs::File>>>,
entry_count: std::sync::atomic::AtomicUsize,
last_flush: std::sync::atomic::AtomicU64,
flush_interval_ms: u64, // Time between flushes
flush_threshold: usize, // Number of entries before flush
}
#[cfg(feature = "file")]
impl FileSink {
#[allow(dead_code)]
pub async fn new(
path: String,
buffer_size: usize,
flush_interval_ms: u64,
flush_threshold: usize,
) -> Result<Self, std::io::Error> {
let file = OpenOptions::new().append(true).create(true).open(&path).await?;
let writer = tokio::io::BufWriter::with_capacity(buffer_size, file);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(FileSink {
path,
buffer_size,
writer: Arc::new(tokio::sync::Mutex::new(writer)),
entry_count: std::sync::atomic::AtomicUsize::new(0),
last_flush: std::sync::atomic::AtomicU64::new(now),
flush_interval_ms,
flush_threshold,
})
}
#[allow(dead_code)]
async fn initialize_writer(&mut self) -> io::Result<()> {
let file = tokio::fs::File::create(&self.path).await?;
// Use buffer_size to create a buffer writer with a specified capacity
let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file);
// Replace the original writer with the new Mutex
self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer));
Ok(())
}
// Get the current buffer size
#[allow(dead_code)]
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
// Dynamically adjust the buffer size (recreates the writer)
#[allow(dead_code)]
pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> {
if self.buffer_size != new_size {
self.buffer_size = new_size;
// Reinitialize the writer directly, without checking is_some()
self.initialize_writer().await?;
}
Ok(())
}
// Check if flushing is needed based on count or time
fn should_flush(&self) -> bool {
// Check entry count threshold
if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold {
return true;
}
// Check time threshold
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed);
now - last >= self.flush_interval_ms
}
}
#[cfg(feature = "file")]
#[async_trait]
impl Sink for FileSink {
async fn write(&self, entry: &LogEntry) {
let line = format!("{:?}\n", entry);
let mut writer = self.writer.lock().await;
if let Err(e) = writer.write_all(line.as_bytes()).await {
eprintln!("Failed to write log to file {}: {}", self.path, e);
return;
}
// Flush only periodically (by entry count or elapsed time) to improve performance
// Increment the entry count
self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Check if we should flush
if self.should_flush() {
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush log file {}: {}", self.path, e);
return;
}
// Reset counters
self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
}
#[cfg(feature = "file")]
impl Drop for FileSink {
fn drop(&mut self) {
        let writer = self.writer.clone();
        let path = self.path.clone();
        // Best-effort final flush. `spawn_blocking` panics when no Tokio runtime
        // is active, so only schedule the flush if a runtime handle is available.
        if tokio::runtime::Handle::try_current().is_ok() {
            tokio::task::spawn_blocking(move || {
                // A small current-thread runtime drives the async flush from this blocking thread
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build flush runtime");
                rt.block_on(async {
                    let mut writer = writer.lock().await;
                    if let Err(e) = writer.flush().await {
                        eprintln!("Failed to flush log file {}: {}", path, e);
                    }
                });
            });
        }
}
}
/// Create a list of Sink instances
pub fn create_sinks(config: &AppConfig) -> Vec<Arc<dyn Sink>> {
let mut sinks: Vec<Arc<dyn Sink>> = Vec::new();
#[cfg(feature = "kafka")]
if config.sinks.kafka.enabled {
match rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &config.sinks.kafka.bootstrap_servers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(producer) => {
sinks.push(Arc::new(KafkaSink::new(
producer,
config.sinks.kafka.topic.clone(),
config.sinks.kafka.batch_size.unwrap_or(100),
config.sinks.kafka.batch_timeout_ms.unwrap_or(1000),
)));
}
Err(e) => eprintln!("Failed to create Kafka producer: {}", e),
}
}
#[cfg(feature = "webhook")]
if config.sinks.webhook.enabled {
sinks.push(Arc::new(WebhookSink::new(
config.sinks.webhook.url.clone(),
config.sinks.webhook.max_retries.unwrap_or(3),
config.sinks.webhook.retry_delay_ms.unwrap_or(100),
)));
}
#[cfg(feature = "file")]
{
        // Fall back to "default.log" so a file sink is always created, even when
        // the file sink is not explicitly enabled in the config
        let path = if config.sinks.file.enabled {
            config.sinks.file.path.clone()
        } else {
            "default.log".to_string()
        };
        // Open the file synchronously (create_sinks is not async), then hand it to tokio
        let file_result = std::fs::OpenOptions::new().append(true).create(true).open(&path);
match file_result {
Ok(file) => {
let buffer_size = config.sinks.file.buffer_size.unwrap_or(8192);
let writer = tokio::io::BufWriter::with_capacity(buffer_size, tokio::fs::File::from_std(file));
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
sinks.push(Arc::new(FileSink {
path: path.clone(),
buffer_size,
writer: Arc::new(tokio::sync::Mutex::new(writer)),
entry_count: std::sync::atomic::AtomicUsize::new(0),
last_flush: std::sync::atomic::AtomicU64::new(now),
flush_interval_ms: config.sinks.file.flush_interval_ms.unwrap_or(1000),
flush_threshold: config.sinks.file.flush_threshold.unwrap_or(100),
}));
}
Err(e) => eprintln!("Failed to create file sink: {}", e),
}
}
sinks
}
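
Taken together, a minimal wiring of these pieces — assuming an AppConfig loaded elsewhere and the start_worker loop introduced later in this commit — might look like this sketch (the channel capacity is illustrative):

    use std::sync::Arc;
    use tokio::sync::mpsc;

    async fn start_logging(config: &AppConfig) -> mpsc::Sender<LogEntry> {
        let sinks: Vec<Arc<dyn Sink>> = create_sinks(config);
        // Bounded channel so producers get back-pressure instead of unbounded growth
        let (sender, receiver) = mpsc::channel::<LogEntry>(1024);
        tokio::spawn(start_worker(receiver, sinks));
        sender
    }
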

View File

@@ -1,149 +1,222 @@
use crate::{get_local_ip_with_default, OtelConfig};
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
    trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
    Resource,
};
use opentelemetry_semantic_conventions::{
    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_NAME, SERVICE_VERSION},
    SCHEMA_URL,
};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

/// A guard object that manages the lifecycle of OpenTelemetry components.
///
/// This struct holds references to the created OpenTelemetry providers and ensures
/// they are properly shut down when the guard is dropped. It implements the RAII
/// (Resource Acquisition Is Initialization) pattern for managing telemetry resources.
///
/// When this guard goes out of scope, it will automatically shut down:
/// - The tracer provider (for distributed tracing)
/// - The meter provider (for metrics collection)
/// - The logger provider (for structured logging)
///
/// # Example
/// ```
/// use rustfs_obs::{init_telemetry, OtelConfig};
///
/// let config = OtelConfig::default();
/// let otel_guard = init_telemetry(&config);
///
/// // The guard is kept alive for the duration of the application.
/// // When it is dropped, all telemetry components are properly shut down.
/// drop(otel_guard);
/// ```
pub struct OtelGuard {
    tracer_provider: SdkTracerProvider,
    meter_provider: SdkMeterProvider,
    logger_provider: SdkLoggerProvider,
}

impl Drop for OtelGuard {
    fn drop(&mut self) {
        if let Err(err) = self.tracer_provider.shutdown() {
            eprintln!("Tracer shutdown error: {:?}", err);
        }
        if let Err(err) = self.meter_provider.shutdown() {
            eprintln!("Meter shutdown error: {:?}", err);
        }
        if let Err(err) = self.logger_provider.shutdown() {
            eprintln!("Logger shutdown error: {:?}", err);
        }
    }
}

/// Create the OpenTelemetry Resource shared by all providers
fn resource(config: &OtelConfig) -> Resource {
    Resource::builder()
        .with_service_name(config.service_name.clone())
        .with_schema_url(
            [
                KeyValue::new(SERVICE_NAME, config.service_name.clone()),
                KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
                KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.deployment_environment.clone()),
                KeyValue::new(NETWORK_LOCAL_ADDRESS, get_local_ip_with_default()),
            ],
            SCHEMA_URL,
        )
        .build()
}

/// Initialize Meter Provider
fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
    let mut builder = MeterProviderBuilder::default().with_resource(resource(config));
    if config.endpoint.is_empty() {
        // If the endpoint is empty, export metrics to stdout
        builder = builder.with_reader(
            PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
                .with_interval(std::time::Duration::from_secs(config.meter_interval))
                .build(),
        );
    } else {
        // If the endpoint is not empty, export metrics over OTLP
        let exporter = opentelemetry_otlp::MetricExporter::builder()
            .with_tonic()
            .with_endpoint(&config.endpoint)
            .with_temporality(opentelemetry_sdk::metrics::Temporality::default())
            .build()
            .unwrap();
        builder = builder.with_reader(
            PeriodicReader::builder(exporter)
                .with_interval(std::time::Duration::from_secs(config.meter_interval))
                .build(),
        );
        // If use_stdout is true, also mirror metrics to stdout
        if config.use_stdout {
            builder = builder.with_reader(
                PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
                    .with_interval(std::time::Duration::from_secs(config.meter_interval))
                    .build(),
            );
        }
    }
    let meter_provider = builder.build();
    global::set_meter_provider(meter_provider.clone());
    meter_provider
}

/// Initialize Tracer Provider
fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
    let sampler = if config.sample_ratio > 0.0 && config.sample_ratio < 1.0 {
        Sampler::TraceIdRatioBased(config.sample_ratio)
    } else {
        Sampler::AlwaysOn
    };
    let builder = SdkTracerProvider::builder()
        .with_sampler(sampler)
        .with_id_generator(RandomIdGenerator::default())
        .with_resource(resource(config));
    let tracer_provider = if config.endpoint.is_empty() {
        builder
            .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
            .build()
    } else {
        let exporter = opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint(&config.endpoint)
            .build()
            .unwrap();
        if config.use_stdout {
            builder
                .with_batch_exporter(exporter)
                .with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
        } else {
            builder.with_batch_exporter(exporter)
        }
        .build()
    };
    global::set_tracer_provider(tracer_provider.clone());
    tracer_provider
}

/// Initialize Telemetry
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
    let tracer_provider = init_tracer_provider(config);
    let meter_provider = init_meter_provider(config);
    let tracer = tracer_provider.tracer(config.service_name.clone());
    let logger_provider = if config.endpoint.is_empty() {
        SdkLoggerProvider::builder()
            .with_resource(resource(config))
            .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
            .build()
    } else {
        let exporter = opentelemetry_otlp::LogExporter::builder()
            .with_tonic()
            .with_endpoint(&config.endpoint)
            .build()
            .unwrap();
        SdkLoggerProvider::builder()
            .with_resource(resource(config))
            .with_batch_exporter(exporter)
            .with_batch_exporter(opentelemetry_stdout::LogExporter::default())
            .build()
    };
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
    // For the OpenTelemetry layer, add a tracing filter to filter events from
    // OpenTelemetry and its dependent crates (opentelemetry-otlp uses crates
    // like reqwest/tonic etc.) from being sent back to OTel itself, thus
    // preventing infinite telemetry generation. The filter levels are set as
    // follows:
    // - Allow `info` level and above by default.
    // - Restrict `opentelemetry`, `hyper`, `tonic`, and `reqwest` completely.
    // Note: This will also drop events from crates like `tonic` etc. even when
    // they are used outside the OTLP Exporter. For more details, see:
    // https://github.com/open-telemetry/opentelemetry-rust/issues/761
    let filter_otel = EnvFilter::new("info")
        .add_directive("hyper=off".parse().unwrap())
        .add_directive("opentelemetry=off".parse().unwrap())
        .add_directive("tonic=off".parse().unwrap())
        .add_directive("h2=off".parse().unwrap())
        .add_directive("reqwest=off".parse().unwrap());
    let otel_layer = otel_layer.with_filter(filter_otel);
    let registry = tracing_subscriber::registry()
        .with(tracing_subscriber::filter::LevelFilter::INFO)
        .with(OpenTelemetryLayer::new(tracer))
        .with(MetricsLayer::new(meter_provider.clone()))
        .with(otel_layer);
    if config.endpoint.is_empty() {
        registry
            .with(tracing_subscriber::fmt::layer().with_ansi(true))
            .init();
    } else {
        registry.with(tracing_subscriber::fmt::layer().with_ansi(false)).init();
        println!("Logs, metrics and traces enabled");
    }
    OtelGuard {
        tracer_provider,
        meter_provider,
        logger_provider,
    }
}
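
For context, a typical entry point constructs the config and holds the guard for the life of the process. This sketch relies only on fields referenced above (endpoint, use_stdout) plus the Default impl the doc example assumes; an empty endpoint switches every signal to stdout exporters:

    fn main() {
        let config = OtelConfig {
            endpoint: "http://localhost:4317".to_string(), // "" => stdout exporters
            use_stdout: false,
            ..OtelConfig::default()
        };
        // Keep the guard alive; dropping it shuts down all three providers.
        let _guard = init_telemetry(&config);
        tracing::info!("telemetry initialized");
        // ... run the application ...
    }
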

42
packages/obs/src/utils.rs Normal file
View File

@@ -0,0 +1,42 @@
use local_ip_address::{local_ip, local_ipv6};
use std::net::{IpAddr, Ipv4Addr};
/// Get the IP address of the machine
///
/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address.
/// If both fail to retrieve, None is returned.
///
/// # Returns
///
/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6)
/// * `None` - Unable to obtain any native IP address
pub fn get_local_ip() -> Option<IpAddr> {
local_ip().ok().or_else(|| local_ipv6().ok())
}
/// Get the IP address of the machine as a string
///
/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value.
///
/// # Returns
///
/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value
pub fn get_local_ip_with_default() -> String {
get_local_ip()
.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_local_ip() {
match get_local_ip() {
Some(ip) => println!("the ip address of this machine:{}", ip),
None => println!("Unable to obtain the IP address of the machine"),
}
assert!(get_local_ip().is_some());
}
}

View File

@@ -0,0 +1,13 @@
use crate::{entry::LogEntry, sink::Sink};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
/// Start the log processing worker task: receive entries from the channel
/// and fan each one out to every configured sink
pub async fn start_worker(mut receiver: Receiver<LogEntry>, sinks: Vec<Arc<dyn Sink>>) {
    while let Some(entry) = receiver.recv().await {
for sink in &sinks {
sink.write(&entry).await;
}
}
}
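
On the producing side — assuming a sender obtained when the worker was spawned and a LogEntry built by the caller — a send is just a channel operation; try_send is a non-blocking variant that sheds logs under back-pressure rather than stalling the hot path:

    fn emit(sender: &tokio::sync::mpsc::Sender<LogEntry>, entry: LogEntry) {
        // Prefer dropping a log line over blocking request handling on a slow sink.
        if let Err(e) = sender.try_send(entry) {
            eprintln!("log channel full or closed, dropping entry: {}", e);
        }
    }
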