This commit is contained in:
houseme
2025-02-27 19:00:47 +08:00
parent ec9b1262cb
commit d7417d841f
6 changed files with 186 additions and 14 deletions

15
Cargo.lock generated
View File

@@ -4278,6 +4278,20 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-appender-tracing"
version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c513c7af3bec30113f3d4620134ff923295f1e9c580fda2b8abe0831f925ddc0"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
]
[[package]]
name = "opentelemetry-http"
version = "0.28.0"
@@ -5546,6 +5560,7 @@ version = "0.0.1"
dependencies = [
"chrono",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",

View File

@@ -61,6 +61,7 @@ lazy_static = "1.5.0"
mime = "0.3.17"
netif = "0.1.6"
opentelemetry = { version = "0.28" }
opentelemetry-appender-tracing = { version = "0.28.1" }
opentelemetry_sdk = { version = "0.28" }
opentelemetry-stdout = { version = "0.28.0" }
opentelemetry-otlp = { version = "0.28" }
@@ -119,3 +120,21 @@ md-5 = "0.10.6"
workers = { path = "./common/workers" }
test-case = "3.3.1"
zip = "2.2.2"
[profile]
[profile.wasm-dev]
inherits = "dev"
opt-level = 1
[profile.server-dev]
inherits = "dev"
[profile.android-dev]
inherits = "dev"
[profile.release]
opt-level = 3 # Optimization Level (0-3)
lto = true # Optimize when linking
codegen-units = 1 # Reduce code generation units to improve optimization

View File

@@ -16,6 +16,7 @@ audit-webhook = ["dep:reqwest"]
[dependencies]
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics"] }

View File

@@ -86,7 +86,7 @@ impl AuditTarget for FileAuditTarget {
/// #Arguments
/// * `client` - The reqwest client
/// * `url` - The URL of the webhook
/// # Example
/// # Example
/// ```
/// use rustfs_logging::WebhookAuditTarget;
/// let target = WebhookAuditTarget::new("http://localhost:8080");
@@ -310,6 +310,10 @@ impl AuditLogger {
/// }
/// ```
pub async fn log(&self, entry: AuditEntry) {
// 将日志消息记录到当前 Span
tracing::Span::current()
.record("log_message", &entry.bucket)
.record("source", &entry.event_type);
let _ = self.tx.send(entry).await;
}
}

View File

@@ -1,4 +1,5 @@
//! Logging utilities
///
/// This crate provides utilities for logging.
///
@@ -24,8 +25,9 @@ mod telemetry;
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use opentelemetry::global;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::{TraceContextExt, Tracer};
use std::time::{Duration, SystemTime};
use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -60,7 +62,7 @@ mod tests {
bucket,
object,
user,
time: chrono::Utc::now().to_rfc3339(),
time: Utc::now().to_rfc3339(),
user_agent: "rustfs.rs-client".to_string(),
span_id,
};
@@ -83,11 +85,106 @@ mod tests {
];
let audit_logger = AuditLogger::new(audit_targets);
// 创建根 Span 并执行操作
// let tracer = global::tracer("main");
// tracer.in_span("main_operation", |cx| {
// Span::current().set_parent(cx);
// log_info("Starting test async");
// tokio::runtime::Runtime::new().unwrap().block_on(async {
log_info("Starting test");
// Test the PUT operation
put_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
query_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..100 {
put_object(
&audit_logger,
format!("my-bucket-{}", i),
format!("my-object-{}", i),
"user123".to_string(),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
query_object(
&audit_logger,
format!("my-bucket-{}", i),
format!("my-object-{}", i),
"user123".to_string(),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Wait for the export to complete
tokio::time::sleep(Duration::from_secs(2)).await;
log_info("Test completed");
// });
// });
drop(telemetry); // Make sure to clean up
}
#[instrument(fields(bucket, object, user))]
async fn query_object(audit_logger: &AuditLogger, bucket: String, object: String, user: String) {
let start_time = SystemTime::now();
log_info("Starting query operation");
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
// Record Metrics
let meter = global::meter("rustfs.rs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "query_object")],
);
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
query_one(user.clone());
query_two(user.clone());
query_three(user.clone());
// Audit events are logged
let audit_entry = AuditEntry {
version: "1.0".to_string(),
event_type: "s3_query_object".to_string(),
bucket,
object,
user,
time: Utc::now().to_rfc3339(),
user_agent: "rustfs.rs-client".to_string(),
span_id,
};
audit_logger.log(audit_entry).await;
log_info("query operation completed");
}
#[instrument(fields(user))]
fn query_one(user: String) {
// 初始化 OpenTelemetry Tracer
let tracer = global::tracer("query_one");
tracer.in_span("doing_work", |cx| {
// Traced app logic here...
Span::current().set_parent(cx);
log_info("Doing work...");
let current_span = Span::current();
let span_context = current_span.context();
let trace_id = span_context.clone().span().span_context().trace_id().to_string();
let span_id = span_context.clone().span().span_context().span_id().to_string();
log_info(format!("trace_id: {}, span_id: {}", trace_id, span_id).as_str());
});
log_info(format!("Starting query_one operation user:{}", user).as_str());
}
#[instrument(fields(user))]
fn query_two(user: String) {
log_info(format!("Starting query_two operation user:{}", user).as_str());
}
#[instrument(fields(user))]
fn query_three(user: String) {
log_info(format!("Starting query_three operation user: {}", user).as_str());
}
}

View File

@@ -1,18 +1,22 @@
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::{self, WithExportConfig};
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::attribute::NETWORK_LOCAL_ADDRESS;
use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing::Level;
use std::time::Duration;
use tracing::{info, Level};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
/// Telemetry is a wrapper around the OpenTelemetry SDK Tracer and Meter providers.
/// It initializes the global Tracer and Meter providers, and sets up the tracing subscriber.
@@ -34,11 +38,13 @@ impl Telemetry {
pub fn init() -> Self {
// Define service resource information
let resource = Resource::builder()
.with_service_name("rustfs-service")
.with_schema_url(
[
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(SERVICE_NAME, "rustfs-service"),
KeyValue::new(SERVICE_VERSION, "0.1.0"),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
KeyValue::new(NETWORK_LOCAL_ADDRESS, "127.0.0.1"),
],
SCHEMA_URL,
)
@@ -47,6 +53,8 @@ impl Telemetry {
let tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(Duration::from_secs(3))
.build()
.unwrap();
@@ -56,35 +64,61 @@ impl Telemetry {
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource.clone())
.with_batch_exporter(tracer_exporter)
.with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
// .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
.build();
let meter_exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(Duration::from_secs(3))
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
let meter_reader = PeriodicReader::builder(meter_exporter)
.with_interval(std::time::Duration::from_secs(30))
.with_interval(Duration::from_secs(30))
.build();
// For debugging in development
let meter_stdout_reader = PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
// let meter_stdout_reader = PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
// Configure Meter Provider
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource)
.with_resource(resource.clone())
.with_reader(meter_reader)
.with_reader(meter_stdout_reader)
// .with_reader(meter_stdout_reader)
.build();
// Set global Tracer and Meter providers
global::set_tracer_provider(tracer_provider.clone());
global::set_meter_provider(meter_provider.clone());
let tracer = tracer_provider.tracer("tracing-otel-subscriber-rustfs-service");
let tracer = tracer_provider.tracer("rustfs-service");
// // let _stdout_exporter = opentelemetry_stdout::LogExporter::default();
// let otlp_exporter = opentelemetry_otlp::LogExporter::builder()
// .with_tonic()
// .with_endpoint("http://localhost:4317")
// // .with_timeout(Duration::from_secs(3))
// // .with_protocol(opentelemetry_otlp::Protocol::Grpc)
// .build()
// .unwrap();
// let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
// .with_resource(resource.clone())
// .with_simple_exporter(otlp_exporter)
// .build();
// let filter_otel = EnvFilter::new("debug")
// // .add_directive("hyper=off".parse().unwrap())
// // .add_directive("opentelemetry=off".parse().unwrap())
// // .add_directive("tonic=off".parse().unwrap())
// // .add_directive("h2=off".parse().unwrap())
// .add_directive("reqwest=off".parse().unwrap());
// let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);
let filter_fmt = EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap());
let fmt_layer = tracing_subscriber::fmt::layer()
.with_thread_names(true)
.with_filter(filter_fmt);
// Configure `tracing subscriber`
tracing_subscriber::registry()
@@ -92,8 +126,10 @@ impl Telemetry {
.with(tracing_subscriber::fmt::layer().with_ansi(true))
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
// .with(otel_layer)
.with(fmt_layer)
.init();
info!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");
Self {
tracer_provider,
meter_provider,