Implement initialization for packages/logging

- Add initialization logic for the `rustfs-logging` crate.
- Provide examples for logging utilities.
- Include tests for logging and telemetry functionalities.
- Ensure proper configuration of dependencies in `Cargo.toml`.
This commit is contained in:
houseme
2025-02-21 13:46:35 +08:00
parent 391eb5b6f9
commit 03589201fb
9 changed files with 587 additions and 7 deletions

140
Cargo.lock generated
View File

@@ -4290,6 +4290,109 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "opentelemetry"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 2.0.11",
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8863faf2910030d139fb48715ad5ff2f35029fc5f244f6d5f689ddcf4d26253"
dependencies = [
"async-trait",
"bytes",
"http",
"opentelemetry",
"reqwest",
"tracing",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bef114c6d41bea83d6dc60eb41720eedd0261a67af57b66dd2b84ac46c01d91"
dependencies = [
"async-trait",
"futures-core",
"http",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"reqwest",
"thiserror 2.0.11",
"tokio",
"tonic",
"tracing",
]
[[package]]
name = "opentelemetry-proto"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost",
"tonic",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fb3a2f78c2d55362cd6c313b8abedfbc0142ab3c2676822068fd2ab7d51f9b7"
[[package]]
name = "opentelemetry-stdout"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eb0e5a5132e4b80bf037a78e3e12c8402535199f5de490d0c38f7eac71bc831"
dependencies = [
"async-trait",
"chrono",
"futures-util",
"opentelemetry",
"opentelemetry_sdk",
"serde",
"thiserror 2.0.11",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"opentelemetry",
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror 2.0.11",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "option-ext"
version = "0.2.0"
@@ -5213,6 +5316,7 @@ dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-channel",
"futures-core",
"futures-util",
"h2",
@@ -5443,6 +5547,23 @@ dependencies = [
"zip",
]
[[package]]
name = "rustfs-logging"
version = "0.0.1"
dependencies = [
"chrono",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk",
"serde",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]
[[package]]
name = "rustix"
version = "0.38.44"
@@ -6440,6 +6561,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -6738,6 +6860,24 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"web-time",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"

View File

@@ -13,6 +13,7 @@ members = [
"iam",
"crypto",
"cli/rustfs-gui",
"packages/logging",
]
resolver = "2"
@@ -59,6 +60,11 @@ lock = { path = "./common/lock" }
lazy_static = "1.5.0"
mime = "0.3.17"
netif = "0.1.6"
opentelemetry = { version = "0.28" }
opentelemetry_sdk = { version = "0.28" }
opentelemetry-stdout = { version = "0.28.0" }
opentelemetry-otlp = { version = "0.28" }
opentelemetry-semantic-conventions = { version = "0.28.0", features = ["semconv_experimental"] }
pin-project-lite = "0.2"
# pin-utils = "0.1.0"
prost = "0.13.4"
@@ -71,6 +77,7 @@ reqwest = { version = "0.12.12", default-features = false, features = ["rustls-t
rfd = "0.15.2"
rmp = "0.8.14"
rmp-serde = "1.3.0"
rustfs-logging = { path = "packages/logging", version = "0.0.1" }
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "529c8933a11528c506d5fbf7c4c2ab155db37dfe", default-features = true, features = [
"tower",
] }
@@ -96,6 +103,7 @@ tracing = "0.1.41"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
tracing-appender = "0.2.3"
tracing-opentelemetry = "0.29"
transform-stream = "0.3.1"
url = "2.5.4"
uuid = { version = "1.12.1", features = [
@@ -108,4 +116,4 @@ axum = "0.7.9"
md-5 = "0.10.6"
workers = { path = "./common/workers" }
test-case = "3.3.1"
zip = "2.2.2"
zip = "2.2.2"

View File

@@ -0,0 +1,35 @@
[package]
name = "rustfs-logging"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[lints]
workspace = true
[dependencies]
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics"] }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
serde = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["full"] }
[dev-dependencies]
chrono = { workspace = true }
opentelemetry = { workspace = true, features = ["trace", "metrics"] }
opentelemetry_sdk = { workspace = true, features = ["trace", "rt-tokio"] }
opentelemetry-stdout = { workspace = true, features = ["trace", "metrics"] }
opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt"] }

View File

@@ -0,0 +1,153 @@
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
/// AuditEntry is a struct that represents an audit entry
/// that can be logged
/// # Fields
/// * `version` - The version of the audit entry
/// * `event_type` - The type of event that occurred
/// * `bucket` - The bucket that was accessed
/// * `object` - The object that was accessed
/// * `user` - The user that accessed the object
/// * `time` - The time the event occurred
/// * `user_agent` - The user agent that accessed the object
/// * `span_id` - The span ID of the event
/// # Example
/// ```
/// use rustfs_logging::AuditEntry;
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AuditEntry {
pub version: String,
pub event_type: String,
pub bucket: String,
pub object: String,
pub user: String,
pub time: String,
pub user_agent: String,
pub span_id: String,
}
/// AuditTarget is a trait that defines the interface for audit targets
/// that can receive audit entries
pub trait AuditTarget: Send + Sync {
fn send(&self, entry: AuditEntry);
}
/// FileAuditTarget is an audit target that logs audit entries to a file
pub struct FileAuditTarget;
impl AuditTarget for FileAuditTarget {
/// Send an audit entry to a file
/// # Arguments
/// * `entry` - The audit entry to send
/// # Example
/// ```
/// use rustfs_logging::{AuditEntry, AuditTarget, FileAuditTarget};
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// FileAuditTarget.send(entry);
/// ```
///
fn send(&self, entry: AuditEntry) {
println!("File audit: {:?}", entry);
}
}
/// AuditLogger is a logger that logs audit entries
/// to multiple targets
///
/// # Example
/// ```
/// use rustfs_logging::{AuditEntry, AuditLogger, FileAuditTarget};
///
/// #[tokio::main]
/// async fn main() {
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// logger.log(entry).await;
/// }
/// ```
#[derive(Debug)]
pub struct AuditLogger {
tx: mpsc::Sender<AuditEntry>,
}
impl AuditLogger {
/// Create a new AuditLogger with the given targets
/// that will receive audit entries
/// # Arguments
/// * `targets` - A vector of audit targets
/// # Returns
/// * An AuditLogger
/// # Example
/// ```
/// use rustfs_logging::{AuditLogger, AuditEntry, FileAuditTarget};
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// ```
pub fn new(targets: Vec<Box<dyn AuditTarget>>) -> Self {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(1000);
tokio::spawn(async move {
while let Some(entry) = rx.recv().await {
for target in &targets {
target.send(entry.clone());
}
}
});
Self { tx }
}
/// Log an audit entry
/// # Arguments
/// * `entry` - The audit entry to log
/// # Example
/// ```
/// use rustfs_logging::{AuditEntry, AuditLogger, FileAuditTarget};
///
/// #[tokio::main]
/// async fn main() {
/// let logger = AuditLogger::new(vec![Box::new(FileAuditTarget)]);
/// let entry = AuditEntry {
/// version: "1.0".to_string(),
/// event_type: "read".to_string(),
/// bucket: "bucket".to_string(),
/// object: "object".to_string(),
/// user: "user".to_string(),
/// time: "time".to_string(),
/// user_agent: "user_agent".to_string(),
/// span_id: "span_id".to_string(),
/// };
/// logger.log(entry).await;
/// }
/// ```
pub async fn log(&self, entry: AuditEntry) {
let _ = self.tx.send(entry).await;
}
}

View File

@@ -0,0 +1,83 @@
//! Logging utilities
///
/// This crate provides utilities for logging.
///
/// # Examples
/// ```
/// use rustfs_logging::{log_info, log_error};
///
/// log_info("This is an informational message");
/// log_error("This is an error message");
/// ```
pub use audit::{AuditEntry, AuditLogger, AuditTarget, FileAuditTarget};
pub use logger::{log_debug, log_error, log_info};
pub use telemetry::Telemetry;
mod audit;
mod logger;
mod telemetry;
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::global;
use opentelemetry::trace::TraceContextExt;
use std::time::{Duration, SystemTime};
use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[instrument(fields(bucket, object, user))]
async fn put_object(audit_logger: &AuditLogger, bucket: String, object: String, user: String) {
let start_time = SystemTime::now();
log_info("Starting PUT operation");
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
// Record Metrics
let meter = global::meter("rustfs.rs");
let request_duration = meter.f64_histogram("s3_request_duration_seconds").build();
request_duration.record(
start_time.elapsed().unwrap().as_secs_f64(),
&[opentelemetry::KeyValue::new("operation", "put_object")],
);
// Gets the current span
let span = Span::current();
// Use 'OpenTelemetrySpanExt' to get 'SpanContext'
let span_context = span.context(); // Get context via OpenTelemetrySpanExt
let span_id = span_context.span().span_context().span_id().to_string(); // Get the SpanId
// Audit events are logged
let audit_entry = AuditEntry {
version: "1.0".to_string(),
event_type: "s3_put_object".to_string(),
bucket,
object,
user,
time: chrono::Utc::now().to_rfc3339(),
user_agent: "rustfs.rs-client".to_string(),
span_id,
};
audit_logger.log(audit_entry).await;
log_info("PUT operation completed");
}
#[tokio::test]
async fn test_main() {
let telemetry = Telemetry::init();
// Initialize multiple audit objectives
let audit_targets: Vec<Box<dyn AuditTarget>> = vec![Box::new(FileAuditTarget)];
let audit_logger = AuditLogger::new(audit_targets);
// Test the PUT operation
put_object(&audit_logger, "my-bucket".to_string(), "my-object.txt".to_string(), "user123".to_string()).await;
// Wait for the export to complete
tokio::time::sleep(Duration::from_secs(2)).await;
drop(telemetry); // Make sure to clean up
}
}

View File

@@ -0,0 +1,46 @@
use tracing::{debug, error, info};
/// Log an info message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_logging::log_info;
///
/// log_info("This is an info message");
/// ```
pub fn log_info(msg: &str) {
info!("{}", msg);
}
/// Log an error message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_logging::log_error;
///
/// log_error("This is an error message");
/// ```
pub fn log_error(msg: &str) {
error!("{}", msg);
}
/// Log a debug message
///
/// # Arguments
/// msg: &str - The message to log
///
/// # Example
/// ```
/// use rustfs_logging::log_debug;
///
/// log_debug("This is a debug message");
/// ```
pub fn log_debug(msg: &str) {
debug!("{}", msg);
}

View File

@@ -0,0 +1,113 @@
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{self, WithExportConfig};
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing::Level;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Telemetry is a wrapper around the OpenTelemetry SDK Tracer and Meter providers.
/// It initializes the global Tracer and Meter providers, and sets up the tracing subscriber.
/// The Tracer and Meter providers are shut down when the Telemetry instance is dropped.
/// This is a convenience struct to ensure that the global providers are properly initialized and shut down.
///
/// # Example
/// ```
/// use rustfs_logging::Telemetry;
///
/// let _telemetry = Telemetry::init();
/// ```
pub struct Telemetry {
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
}
impl Telemetry {
pub fn init() -> Self {
// Define service resource information
let resource = Resource::builder()
.with_schema_url(
[
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
],
SCHEMA_URL,
)
.build();
let tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.build()
.unwrap();
// Configure Tracer Provider
let tracer_provider = SdkTracerProvider::builder()
.with_sampler(Sampler::AlwaysOn)
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource.clone())
.with_batch_exporter(tracer_exporter)
.with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
.build();
let meter_exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
let meter_reader = PeriodicReader::builder(meter_exporter)
.with_interval(std::time::Duration::from_secs(30))
.build();
// For debugging in development
let meter_stdout_reader = PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
// Configure Meter Provider
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource)
.with_reader(meter_reader)
.with_reader(meter_stdout_reader)
.build();
// Set global Tracer and Meter providers
global::set_tracer_provider(tracer_provider.clone());
global::set_meter_provider(meter_provider.clone());
let tracer = tracer_provider.tracer("tracing-otel-subscriber-rustfs-service");
// Configure `tracing subscriber`
tracing_subscriber::registry()
.with(tracing_subscriber::filter::LevelFilter::from_level(Level::DEBUG))
.with(tracing_subscriber::fmt::layer().with_ansi(true))
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();
Self {
tracer_provider,
meter_provider,
}
}
}
impl Drop for Telemetry {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
}
}

