diff --git a/Cargo.lock b/Cargo.lock index d1dc017d..d6a65c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4290,6 +4290,109 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.11", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8863faf2910030d139fb48715ad5ff2f35029fc5f244f6d5f689ddcf4d26253" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bef114c6d41bea83d6dc60eb41720eedd0261a67af57b66dd2b84ac46c01d91" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.11", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7" + +[[package]] +name = "opentelemetry-stdout" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb0e5a5132e4b80bf037a78e3e12c8402535199f5de490d0c38f7eac71bc831" +dependencies = [ + "async-trait", + "chrono", + "futures-util", + "opentelemetry", + "opentelemetry_sdk", + "serde", + "thiserror 2.0.11", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 2.0.11", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -5213,6 +5316,7 @@ dependencies = [ "base64", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -5443,6 +5547,23 @@ dependencies = [ "zip", ] +[[package]] +name = "rustfs-logging" +version = "0.0.1" +dependencies = [ + "chrono", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry-stdout", + "opentelemetry_sdk", + "serde", + "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "rustix" version = "0.38.44" @@ -6440,6 +6561,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -6738,6 +6860,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index eed30992..511155ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "iam", "crypto", "cli/rustfs-gui", + "packages/logging", ] resolver = "2" @@ -59,6 +60,11 @@ lock = { path = "./common/lock" } lazy_static = "1.5.0" mime = "0.3.17" netif = "0.1.6" +opentelemetry = { version = "0.28" } +opentelemetry_sdk = { version = "0.28" } +opentelemetry-stdout = { version = "0.28.0" } +opentelemetry-otlp = { version = "0.28" } +opentelemetry-semantic-conventions = { version = "0.28.0", features = ["semconv_experimental"] } pin-project-lite = "0.2" # pin-utils = "0.1.0" prost = "0.13.4" @@ -71,6 +77,7 @@ reqwest = { version = "0.12.12", default-features = false, features = ["rustls-t rfd = "0.15.2" rmp = "0.8.14" rmp-serde = "1.3.0" +rustfs-logging = { path = "packages/logging", version = "0.0.1" } s3s = { git = "https://github.com/Nugine/s3s.git", rev = "529c8933a11528c506d5fbf7c4c2ab155db37dfe", default-features = true, features = [ "tower", ] } @@ -96,6 +103,7 @@ tracing = "0.1.41" tracing-error = "0.2.1" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] } tracing-appender = "0.2.3" +tracing-opentelemetry = "0.29" transform-stream = "0.3.1" url = "2.5.4" uuid = { version = "1.12.1", features = [ @@ -108,4 +116,4 @@ axum = "0.7.9" md-5 = "0.10.6" workers = { path = "./common/workers" } test-case = "3.3.1" -zip = "2.2.2" \ No newline at end of file +zip = "2.2.2" diff --git a/packages/logging/Cargo.toml b/packages/logging/Cargo.toml new file mode 100644 index 00000000..af37740d --- /dev/null +++ b/packages/logging/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "rustfs-logging" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[dependencies] +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +opentelemetry-stdout = { workspace = true } +opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics"] } +opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } +serde = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "tracing-log", "time", "local-time", "json"] } +tokio = { workspace = true, features = ["full"] } + + + +[dev-dependencies] +chrono = { workspace = true } +opentelemetry = { workspace = true, features = ["trace", "metrics"] } +opentelemetry_sdk = { workspace = true, features = ["trace", "rt-tokio"] } +opentelemetry-stdout = { workspace = true, features = ["trace", "metrics"] } +opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] } +opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt"] } \ No newline at end of file diff --git a/packages/logging/src/audit.rs b/packages/logging/src/audit.rs new file mode 100644 index 00000000..0f120db2 --- /dev/null +++ b/packages/logging/src/audit.rs @@ -0,0 +1,153 @@ +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +/// AuditEntry is a struct that represents an audit entry +/// that can be logged +/// # Fields +/// * `version` - The version of the audit entry +/// * `event_type` - The type of event that occurred +/// * `bucket` - The bucket that was accessed +/// * `object` - The object that was accessed +/// * `user` - The user that accessed the object +/// * `time` - The time the event occurred +/// * `user_agent` - The user agent that accessed the object +/// * `span_id` - The span ID of the event +/// # Example +/// ``` +/// use rustfs_logging::AuditEntry; +/// let entry = AuditEntry { +/// version: "1.0".to_string(), +/// event_type: "read".to_string(), +/// bucket: "bucket".to_string(), +/// object: "object".to_string(), +/// user: "user".to_string(), +/// time: "time".to_string(), +/// user_agent: "user_agent".to_string(), +/// span_id: "span_id".to_string(), +/// }; +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AuditEntry { + pub version: String, + pub event_type: String, + pub bucket: String, + pub object: String, + pub user: String, + pub time: String, + pub user_agent: String, + pub span_id: String, +} + +/// AuditTarget is a trait that defines the interface for audit targets +/// that can receive audit entries +pub trait AuditTarget: Send + Sync { + fn send(&self, entry: AuditEntry); +} + +/// FileAuditTarget is an audit target that logs audit entries to a file +pub struct FileAuditTarget; + +impl AuditTarget for FileAuditTarget { + /// Send an audit entry to a file + /// # Arguments + /// * `entry` - The audit entry to send + /// # Example + /// ``` + /// use rustfs_logging::{AuditEntry, AuditTarget, FileAuditTarget}; + /// let entry = AuditEntry { + /// version: "1.0".to_string(), + /// event_type: "read".to_string(), + /// bucket: "bucket".to_string(), + /// object: "object".to_string(), + /// user: "user".to_string(), + /// time: "time".to_string(), + /// user_agent: "user_agent".to_string(), + /// span_id: "span_id".to_string(), + /// }; + /// FileAuditTarget.send(entry); + /// ``` + /// + fn send(&self, entry: AuditEntry) { + println!("File audit: {:?}", entry); + } +} + +/// AuditLogger is a logger that logs audit entries +/// to multiple targets +/// +/// # Example +/// ``` +/// use rustfs_logging::{AuditEntry, AuditLogger, FileAuditTarget}; +/// +/// #[tokio::main] +/// async fn main() { +/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]); +/// let entry = AuditEntry { +/// version: "1.0".to_string(), +/// event_type: "read".to_string(), +/// bucket: "bucket".to_string(), +/// object: "object".to_string(), +/// user: "user".to_string(), +/// time: "time".to_string(), +/// user_agent: "user_agent".to_string(), +/// span_id: "span_id".to_string(), +/// }; +/// logger.log(entry).await; +/// } +/// ``` +#[derive(Debug)] +pub struct AuditLogger { + tx: mpsc::Sender, +} + +impl AuditLogger { + /// Create a new AuditLogger with the given targets + /// that will receive audit entries + /// # Arguments + /// * `targets` - A vector of audit targets + /// # Returns + /// * An AuditLogger + /// # Example + /// ``` + /// use rustfs_logging::{AuditLogger, AuditEntry, FileAuditTarget}; + /// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]); + /// ``` + pub fn new(targets: Vec>) -> Self { + let (tx, mut rx) = mpsc::channel::(1000); + tokio::spawn(async move { + while let Some(entry) = rx.recv().await { + for target in &targets { + target.send(entry.clone()); + } + } + }); + Self { tx } + } + + /// Log an audit entry + /// # Arguments + /// * `entry` - The audit entry to log + /// # Example + /// ``` + /// use rustfs_logging::{AuditEntry, AuditLogger, FileAuditTarget}; + /// + /// #[tokio::main] + /// async fn main() { + /// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]); + /// let entry = AuditEntry { + /// version: "1.0".to_string(), + /// event_type: "read".to_string(), + /// bucket: "bucket".to_string(), + /// object: "object".to_string(), + /// user: "user".to_string(), + /// time: "time".to_string(), + /// user_agent: "user_agent".to_string(), + /// span_id: "span_id".to_string(), + /// }; + /// logger.log(entry).await; + /// } + /// ``` + pub async fn log(&self, entry: AuditEntry) { + let _ = self.tx.send(entry).await; + } +} diff --git a/packages/logging/src/lib.rs b/packages/logging/src/lib.rs new file mode 100644 index 00000000..3fdbb83d --- /dev/null +++ b/packages/logging/src/lib.rs @@ -0,0 +1,83 @@ +//! Logging utilities +/// +/// This crate provides utilities for logging. +/// +/// # Examples +/// ``` +/// use rustfs_logging::{log_info, log_error}; +/// +/// log_info("This is an informational message"); +/// log_error("This is an error message"); +/// ``` +pub use audit::{AuditEntry, AuditLogger, AuditTarget, FileAuditTarget}; +pub use logger::{log_debug, log_error, log_info}; +pub use telemetry::Telemetry; + +mod audit; +mod logger; +mod telemetry; + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::global; + use opentelemetry::trace::TraceContextExt; + use std::time::{Duration, SystemTime}; + use tracing::{instrument, Span}; + use tracing_opentelemetry::OpenTelemetrySpanExt; + + #[instrument(fields(bucket, object, user))] + async fn put_object(audit_logger: &AuditLogger, bucket: String, object: String, user: String) { + let start_time = SystemTime::now(); + log_info("Starting PUT operation"); + + // Simulate the operation + tokio::time::sleep(Duration::from_millis(100)).await; + + // Record Metrics + let meter = global::meter("rustfs.rs"); + let request_duration = meter.f64_histogram("s3_request_duration_seconds").build(); + request_duration.record( + start_time.elapsed().unwrap().as_secs_f64(), + &[opentelemetry::KeyValue::new("operation", "put_object")], + ); + + // Gets the current span + let span = Span::current(); + + // Use 'OpenTelemetrySpanExt' to get 'SpanContext' + let span_context = span.context(); // Get context via OpenTelemetrySpanExt + let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId + + // Audit events are logged + let audit_entry = AuditEntry { + version: "1.0".to_string(), + event_type: "s3_put_object".to_string(), + bucket, + object, + user, + time: chrono::Utc::now().to_rfc3339(), + user_agent: "rustfs.rs-client".to_string(), + span_id, + }; + audit_logger.log(audit_entry).await; + + log_info("PUT operation completed"); + } + + #[tokio::test] + async fn test_main() { + let telemetry = Telemetry::init(); + + // Initialize multiple audit objectives + let audit_targets: Vec> = vec![Box::new(FileAuditTarget)]; + let audit_logger = AuditLogger::new(audit_targets); + + // Test the PUT operation + put_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await; + + // Wait for the export to complete + tokio::time::sleep(Duration::from_secs(2)).await; + drop(telemetry); // Make sure to clean up + } +} diff --git a/packages/logging/src/logger.rs b/packages/logging/src/logger.rs new file mode 100644 index 00000000..67c8e630 --- /dev/null +++ b/packages/logging/src/logger.rs @@ -0,0 +1,46 @@ +use tracing::{debug, error, info}; + +/// Log an info message +/// +/// # Arguments +/// msg: &str - The message to log +/// +/// # Example +/// ``` +/// use rustfs_logging::log_info; +/// +/// log_info("This is an info message"); +/// ``` +pub fn log_info(msg: &str) { + info!("{}", msg); +} + +/// Log an error message +/// +/// # Arguments +/// msg: &str - The message to log +/// +/// # Example +/// ``` +/// use rustfs_logging::log_error; +/// +/// log_error("This is an error message"); +/// ``` +pub fn log_error(msg: &str) { + error!("{}", msg); +} + +/// Log a debug message +/// +/// # Arguments +/// msg: &str - The message to log +/// +/// # Example +/// ``` +/// use rustfs_logging::log_debug; +/// +/// log_debug("This is a debug message"); +/// ``` +pub fn log_debug(msg: &str) { + debug!("{}", msg); +} diff --git a/packages/logging/src/telemetry.rs b/packages/logging/src/telemetry.rs new file mode 100644 index 00000000..4d1759c4 --- /dev/null +++ b/packages/logging/src/telemetry.rs @@ -0,0 +1,113 @@ +use opentelemetry::trace::TracerProvider; +use opentelemetry::{global, KeyValue}; +use opentelemetry_otlp::{self, WithExportConfig}; +use opentelemetry_sdk::{ + metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, + trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use tracing::Level; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +/// Telemetry is a wrapper around the OpenTelemetry SDK Tracer and Meter providers. +/// It initializes the global Tracer and Meter providers, and sets up the tracing subscriber. +/// The Tracer and Meter providers are shut down when the Telemetry instance is dropped. +/// This is a convenience struct to ensure that the global providers are properly initialized and shut down. +/// +/// # Example +/// ``` +/// use rustfs_logging::Telemetry; +/// +/// let _telemetry = Telemetry::init(); +/// ``` +pub struct Telemetry { + tracer_provider: SdkTracerProvider, + meter_provider: SdkMeterProvider, +} + +impl Telemetry { + pub fn init() -> Self { + // Define service resource information + let resource = Resource::builder() + .with_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"), + ], + SCHEMA_URL, + ) + .build(); + + let tracer_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint("http://localhost:4317") + .build() + .unwrap(); + + // Configure Tracer Provider + let tracer_provider = SdkTracerProvider::builder() + .with_sampler(Sampler::AlwaysOn) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource.clone()) + .with_batch_exporter(tracer_exporter) + .with_simple_exporter(opentelemetry_stdout::SpanExporter::default()) + .build(); + + let meter_exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint("http://localhost:4317") + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build() + .unwrap(); + + let meter_reader = PeriodicReader::builder(meter_exporter) + .with_interval(std::time::Duration::from_secs(30)) + .build(); + + // For debugging in development + let meter_stdout_reader = PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build(); + + // Configure Meter Provider + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource) + .with_reader(meter_reader) + .with_reader(meter_stdout_reader) + .build(); + + // Set global Tracer and Meter providers + global::set_tracer_provider(tracer_provider.clone()); + global::set_meter_provider(meter_provider.clone()); + + let tracer = tracer_provider.tracer("tracing-otel-subscriber-rustfs-service"); + + // Configure `tracing subscriber` + tracing_subscriber::registry() + .with(tracing_subscriber::filter::LevelFilter::from_level(Level::DEBUG)) + .with(tracing_subscriber::fmt::layer().with_ansi(true)) + .with(MetricsLayer::new(meter_provider.clone())) + .with(OpenTelemetryLayer::new(tracer)) + .init(); + + Self { + tracer_provider, + meter_provider, + } + } +} + +impl Drop for Telemetry { + fn drop(&mut self) { + if let Err(err) = self.tracer_provider.shutdown() { + eprintln!("{err:?}"); + } + if let Err(err) = self.meter_provider.shutdown() { + eprintln!("{err:?}"); + } + } +} diff --git a/rustfs/src/logging/mod.rs b/rustfs/src/logging/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index a2ed4790..cfd84975 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -3,8 +3,10 @@ mod auth; mod config; mod console; mod grpc; +mod logging; mod service; mod storage; + use crate::auth::IAMAuth; use clap::Parser; use common::{ @@ -68,7 +70,7 @@ fn main() -> Result<()> { //解析获得到的参数 let opt = config::Opt::parse(); - //设置trace + //设置 trace setup_tracing(); //运行参数 @@ -91,17 +93,17 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("server_address {}", &server_address); - //设置AK和SK + //设置 AK 和 SK iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone())).unwrap(); set_global_rustfs_port(server_port); - //监听地址,端口从参数中获取 + //监听地址,端口从参数中获取 let listener = TcpListener::bind(server_address.clone()).await?; //获取监听地址 let local_addr: SocketAddr = listener.local_addr()?; - // 用于rpc + // 用于 rpc let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone()) .map_err(|err| Error::from_string(err.to_string()))?; @@ -123,13 +125,13 @@ async fn run(opt: config::Opt) -> Result<()> { .map_err(|err| Error::from_string(err.to_string()))?; // Setup S3 service - // 本项目使用s3s库来实现s3服务 + // 本项目使用 s3s 库来实现 s3 服务 let service = { let store = storage::ecfs::FS::new(); // let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?); let mut b = S3ServiceBuilder::new(store.clone()); - //显示info信息 + //显示 info 信息 info!("authentication is enabled {}, {}", &opt.access_key, &opt.secret_key); b.set_auth(IAMAuth::new(opt.access_key, opt.secret_key));