From 571cedf4cec44ae60892a359df004f0bdaf5cd51 Mon Sep 17 00:00:00 2001 From: houseme Date: Mon, 12 May 2025 13:32:18 +0800 Subject: [PATCH] feat(obs): implement global OpenTelemetry guard management --- .gitignore | 3 ++- Cargo.lock | 1 + crates/config/src/constants/app.rs | 12 ++++----- crates/obs/Cargo.toml | 1 + crates/obs/src/config.rs | 18 ++++++------- crates/obs/src/global.rs | 8 ------ crates/obs/src/lib.rs | 1 - crates/obs/src/logger.rs | 6 ++--- crates/obs/src/telemetry.rs | 10 +++---- crates/obs/src/utils.rs | 42 ------------------------------ crates/obs/src/worker.rs | 2 +- crates/utils/Cargo.toml | 15 ++++++++--- crates/utils/src/lib.rs | 11 +++----- rustfs/Cargo.toml | 2 +- 14 files changed, 44 insertions(+), 88 deletions(-) delete mode 100644 crates/obs/src/utils.rs diff --git a/.gitignore b/.gitignore index 7ccca205..0a1501ce 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ deploy/config/obs.toml *.log deploy/certs/* *jsonl -.env \ No newline at end of file +.env +.rustfs.sys \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 01ab7a99..fa09fb75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7402,6 +7402,7 @@ dependencies = [ "rdkafka", "reqwest", "rustfs-config", + "rustfs-utils", "serde", "serde_json", "smallvec", diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index 467905f7..c5ad125a 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -16,22 +16,22 @@ pub const DEFAULT_LOG_LEVEL: &str = "info"; /// Default configuration use stdout /// Default value: true -pub(crate) const USE_STDOUT: bool = true; +pub const USE_STDOUT: bool = true; /// Default configuration sample ratio /// Default value: 1.0 -pub(crate) const SAMPLE_RATIO: f64 = 1.0; +pub const SAMPLE_RATIO: f64 = 1.0; /// Default configuration meter interval /// Default value: 30 -pub(crate) const METER_INTERVAL: u64 = 30; +pub const METER_INTERVAL: u64 = 30; /// Default configuration service version /// Default value: 0.0.1 -pub(crate) const SERVICE_VERSION: &str = "0.0.1"; +pub const SERVICE_VERSION: &str = "0.0.1"; /// Default configuration environment /// Default value: production -pub(crate) const ENVIRONMENT: &str = "production"; +pub const ENVIRONMENT: &str = "production"; /// maximum number of connections /// This is the maximum number of connections that the server will accept. @@ -63,7 +63,7 @@ pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; /// Example: RUSTFS_OBS_CONFIG=config/obs.toml /// Example: --obs-config config/obs.toml /// Example: --obs-config /etc/rustfs/obs.toml -pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml"; +pub const DEFAULT_OBS_CONFIG: &str = "./deploy/config/obs.toml"; /// Default TLS key for rustfs /// This is the default key for TLS. diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 65968d51..12c9831e 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -29,6 +29,7 @@ opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-stdout = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] } opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } +rustfs-utils = { workspace = true, features = ["ip"] } serde = { workspace = true } smallvec = { workspace = true, features = ["serde"] } tracing = { workspace = true, features = ["std", "attributes"] } diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 98929470..770203de 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -1,5 +1,5 @@ -use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT}; use config::{Config, File, FileFormat}; +use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT}; use serde::{Deserialize, Serialize}; use std::env; @@ -43,7 +43,7 @@ fn extract_otel_config_from_env() -> OtelConfig { service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME") .ok() .and_then(|v| v.parse().ok()) - .or(Some(SERVICE_NAME.to_string())), + .or(Some(APP_NAME.to_string())), service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION") .ok() .and_then(|v| v.parse().ok()) @@ -55,7 +55,7 @@ fn extract_otel_config_from_env() -> OtelConfig { logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL") .ok() .and_then(|v| v.parse().ok()) - .or(Some(LOGGER_LEVEL.to_string())), + .or(Some(DEFAULT_LOG_LEVEL.to_string())), local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED") .ok() .and_then(|v| v.parse().ok()) @@ -91,11 +91,11 @@ pub struct KafkaSinkConfig { impl KafkaSinkConfig { pub fn new() -> Self { Self { - brokers: env::var("RUSTFS__SINKS_0_KAFKA_BROKERS") + brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "localhost:9092".to_string()), - topic: env::var("RUSTFS__SINKS_0_KAFKA_TOPIC") + topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "default_topic".to_string()), @@ -123,11 +123,11 @@ pub struct WebhookSinkConfig { impl WebhookSinkConfig { pub fn new() -> Self { Self { - endpoint: env::var("RUSTFS__SINKS_0_WEBHOOK_ENDPOINT") + endpoint: env::var("RUSTFS_SINKS_WEBHOOK_ENDPOINT") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "http://localhost:8080".to_string()), - auth_token: env::var("RUSTFS__SINKS_0_WEBHOOK_AUTH_TOKEN") + auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(|| "default_token".to_string()), @@ -160,7 +160,7 @@ impl FileSinkConfig { eprintln!("Failed to create log directory: {}", e); return "rustfs/rustfs.log".to_string(); } - + println!("Using log directory: {:?}", temp_dir); temp_dir .join("rustfs.log") .to_str() @@ -169,7 +169,7 @@ impl FileSinkConfig { } pub fn new() -> Self { Self { - path: env::var("RUSTFS__SINKS_0_FILE_PATH") + path: env::var("RUSTFS_SINKS_FILE_PATH") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(Self::get_default_log_path), diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 8d392ad9..07657fb4 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -3,14 +3,6 @@ use std::sync::{Arc, Mutex}; use tokio::sync::{OnceCell, SetError}; use tracing::{error, info}; -pub(crate) const USE_STDOUT: bool = true; -pub(crate) const SERVICE_NAME: &str = "RustFS"; -pub(crate) const SAMPLE_RATIO: f64 = 1.0; -pub(crate) const METER_INTERVAL: u64 = 60; -pub(crate) const SERVICE_VERSION: &str = "0.1.0"; -pub(crate) const ENVIRONMENT: &str = "production"; -pub(crate) const LOGGER_LEVEL: &str = "info"; - /// Global guard for OpenTelemetry tracing static GLOBAL_GUARD: OnceCell>> = OnceCell::const_new(); diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index 5d181e31..6a8f6219 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ -35,7 +35,6 @@ mod logger; mod sinks; mod system; mod telemetry; -mod utils; mod worker; use crate::logger::InitLogStatus; diff --git a/crates/obs/src/logger.rs b/crates/obs/src/logger.rs index 92ff5365..02e0bf9b 100644 --- a/crates/obs/src/logger.rs +++ b/crates/obs/src/logger.rs @@ -1,6 +1,6 @@ -use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION}; use crate::sinks::Sink; use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry}; +use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION}; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -428,7 +428,7 @@ impl Default for InitLogStatus { fn default() -> Self { Self { timestamp: SystemTime::now(), - service_name: String::from(SERVICE_NAME), + service_name: String::from(APP_NAME), version: SERVICE_VERSION.to_string(), environment: ENVIRONMENT.to_string(), } @@ -442,7 +442,7 @@ impl InitLogStatus { let version = config.service_version.unwrap_or(SERVICE_VERSION.to_string()); Self { timestamp: SystemTime::now(), - service_name: String::from(SERVICE_NAME), + service_name: String::from(APP_NAME), version, environment, } diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 5a08321d..cba605d3 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -1,5 +1,3 @@ -use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT}; -use crate::utils::get_local_ip_with_default; use crate::OtelConfig; use opentelemetry::trace::TracerProvider; use opentelemetry::{global, KeyValue}; @@ -15,6 +13,8 @@ use opentelemetry_semantic_conventions::{ attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, SCHEMA_URL, }; +use rustfs_config::{APP_NAME, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT}; +use rustfs_utils::get_local_ip_with_default; use smallvec::SmallVec; use std::borrow::Cow; use std::io::IsTerminal; @@ -70,7 +70,7 @@ impl Drop for OtelGuard { /// create OpenTelemetry Resource fn resource(config: &OtelConfig) -> Resource { Resource::builder() - .with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(SERVICE_NAME)).to_string()) + .with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(APP_NAME)).to_string()) .with_schema_url( [ KeyValue::new( @@ -101,8 +101,8 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { let endpoint = &config.endpoint; let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT); let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); - let logger_level = config.logger_level.as_deref().unwrap_or(LOGGER_LEVEL); - let service_name = config.service_name.as_deref().unwrap_or(SERVICE_NAME); + let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL); + let service_name = config.service_name.as_deref().unwrap_or(APP_NAME); // Pre-create resource objects to avoid repeated construction let res = resource(config); diff --git a/crates/obs/src/utils.rs b/crates/obs/src/utils.rs deleted file mode 100644 index 77459457..00000000 --- a/crates/obs/src/utils.rs +++ /dev/null @@ -1,42 +0,0 @@ -use local_ip_address::{local_ip, local_ipv6}; -use std::net::{IpAddr, Ipv4Addr}; - -/// Get the IP address of the machine -/// -/// Priority is given to trying to get the IPv4 address, and if it fails, try to get the IPv6 address. -/// If both fail to retrieve, None is returned. -/// -/// # Returns -/// -/// * `Some(IpAddr)` - Native IP address (IPv4 or IPv6) -/// * `None` - Unable to obtain any native IP address -pub fn get_local_ip() -> Option { - local_ip().ok().or_else(|| local_ipv6().ok()) -} - -/// Get the IP address of the machine as a string -/// -/// If the IP address cannot be obtained, returns "127.0.0.1" as the default value. -/// -/// # Returns -/// -/// * `String` - Native IP address (IPv4 or IPv6) as a string, or the default value -pub fn get_local_ip_with_default() -> String { - get_local_ip() - .unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) // Provide a safe default value - .to_string() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_get_local_ip() { - match get_local_ip() { - Some(ip) => println!("the ip address of this machine:{}", ip), - None => println!("Unable to obtain the IP address of the machine"), - } - assert!(get_local_ip().is_some()); - } -} diff --git a/crates/obs/src/worker.rs b/crates/obs/src/worker.rs index aee1695d..cfe2f26c 100644 --- a/crates/obs/src/worker.rs +++ b/crates/obs/src/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::mpsc::Receiver; /// Start the log processing worker thread -pub async fn start_worker(receiver: Receiver, sinks: Vec>) { +pub(crate) async fn start_worker(receiver: Receiver, sinks: Vec>) { let mut receiver = receiver; while let Some(entry) = receiver.recv().await { for sink in &sinks { diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 13ee21e4..76cac05c 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -7,12 +7,19 @@ rust-version.workspace = true version.workspace = true [dependencies] -local-ip-address = { workspace = true } +local-ip-address = { workspace = true, optional = true } rustfs-config = { workspace = true } -rustls = { workspace = true } -rustls-pemfile = { workspace = true } -rustls-pki-types = { workspace = true } +rustls = { workspace = true, optional = true } +rustls-pemfile = { workspace = true, optional = true } +rustls-pki-types = { workspace = true, optional = true } tracing = { workspace = true } [lints] workspace = true + +[features] +default = ["ip"] # features that are enabled by default +ip = ["dep:local-ip-address"] # ip characteristics and their dependencies +tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies +net = ["ip"] # empty network features +full = ["ip", "tls", "net"] # all features \ No newline at end of file diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index fbb5936b..cda53d08 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -2,10 +2,7 @@ mod certs; mod ip; mod net; -pub use certs::certs_error; -pub use certs::create_multi_cert_resolver; -pub use certs::load_all_certs_from_directory; -pub use certs::load_certs; -pub use certs::load_private_key; -pub use ip::get_local_ip; -pub use ip::get_local_ip_with_default; +#[cfg(feature = "ip")] +pub use certs::*; +#[cfg(feature = "ip")] +pub use ip::*; diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 3023a9ca..a238665d 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -55,7 +55,7 @@ rmp-serde.workspace = true rustfs-config = { workspace = true } rustfs-event-notifier = { workspace = true } rustfs-obs = { workspace = true } -rustfs-utils = { workspace = true } +rustfs-utils = { workspace = true, features = ["full"] } rustls.workspace = true rust-embed = { workspace = true, features = ["interpolate-folder-path"] } s3s.workspace = true