View File

View File

@@ -3,8 +3,10 @@ mod auth;
mod config;
mod console;
mod grpc;
mod logging;
mod service;
mod storage;
use crate::auth::IAMAuth;
use clap::Parser;
use common::{
@@ -68,7 +70,7 @@ fn main() -> Result<()> {
//解析获得到的参数
let opt = config::Opt::parse();
//设置trace
//设置 trace
setup_tracing();
//运行参数
@@ -91,17 +93,17 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("server_address {}", &server_address);
//设置AKSK
//设置 AKSK
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone())).unwrap();
set_global_rustfs_port(server_port);
//监听地址,端口从参数中获取
//监听地址端口从参数中获取
let listener = TcpListener::bind(server_address.clone()).await?;
//获取监听地址
let local_addr: SocketAddr = listener.local_addr()?;
// 用于rpc
// 用于 rpc
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
.map_err(|err| Error::from_string(err.to_string()))?;
@@ -123,13 +125,13 @@ async fn run(opt: config::Opt) -> Result<()> {
.map_err(|err| Error::from_string(err.to_string()))?;
// Setup S3 service
// 本项目使用s3s库来实现s3服务
// 本项目使用 s3s 库来实现 s3 服务
let service = {
let store = storage::ecfs::FS::new();
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(store.clone());
//显示info信息
//显示 info 信息
info!("authentication is enabled {}, {}", &opt.access_key, &opt.secret_key);
b.set_auth(IAMAuth::new(opt.access_key, opt.secret_key));