feat(obs): implement global OpenTelemetry guard management

This commit is contained in:
houseme
2025-05-12 13:32:18 +08:00
parent dd7da015e3
commit 571cedf4ce
14 changed files with 44 additions and 88 deletions

3
.gitignore vendored
View File

@@ -13,4 +13,5 @@ deploy/config/obs.toml
*.log
deploy/certs/*
*jsonl
.env
.env
.rustfs.sys

1
Cargo.lock generated
View File

@@ -7402,6 +7402,7 @@ dependencies = [
"rdkafka",
"reqwest",
"rustfs-config",
"rustfs-utils",
"serde",
"serde_json",
"smallvec",

View File

@@ -16,22 +16,22 @@ pub const DEFAULT_LOG_LEVEL: &str = "info";
/// Default configuration use stdout
/// Default value: true
pub(crate) const USE_STDOUT: bool = true;
pub const USE_STDOUT: bool = true;
/// Default configuration sample ratio
/// Default value: 1.0
pub(crate) const SAMPLE_RATIO: f64 = 1.0;
pub const SAMPLE_RATIO: f64 = 1.0;
/// Default configuration meter interval
/// Default value: 30
pub(crate) const METER_INTERVAL: u64 = 30;
pub const METER_INTERVAL: u64 = 30;
/// Default configuration service version
/// Default value: 0.0.1
pub(crate) const SERVICE_VERSION: &str = "0.0.1";
pub const SERVICE_VERSION: &str = "0.0.1";
/// Default configuration environment
/// Default value: production
pub(crate) const ENVIRONMENT: &str = "production";
pub const ENVIRONMENT: &str = "production";
/// maximum number of connections
/// This is the maximum number of connections that the server will accept.
@@ -63,7 +63,7 @@ pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Example: RUSTFS_OBS_CONFIG=config/obs.toml
/// Example: --obs-config config/obs.toml
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
pub const DEFAULT_OBS_CONFIG: &str = "./deploy/config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.

View File

@@ -29,6 +29,7 @@ opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
rustfs-utils = { workspace = true, features = ["ip"] }
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
tracing = { workspace = true, features = ["std", "attributes"] }

View File

@@ -1,5 +1,5 @@
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
use config::{Config, File, FileFormat};
use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
use serde::{Deserialize, Serialize};
use std::env;
@@ -43,7 +43,7 @@ fn extract_otel_config_from_env() -> OtelConfig {
service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(SERVICE_NAME.to_string())),
.or(Some(APP_NAME.to_string())),
service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION")
.ok()
.and_then(|v| v.parse().ok())
@@ -55,7 +55,7 @@ fn extract_otel_config_from_env() -> OtelConfig {
logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL")
.ok()
.and_then(|v| v.parse().ok())
.or(Some(LOGGER_LEVEL.to_string())),
.or(Some(DEFAULT_LOG_LEVEL.to_string())),
local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED")
.ok()
.and_then(|v| v.parse().ok())
@@ -91,11 +91,11 @@ pub struct KafkaSinkConfig {
impl KafkaSinkConfig {
pub fn new() -> Self {
Self {
brokers: env::var("RUSTFS__SINKS_0_KAFKA_BROKERS")
brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "localhost:9092".to_string()),
topic: env::var("RUSTFS__SINKS_0_KAFKA_TOPIC")
topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_topic".to_string()),
@@ -123,11 +123,11 @@ pub struct WebhookSinkConfig {
impl WebhookSinkConfig {
pub fn new() -> Self {
Self {
endpoint: env::var("RUSTFS__SINKS_0_WEBHOOK_ENDPOINT")
endpoint: env::var("RUSTFS_SINKS_WEBHOOK_ENDPOINT")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "http://localhost:8080".to_string()),
auth_token: env::var("RUSTFS__SINKS_0_WEBHOOK_AUTH_TOKEN")
auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_token".to_string()),
@@ -160,7 +160,7 @@ impl FileSinkConfig {
eprintln!("Failed to create log directory: {}", e);
return "rustfs/rustfs.log".to_string();
}
println!("Using log directory: {:?}", temp_dir);
temp_dir
.join("rustfs.log")
.to_str()
@@ -169,7 +169,7 @@ impl FileSinkConfig {
}
pub fn new() -> Self {
Self {
path: env::var("RUSTFS__SINKS_0_FILE_PATH")
path: env::var("RUSTFS_SINKS_FILE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(Self::get_default_log_path),

View File

@@ -3,14 +3,6 @@ use std::sync::{Arc, Mutex};
use tokio::sync::{OnceCell, SetError};
use tracing::{error, info};
pub(crate) const USE_STDOUT: bool = true;
pub(crate) const SERVICE_NAME: &str = "RustFS";
pub(crate) const SAMPLE_RATIO: f64 = 1.0;
pub(crate) const METER_INTERVAL: u64 = 60;
pub(crate) const SERVICE_VERSION: &str = "0.1.0";
pub(crate) const ENVIRONMENT: &str = "production";
pub(crate) const LOGGER_LEVEL: &str = "info";
/// Global guard for OpenTelemetry tracing
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();

View File

@@ -35,7 +35,6 @@ mod logger;
mod sinks;
mod system;
mod telemetry;
mod utils;
mod worker;
use crate::logger::InitLogStatus;

View File

@@ -1,6 +1,6 @@
use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION};
use crate::sinks::Sink;
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry};
use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -428,7 +428,7 @@ impl Default for InitLogStatus {
fn default() -> Self {
Self {
timestamp: SystemTime::now(),
service_name: String::from(SERVICE_NAME),
service_name: String::from(APP_NAME),
version: SERVICE_VERSION.to_string(),
environment: ENVIRONMENT.to_string(),
}
@@ -442,7 +442,7 @@ impl InitLogStatus {
let version = config.service_version.unwrap_or(SERVICE_VERSION.to_string());
Self {
timestamp: SystemTime::now(),
service_name: String::from(SERVICE_NAME),
service_name: String::from(APP_NAME),
version,
environment,
}

View File

@@ -1,5 +1,3 @@
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
use crate::utils::get_local_ip_with_default;
use crate::OtelConfig;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
@@ -15,6 +13,8 @@ use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
};
use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT};
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
@@ -70,7 +70,7 @@ impl Drop for OtelGuard {
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
Resource::builder()
.with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(SERVICE_NAME)).to_string())
.with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(APP_NAME)).to_string())
.with_schema_url(
[
KeyValue::new(
@@ -101,8 +101,8 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(LOGGER_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(SERVICE_NAME);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
// Pre-create resource objects to avoid repeated construction
let res = resource(config);

View File

@@ -1,42 +0,0 @@
use local_ip_address::{local_ip, local_ipv6};
use std::net::{IpAddr, Ipv4Addr};
/// Get the IP address of the machine
///
/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address.
/// If both fail to retrieve, None is returned.
///
/// # Returns
///
/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6)
/// * `None` - Unable to obtain any native IP address
pub fn get_local_ip() -> Option<IpAddr> {
local_ip().ok().or_else(|| local_ipv6().ok())
}
/// Get the IP address of the machine as a string
///
/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value.
///
/// # Returns
///
/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value
pub fn get_local_ip_with_default() -> String {
get_local_ip()
.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_local_ip() {
match get_local_ip() {
Some(ip) => println!("the ip address of this machine:{}", ip),
None => println!("Unable to obtain the IP address of the machine"),
}
assert!(get_local_ip().is_some());
}
}

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
/// Start the log processing worker thread
pub async fn start_worker(receiver: Receiver<UnifiedLogEntry>, sinks: Vec<Arc<dyn Sink>>) {
pub(crate) async fn start_worker(receiver: Receiver<UnifiedLogEntry>, sinks: Vec<Arc<dyn Sink>>) {
let mut receiver = receiver;
while let Some(entry) = receiver.recv().await {
for sink in &sinks {

View File

@@ -7,12 +7,19 @@ rust-version.workspace = true
version.workspace = true
[dependencies]
local-ip-address = { workspace = true }
local-ip-address = { workspace = true, optional = true }
rustfs-config = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rustls-pki-types = { workspace = true }
rustls = { workspace = true, optional = true }
rustls-pemfile = { workspace = true, optional = true }
rustls-pki-types = { workspace = true, optional = true }
tracing = { workspace = true }
[lints]
workspace = true
[features]
default = ["ip"] # features that are enabled by default
ip = ["dep:local-ip-address"] # ip characteristics and their dependencies
tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies
net = ["ip"] # empty network features
full = ["ip", "tls", "net"] # all features

View File

@@ -2,10 +2,7 @@ mod certs;
mod ip;
mod net;
pub use certs::certs_error;
pub use certs::create_multi_cert_resolver;
pub use certs::load_all_certs_from_directory;
pub use certs::load_certs;
pub use certs::load_private_key;
pub use ip::get_local_ip;
pub use ip::get_local_ip_with_default;
#[cfg(feature = "ip")]
pub use certs::*;
#[cfg(feature = "ip")]
pub use ip::*;

View File

@@ -55,7 +55,7 @@ rmp-serde.workspace = true
rustfs-config = { workspace = true }
rustfs-event-notifier = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustls.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true