improve logger entry for Observability

This commit is contained in:
houseme
2025-03-18 16:36:23 +08:00
parent bcdd204fa0
commit c3ecfeae6c
19 changed files with 1208 additions and 821 deletions

View File

View File

@@ -0,0 +1,65 @@
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.120.0
environment:
- TZ=Asia/Shanghai
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
ports:
- 1888:1888
- 8888:8888
- 8889:8889
- 13133:13133
- 4317:4317
- 4318:4318
- 55679:55679
networks:
- otel-network
jaeger:
image: jaegertracing/jaeger:2.3.0
environment:
- TZ=Asia/Shanghai
ports:
- "16686:16686"
- "14317:4317"
- "14318:4318"
networks:
- otel-network
prometheus:
image: prom/prometheus:v3.2.0
environment:
- TZ=Asia/Shanghai
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
networks:
- otel-network
loki:
image: grafana/loki:3.4.2
environment:
- TZ=Asia/Shanghai
volumes:
- ./loki-config.yaml:/etc/loki/local-config.yaml
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml
networks:
- otel-network
grafana:
image: grafana/grafana:11.5.2
ports:
- "3000:3000" # Web UI
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- TZ=Asia/Shanghai
networks:
- otel-network
networks:
otel-network:
driver: bridge
name: "network_otel_config"
driver_opts:
com.docker.network.enable_ipv6: "true"

View File

@@ -0,0 +1,88 @@
service:
extensions: [jaeger_storage, jaeger_query, remote_sampling, healthcheckv2]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]
telemetry:
resource:
service.name: jaeger
metrics:
level: detailed
readers:
- pull:
exporter:
prometheus:
host: 0.0.0.0
port: 8888
logs:
level: debug
# TODO Initialize telemetry tracer once OTEL released new feature.
# https://github.com/open-telemetry/opentelemetry-collector/issues/10663
extensions:
healthcheckv2:
use_v2: true
http:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679
jaeger_query:
storage:
traces: some_store
traces_archive: another_store
ui:
config_file: ./cmd/jaeger/config-ui.json
# The maximum duration that is considered for clock skew adjustments.
# Defaults to 0 seconds, which means it's disabled.
max_clock_skew_adjust: 0s
jaeger_storage:
backends:
some_store:
memory:
max_traces: 100000
another_store:
memory:
max_traces: 100000
remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
sampling_store: some_store
initial_sampling_probability: 0.1
http:
grpc:
receivers:
otlp:
protocols:
grpc:
http:
jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:
zipkin:
processors:
batch:
# Adaptive Sampling Processor is required to support adaptive sampling.
# It expects remote_sampling extension with `adaptive:` config to be enabled.
adaptive_sampling:
exporters:
jaeger_storage_exporter:
trace_storage: some_store

View File

@@ -0,0 +1,63 @@
auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: debug
grpc_server_max_concurrent_streams: 1000
common:
instance_addr: 127.0.0.1
path_prefix: /tmp/loki
storage:
filesystem:
chunks_directory: /tmp/loki/chunks
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 100
limits_config:
metric_aggregation_enabled: true
schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
pattern_ingester:
enabled: true
metric_aggregation:
loki_address: localhost:3100
ruler:
alertmanager_url: http://localhost:9093
frontend:
encoding: protobuf
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
#analytics:
# reporting_enabled: false

View File

@@ -0,0 +1,57 @@
receivers:
otlp:
protocols:
grpc: # OTLP gRPC 接收器
endpoint: 0.0.0.0:4317
http: # OTLP HTTP 接收器
endpoint: 0.0.0.0:4318
processors:
batch: # 批处理处理器,提升吞吐量
timeout: 5s
send_batch_size: 1000
memory_limiter:
check_interval: 1s
limit_mib: 512
exporters:
otlp/traces: # OTLP 导出器,用于跟踪数据
endpoint: "jaeger:4317" # Jaeger 的 OTLP gRPC 端点
tls:
insecure: true # 开发环境禁用 TLS生产环境需配置证书
prometheus: # Prometheus 导出器,用于指标数据
endpoint: "0.0.0.0:8889" # Prometheus 刮取端点
namespace: "otel" # 指标前缀
send_timestamps: true # 发送时间戳
# enable_open_metrics: true
loki: # Loki 导出器,用于日志数据
# endpoint: "http://loki:3100/otlp/v1/logs"
endpoint: "http://loki:3100/loki/api/v1/push"
tls:
insecure: true
extensions:
health_check:
pprof:
zpages:
service:
extensions: [health_check, pprof, zpages] # 启用扩展
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter,batch]
exporters: [otlp/traces]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
logs:
receivers: [otlp]
processors: [batch]
exporters: [loki]
telemetry:
logs:
level: "info" # Collector 日志级别
metrics:
address: "0.0.0.0:8888" # Collector 自身指标暴露

View File

@@ -0,0 +1,11 @@
global:
scrape_interval: 5s # 刮取间隔
scrape_configs:
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:8888'] # 从 Collector 刮取指标
- job_name: 'otel-metrics'
static_configs:
- targets: ['otel-collector:8889'] # 应用指标

31
config/obs.example.toml Normal file
View File

@@ -0,0 +1,31 @@
[observability]
endpoint = "" # Default is "http://localhost:4317" if not specified
use_stdout = false
sample_ratio = 0.5
meter_interval = 30
service_name = "rustfs_obs_service"
service_version = "0.1.0"
deployment_environment = "develop"
[sinks]
[sinks.kafka] # Kafka sink is disabled by default
enabled = false
bootstrap_servers = "localhost:9092"
topic = "logs"
batch_size = 100 # Default is 100 if not specified
batch_timeout_ms = 1000 # Default is 1000ms if not specified
[sinks.webhook]
enabled = false
url = "http://localhost:8080/webhook"
batch_size = 100 # Default is 3 if not specified
batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "logs/app.log"
batch_size = 100
batch_timeout_ms = 1000 # Default is 8192 bytes if not specified
[logger]
queue_capacity = 10000

View File

@@ -1,12 +1,13 @@
use opentelemetry::global;
use rustfs_obs::{get_logger, init_obs, load_config, log_info, LogEntry};
use rustfs_obs::{get_logger, init_obs, load_config, log_info, ServerLogEntry};
use std::time::{Duration, SystemTime};
use tracing::{info, instrument};
use tracing_core::Level;
#[tokio::main]
async fn main() {
let config = load_config(Some("packages/obs/examples/config".to_string()));
let obs_conf = Some("packages/obs/examples/config.toml".to_string());
let config = load_config(obs_conf);
let (_logger, _guard) = init_obs(config.clone()).await;
// Simulate the operation
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -33,24 +34,13 @@ async fn run(bucket: String, object: String, user: String, service_name: String)
&[opentelemetry::KeyValue::new("operation", "run")],
);
let result = get_logger()
.lock()
.await
.log(LogEntry::new(
Level::INFO,
"Process user requests".to_string(),
"api_handler".to_string(),
Some("demo-audit".to_string()),
Some("req-12345".to_string()),
Some(user),
vec![
("endpoint".to_string(), "/api/v1/data".to_string()),
("method".to_string(), "GET".to_string()),
("bucket".to_string(), bucket),
("object-length".to_string(), object.len().to_string()),
],
))
.await;
let server_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string())
.user_id(Some(user.clone()))
.add_field("operation".to_string(), "login".to_string())
.add_field("bucket".to_string(), bucket.clone())
.add_field("object".to_string(), object.clone());
let result = get_logger().lock().await.log_server_entry(server_entry).await;
info!("Logging is completed {:?}", result);
put_object("bucket".to_string(), "object".to_string(), "user".to_string()).await;
info!("Logging is completed");

View File

@@ -3,6 +3,11 @@ use serde::Deserialize;
use std::env;
/// OpenTelemetry Configuration
/// Add service name, service version, deployment environment
/// Add interval time for metric collection
/// Add sample ratio for trace sampling
/// Add endpoint for metric collection
/// Add use_stdout for output to stdout
#[derive(Debug, Deserialize, Clone, Default)]
pub struct OtelConfig {
pub endpoint: String,
@@ -58,6 +63,19 @@ pub struct LoggerConfig {
}
/// Overall application configuration
/// Add observability, sinks, and logger configuration
///
/// Observability: OpenTelemetry configuration
/// Sinks: Kafka, Webhook, File sink configuration
/// Logger: Logger configuration
///
/// # Example
/// ```
/// use rustfs_obs::AppConfig;
/// use rustfs_obs::load_config;
///
/// let config = load_config(None);
/// ```
#[derive(Debug, Deserialize, Clone, Default)]
pub struct AppConfig {
pub observability: OtelConfig,
@@ -65,19 +83,48 @@ pub struct AppConfig {
pub logger: LoggerConfig,
}
const DEFAULT_CONFIG_FILE: &str = "obs";
/// Loading the configuration file
/// Supports TOML, YAML and .env formats, read in order by priority
///
/// # Parameters
/// - `config_dir`: Configuration file path
///
/// # Returns
/// Configuration information
///
/// # Example
/// ```
/// use rustfs_obs::AppConfig;
/// use rustfs_obs::load_config;
///
/// let config = load_config(None);
/// ```
pub fn load_config(config_dir: Option<String>) -> AppConfig {
let config_dir = config_dir.unwrap_or_else(|| {
env::current_dir()
.map(|path| path.to_string_lossy().to_string())
.unwrap_or_else(|_| {
eprintln!("Warning: Failed to get current directory, using empty path");
String::new()
})
});
let config_dir = if let Some(path) = config_dir {
// Use the provided path
let path = std::path::Path::new(&path);
if path.extension().is_some() {
// If path has extension, use it as is (extension will be added by Config::builder)
path.with_extension("").to_string_lossy().into_owned()
} else {
// If path is a directory, append the default config file name
path.to_string_lossy().into_owned()
}
} else {
// If no path provided, use current directory + default config file
match env::current_dir() {
Ok(dir) => dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned(),
Err(_) => {
eprintln!("Warning: Failed to get current directory, using default config file");
DEFAULT_CONFIG_FILE.to_string()
}
}
};
println!("config_dir: {}", config_dir);
// Log using proper logging instead of println when possible
println!("Using config file base: {}", config_dir);
let config = Config::builder()
.add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml))

View File

@@ -0,0 +1,74 @@
use crate::entry::ObjectVersion;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Args - defines the arguments for API operations
/// Args is used to define the arguments for API operations.
///
/// # Example
/// ```
/// use rustfs_obs::Args;
/// use std::collections::HashMap;
///
/// let args = Args::new()
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_version_id(Some("123".to_string()))
/// .set_metadata(Some(HashMap::new()));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct Args {
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Option::is_none")]
pub objects: Option<Vec<ObjectVersion>>,
#[serde(rename = "metadata", skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
impl Args {
/// Create a new Args object
pub fn new() -> Self {
Args {
bucket: None,
object: None,
version_id: None,
objects: None,
metadata: None,
}
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Option<Vec<ObjectVersion>>) -> Self {
self.objects = objects;
self
}
/// Set the metadata
pub fn set_metadata(mut self, metadata: Option<HashMap<String, String>>) -> Self {
self.metadata = metadata;
self
}
}

View File

@@ -1,11 +1,64 @@
use crate::ObjectVersion;
use crate::{BaseLogEntry, LogRecord, ObjectVersion};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// API details structure
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
/// ApiDetails is used to define the details of an API operation
///
/// The `ApiDetails` structure contains the following fields:
/// - `name` - the name of the API operation
/// - `bucket` - the bucket name
/// - `object` - the object name
/// - `objects` - the list of objects
/// - `status` - the status of the API operation
/// - `status_code` - the status code of the API operation
/// - `input_bytes` - the input bytes
/// - `output_bytes` - the output bytes
/// - `header_bytes` - the header bytes
/// - `time_to_first_byte` - the time to first byte
/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds
/// - `time_to_response` - the time to response
/// - `time_to_response_in_ns` - the time to response in nanoseconds
///
/// The `ApiDetails` structure contains the following methods:
/// - `new` - create a new `ApiDetails` with default values
/// - `set_name` - set the name
/// - `set_bucket` - set the bucket
/// - `set_object` - set the object
/// - `set_objects` - set the objects
/// - `set_status` - set the status
/// - `set_status_code` - set the status code
/// - `set_input_bytes` - set the input bytes
/// - `set_output_bytes` - set the output bytes
/// - `set_header_bytes` - set the header bytes
/// - `set_time_to_first_byte` - set the time to first byte
/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds
/// - `set_time_to_response` - set the time to response
/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds
///
/// # Example
/// ```
/// use rustfs_obs::ApiDetails;
/// use rustfs_obs::ObjectVersion;
///
/// let api = ApiDetails::new()
/// .set_name(Some("GET".to_string()))
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())])
/// .set_status(Some("OK".to_string()))
/// .set_status_code(Some(200))
/// .set_input_bytes(100)
/// .set_output_bytes(200)
/// .set_header_bytes(Some(50))
/// .set_time_to_first_byte(Some("100ms".to_string()))
/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string()))
/// .set_time_to_response(Some("200ms".to_string()))
/// .set_time_to_response_in_ns(Some("200000000ns".to_string()));
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ApiDetails {
#[serde(rename = "name", skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
@@ -135,22 +188,85 @@ impl ApiDetails {
}
/// Entry - audit entry logs
/// AuditLogEntry is used to define the structure of an audit log entry
///
/// The `AuditLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `version` - the version of the audit log entry
/// - `deployment_id` - the deployment ID
/// - `event` - the event
/// - `entry_type` - the type of audit message
/// - `api` - the API details
/// - `remote_host` - the remote host
/// - `user_agent` - the user agent
/// - `req_path` - the request path
/// - `req_host` - the request host
/// - `req_claims` - the request claims
/// - `req_query` - the request query
/// - `req_header` - the request header
/// - `resp_header` - the response header
/// - `access_key` - the access key
/// - `parent_user` - the parent user
/// - `error` - the error
///
/// The `AuditLogEntry` structure contains the following methods:
/// - `new` - create a new `AuditEntry` with default values
/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details
/// - `with_base` - set the base log entry
/// - `set_version` - set the version
/// - `set_deployment_id` - set the deployment ID
/// - `set_event` - set the event
/// - `set_entry_type` - set the entry type
/// - `set_api` - set the API details
/// - `set_remote_host` - set the remote host
/// - `set_user_agent` - set the user agent
/// - `set_req_path` - set the request path
/// - `set_req_host` - set the request host
/// - `set_req_claims` - set the request claims
/// - `set_req_query` - set the request query
/// - `set_req_header` - set the request header
/// - `set_resp_header` - set the response header
/// - `set_access_key` - set the access key
/// - `set_parent_user` - set the parent user
/// - `set_error` - set the error
///
/// # Example
/// ```
/// use rustfs_obs::AuditLogEntry;
/// use rustfs_obs::ApiDetails;
/// use std::collections::HashMap;
///
/// let entry = AuditLogEntry::new()
/// .set_version("1.0".to_string())
/// .set_deployment_id(Some("123".to_string()))
/// .set_event("event".to_string())
/// .set_entry_type(Some("type".to_string()))
/// .set_api(ApiDetails::new())
/// .set_remote_host(Some("remote-host".to_string()))
/// .set_user_agent(Some("user-agent".to_string()))
/// .set_req_path(Some("req-path".to_string()))
/// .set_req_host(Some("req-host".to_string()))
/// .set_req_claims(Some(HashMap::new()))
/// .set_req_query(Some(HashMap::new()))
/// .set_req_header(Some(HashMap::new()))
/// .set_resp_header(Some(HashMap::new()))
/// .set_access_key(Some("access-key".to_string()))
/// .set_parent_user(Some("parent-user".to_string()))
/// .set_error(Some("error".to_string()));
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct AuditEntry {
pub struct AuditLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub version: String,
#[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")]
pub deployment_id: Option<String>,
pub time: DateTime<Utc>,
pub event: String,
// Class of audit message - S3, admin ops, bucket management
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub entry_type: Option<String>,
pub api: ApiDetails,
#[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "requestID", skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")]
@@ -165,8 +281,6 @@ pub struct AuditEntry {
pub req_header: Option<HashMap<String, String>>,
#[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")]
pub resp_header: Option<HashMap<String, String>>,
#[serde(rename = "tags", skip_serializing_if = "Option::is_none")]
pub tags: Option<HashMap<String, Value>>,
#[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")]
pub access_key: Option<String>,
#[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")]
@@ -175,18 +289,17 @@ pub struct AuditEntry {
pub error: Option<String>,
}
impl AuditEntry {
impl AuditLogEntry {
/// Create a new `AuditEntry` with default values
pub fn new() -> Self {
AuditEntry {
AuditLogEntry {
base: BaseLogEntry::new(),
version: String::new(),
deployment_id: None,
time: Utc::now(),
event: String::new(),
entry_type: None,
api: ApiDetails::new(),
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
@@ -194,47 +307,25 @@ impl AuditEntry {
req_query: None,
req_header: None,
resp_header: None,
tags: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Create a new `AuditEntry` with version and time event and api details
/// # Arguments
/// * `version` - Version of the audit entry
/// * `time` - Time of the audit entry
/// * `event` - Event of the audit entry
/// * `api` - API details of the audit entry
/// # Returns
/// * `AuditEntry` with the given values
/// # Example
/// ```
/// use chrono::Utc;
/// use rustfs_obs::{ApiDetails, AuditEntry};
/// let entry = AuditEntry::new_with_values(
/// "v1".to_string(),
/// Utc::now(),
/// "event".to_string(),
/// ApiDetails::new(),
/// );
/// ```
/// # Remarks
/// This is a convenience method to create an `AuditEntry` with the given values
/// without having to set each field individually
/// This is useful when you want to create an `AuditEntry` with the given values
/// without having to set each field individually
/// Create a new `AuditEntry` with version, time, event and api details
pub fn new_with_values(version: String, time: DateTime<Utc>, event: String, api: ApiDetails) -> Self {
AuditEntry {
let mut base = BaseLogEntry::new();
base.timestamp = time;
AuditLogEntry {
base,
version,
deployment_id: None,
time,
event,
entry_type: None,
api,
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
@@ -242,13 +333,18 @@ impl AuditEntry {
req_query: None,
req_header: None,
resp_header: None,
tags: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the version
pub fn set_version(mut self, version: String) -> Self {
self.version = version;
@@ -261,12 +357,6 @@ impl AuditEntry {
self
}
/// Set the time
pub fn set_time(mut self, time: DateTime<Utc>) -> Self {
self.time = time;
self
}
/// Set the event
pub fn set_event(mut self, event: String) -> Self {
self.event = event;
@@ -291,12 +381,6 @@ impl AuditEntry {
self
}
/// Set the request ID
pub fn set_request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the user agent
pub fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
self.user_agent = user_agent;
@@ -339,12 +423,6 @@ impl AuditEntry {
self
}
/// Set the tags
pub fn set_tags(mut self, tags: Option<HashMap<String, Value>>) -> Self {
self.tags = tags;
self
}
/// Set the access key
pub fn set_access_key(mut self, access_key: Option<String>) -> Self {
self.access_key = access_key;
@@ -364,79 +442,12 @@ impl AuditEntry {
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_audit_entry() {
let entry = AuditEntry::new()
.set_version("v1".to_string())
.set_deployment_id(Some("12345".to_string()))
.set_time(Utc::now())
.set_event("event".to_string())
.set_entry_type(Some("type".to_string()))
.set_api(ApiDetails::new())
.set_remote_host(Some("localhost".to_string()))
.set_request_id(Some("req-12345".to_string()))
.set_user_agent(Some("user-agent".to_string()))
.set_req_path(Some("/path".to_string()))
.set_req_host(Some("localhost".to_string()))
.set_req_claims(Some(HashMap::new()))
.set_req_query(Some(HashMap::new()))
.set_req_header(Some(HashMap::new()))
.set_resp_header(Some(HashMap::new()))
.set_tags(Some(HashMap::new()))
.set_access_key(Some("access-key".to_string()))
.set_parent_user(Some("parent-user".to_string()))
.set_error(Some("error".to_string()));
assert_eq!(entry.version, "v1");
assert_eq!(entry.deployment_id, Some("12345".to_string()));
assert_eq!(entry.event, "event");
assert_eq!(entry.entry_type, Some("type".to_string()));
assert_eq!(entry.remote_host, Some("localhost".to_string()));
assert_eq!(entry.request_id, Some("req-12345".to_string()));
assert_eq!(entry.user_agent, Some("user-agent".to_string()));
assert_eq!(entry.req_path, Some("/path".to_string()));
assert_eq!(entry.req_host, Some("localhost".to_string()));
assert_eq!(entry.access_key, Some("access-key".to_string()));
assert_eq!(entry.parent_user, Some("parent-user".to_string()));
assert_eq!(entry.error, Some("error".to_string()));
impl LogRecord for AuditLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
#[test]
fn test_api_details() {
let api = ApiDetails::new()
.set_name(Some("name".to_string()))
.set_bucket(Some("bucket".to_string()))
.set_object(Some("object".to_string()))
.set_objects(vec![ObjectVersion {
object_name: "object".to_string(),
version_id: Some("12345".to_string()),
}])
.set_status(Some("status".to_string()))
.set_status_code(Some(200))
.set_input_bytes(100)
.set_output_bytes(200)
.set_header_bytes(Some(300))
.set_time_to_first_byte(Some("100ms".to_string()))
.set_time_to_first_byte_in_ns(Some("100ns".to_string()))
.set_time_to_response(Some("200ms".to_string()))
.set_time_to_response_in_ns(Some("200ns".to_string()));
assert_eq!(api.name, Some("name".to_string()));
assert_eq!(api.bucket, Some("bucket".to_string()));
assert_eq!(api.object, Some("object".to_string()));
assert_eq!(api.objects.len(), 1);
assert_eq!(api.status, Some("status".to_string()));
assert_eq!(api.status_code, Some(200));
assert_eq!(api.input_bytes, 100);
assert_eq!(api.output_bytes, 200);
assert_eq!(api.header_bytes, Some(300));
assert_eq!(api.time_to_first_byte, Some("100ms".to_string()));
assert_eq!(api.time_to_first_byte_in_ns, Some("100ns".to_string()));
assert_eq!(api.time_to_response, Some("200ms".to_string()));
assert_eq!(api.time_to_response_in_ns, Some("200ns".to_string()));
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}

View File

@@ -3,490 +3,90 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// ObjectVersion object version key/versionId
/// Base log entry structure shared by all log types
/// This structure is used to serialize log entries to JSON
/// and send them to the log sinks
/// This structure is also used to deserialize log entries from JSON
/// This structure is also used to store log entries in the database
/// This structure is also used to query log entries from the database
///
/// The `BaseLogEntry` structure contains the following fields:
/// - `timestamp` - the timestamp of the log entry
/// - `request_id` - the request ID of the log entry
/// - `message` - the message of the log entry
/// - `tags` - the tags of the log entry
///
/// The `BaseLogEntry` structure contains the following methods:
/// - `new` - create a new `BaseLogEntry` with default values
/// - `message` - set the message
/// - `request_id` - set the request ID
/// - `tags` - set the tags
/// - `timestamp` - set the timestamp
///
/// # Example
/// ```
/// use rustfs_obs::BaseLogEntry;
/// use chrono::{DateTime, Utc};
/// use std::collections::HashMap;
///
/// let timestamp = Utc::now();
/// let request = Some("req-123".to_string());
/// let message = Some("This is a log message".to_string());
/// let tags = Some(HashMap::new());
///
/// let entry = BaseLogEntry::new()
/// .timestamp(timestamp)
/// .request_id(request)
/// .message(message)
/// .tags(tags);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct ObjectVersion {
#[serde(rename = "objectName")]
pub object_name: String,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl ObjectVersion {
/// Create a new ObjectVersion object
pub fn new() -> Self {
ObjectVersion {
object_name: String::new(),
version_id: None,
}
}
/// Create a new ObjectVersion object with object name
pub fn new_with_object_name(object_name: String) -> Self {
ObjectVersion {
object_name,
version_id: None,
}
}
/// Set the object name
pub fn set_object_name(mut self, object_name: String) -> Self {
self.object_name = object_name;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
}
/// Args - defines the arguments for the API
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Args {
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Option::is_none")]
pub objects: Option<Vec<ObjectVersion>>,
#[serde(rename = "metadata", skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
impl Args {
/// Create a new Args object
pub fn new() -> Self {
Args {
bucket: None,
object: None,
version_id: None,
objects: None,
metadata: None,
}
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Option<Vec<ObjectVersion>>) -> Self {
self.objects = objects;
self
}
/// Set the metadata
pub fn set_metadata(mut self, metadata: Option<HashMap<String, String>>) -> Self {
self.metadata = metadata;
self
}
}
/// Trace - defines the trace
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Trace {
#[serde(rename = "message", skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "source", skip_serializing_if = "Option::is_none")]
pub source: Option<Vec<String>>,
#[serde(rename = "variables", skip_serializing_if = "Option::is_none")]
pub variables: Option<HashMap<String, Value>>,
}
impl Trace {
/// Create a new Trace object
pub fn new() -> Self {
Trace {
message: None,
source: None,
variables: None,
}
}
/// Set the message
pub fn set_message(mut self, message: Option<String>) -> Self {
self.message = message;
self
}
/// Set the source
pub fn set_source(mut self, source: Option<Vec<String>>) -> Self {
self.source = source;
self
}
/// Set the variables
pub fn set_variables(mut self, variables: Option<HashMap<String, Value>>) -> Self {
self.variables = variables;
self
}
}
/// API - defines the api type and its args
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct API {
#[serde(rename = "name", skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(rename = "args", skip_serializing_if = "Option::is_none")]
pub args: Option<Args>,
}
impl API {
/// Create a new API object
pub fn new() -> Self {
API { name: None, args: None }
}
/// Set the name
pub fn set_name(mut self, name: Option<String>) -> Self {
self.name = name;
self
}
/// Set the args
pub fn set_args(mut self, args: Option<Args>) -> Self {
self.args = args;
self
}
}
/// Log kind/level enum
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum LogKind {
#[serde(rename = "INFO")]
Info,
#[serde(rename = "WARNING")]
Warning,
#[serde(rename = "ERROR")]
Error,
#[serde(rename = "FATAL")]
Fatal,
}
impl Default for LogKind {
fn default() -> Self {
LogKind::Info
}
}
/// Entry - defines fields and values of each log entry
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Entry {
#[serde(rename = "site", skip_serializing_if = "Option::is_none")]
pub site: Option<String>,
#[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")]
pub deployment_id: Option<String>,
pub level: LogKind,
#[serde(rename = "errKind", skip_serializing_if = "Option::is_none")]
pub log_kind: Option<LogKind>, // Deprecated Jan 2024
pub time: DateTime<Utc>,
#[serde(rename = "api", skip_serializing_if = "Option::is_none")]
pub api: Option<API>,
#[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "host", skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
pub struct BaseLogEntry {
#[serde(rename = "time")]
pub timestamp: DateTime<Utc>,
#[serde(rename = "requestID", skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(rename = "message", skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "error", skip_serializing_if = "Option::is_none")]
pub trace: Option<Trace>,
#[serde(rename = "tags", skip_serializing_if = "Option::is_none")]
pub tags: Option<HashMap<String, Value>>,
}
impl Entry {
/// Create a new Entry object with default values
impl BaseLogEntry {
/// Create a new BaseLogEntry with default values
pub fn new() -> Self {
Entry {
site: None,
deployment_id: None,
level: LogKind::Info,
log_kind: None,
time: Utc::now(),
api: None,
remote_host: None,
host: None,
BaseLogEntry {
timestamp: Utc::now(),
request_id: None,
user_agent: None,
message: None,
trace: None,
tags: None,
}
}
/// Set the site
pub fn set_site(mut self, site: Option<String>) -> Self {
self.site = site;
self
}
/// Set the deployment ID
pub fn set_deployment_id(mut self, deployment_id: Option<String>) -> Self {
self.deployment_id = deployment_id;
self
}
/// Set the level
pub fn set_level(mut self, level: LogKind) -> Self {
self.level = level;
self
}
/// Set the log kind
pub fn set_log_kind(mut self, log_kind: Option<LogKind>) -> Self {
self.log_kind = log_kind;
self
}
/// Set the time
pub fn set_time(mut self, time: DateTime<Utc>) -> Self {
self.time = time;
self
}
/// Set the API
pub fn set_api(mut self, api: Option<API>) -> Self {
self.api = api;
self
}
/// Set the remote host
pub fn set_remote_host(mut self, remote_host: Option<String>) -> Self {
self.remote_host = remote_host;
self
}
/// Set the host
pub fn set_host(mut self, host: Option<String>) -> Self {
self.host = host;
self
}
/// Set the request ID
pub fn set_request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the user agent
pub fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
self.user_agent = user_agent;
self
}
/// Set the message
pub fn set_message(mut self, message: Option<String>) -> Self {
pub fn message(mut self, message: Option<String>) -> Self {
self.message = message;
self
}
/// Set the trace
pub fn set_trace(mut self, trace: Option<Trace>) -> Self {
self.trace = trace;
/// Set the request ID
pub fn request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the tags
pub fn tags(mut self, tags: Option<HashMap<String, Value>>) -> Self {
self.tags = tags;
self
}
/// Set the timestamp
pub fn timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.timestamp = timestamp;
self
}
}
/// Info holds console log messages
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Info {
#[serde(flatten)]
pub entry: Entry,
pub console_msg: String,
#[serde(rename = "node")]
pub node_name: String,
#[serde(skip)]
pub err: Option<String>,
}
impl Info {
/// Create a new Info object with default values
pub fn new() -> Self {
Info {
entry: Entry::new(),
console_msg: String::new(),
node_name: String::new(),
err: None,
}
}
/// Create a new Info object with console message and node name
pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self {
Info {
entry: Entry::new(),
console_msg,
node_name,
err: None,
}
}
/// Set the node name
pub fn set_node_name(&mut self, node_name: String) {
self.node_name = node_name;
}
/// Set the entry
pub fn set_entry(&mut self, entry: Entry) {
self.entry = entry;
}
/// Set the console message
pub fn set_console_msg(&mut self, console_msg: String) {
self.console_msg = console_msg;
}
/// Set the error message
pub fn set_err(&mut self, err: Option<String>) {
self.err = err;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_object_version() {
let mut object_version = ObjectVersion::new();
object_version = object_version.clone().set_object_name("object".to_string());
object_version = object_version.clone().set_version_id(Some("version".to_string()));
assert_eq!(object_version.object_name, "object".to_string());
assert_eq!(object_version.version_id, Some("version".to_string()));
}
#[test]
fn test_object_version_with_object_name() {
let object_version = ObjectVersion::new_with_object_name("object".to_string());
assert_eq!(object_version.object_name, "object".to_string());
assert_eq!(object_version.version_id, None);
}
#[test]
fn test_args() {
let mut obj = ObjectVersion::new();
obj.object_name = "object".to_string();
obj.version_id = Some("version".to_string());
let objs = vec![obj];
let args = Args::new()
.set_bucket(Some("bucket".to_string()))
.set_object(Some("object".to_string()))
.set_version_id(Some("version".to_string()))
.set_objects(Some(objs.clone()))
.set_metadata(Some(HashMap::new()));
assert_eq!(args.bucket, Some("bucket".to_string()));
assert_eq!(args.object, Some("object".to_string()));
assert_eq!(args.version_id, Some("version".to_string()));
assert_eq!(args.objects, Some(objs));
assert_eq!(args.metadata, Some(HashMap::new()));
}
#[test]
fn test_trace() {
let trace = Trace::new()
.set_message(Some("message".to_string()))
.set_source(Some(vec!["source".to_string()]))
.set_variables(Some(HashMap::new()));
assert_eq!(trace.message, Some("message".to_string()));
assert_eq!(trace.source, Some(vec!["source".to_string()]));
assert_eq!(trace.variables, Some(HashMap::new()));
}
#[test]
fn test_api() {
let api = API::new().set_name(Some("name".to_string())).set_args(Some(Args::new()));
assert_eq!(api.name, Some("name".to_string()));
assert_eq!(api.args, Some(Args::new()));
}
#[test]
fn test_log_kind() {
assert_eq!(LogKind::default(), LogKind::Info);
}
#[test]
fn test_entry() {
let entry = Entry::new()
.set_site(Some("site".to_string()))
.set_deployment_id(Some("deployment_id".to_string()))
.set_level(LogKind::Info)
.set_log_kind(Some(LogKind::Info))
.set_time(Utc::now())
.set_api(Some(API::new()))
.set_remote_host(Some("remote_host".to_string()))
.set_host(Some("host".to_string()))
.set_request_id(Some("request_id".to_string()))
.set_user_agent(Some("user_agent".to_string()))
.set_message(Some("message".to_string()))
.set_trace(Some(Trace::new()));
assert_eq!(entry.site, Some("site".to_string()));
assert_eq!(entry.deployment_id, Some("deployment_id".to_string()));
assert_eq!(entry.level, LogKind::Info);
assert_eq!(entry.log_kind, Some(LogKind::Info));
assert_eq!(entry.api, Some(API::new()));
assert_eq!(entry.remote_host, Some("remote_host".to_string()));
assert_eq!(entry.host, Some("host".to_string()));
assert_eq!(entry.request_id, Some("request_id".to_string()));
assert_eq!(entry.user_agent, Some("user_agent".to_string()));
assert_eq!(entry.message, Some("message".to_string()));
assert_eq!(entry.trace, Some(Trace::new()));
}
#[test]
fn test_info() {
let mut info = Info::new();
info.set_node_name("node_name".to_string());
info.set_entry(Entry::new());
info.set_console_msg("console_msg".to_string());
info.set_err(Some("err".to_string()));
assert_eq!(info.node_name, "node_name".to_string());
// assert_eq!(info.entry, Entry::new());
assert_eq!(info.console_msg, "console_msg".to_string());
assert_eq!(info.err, Some("err".to_string()));
}
#[test]
fn test_info_with_console_msg() {
let info = Info::new_with_console_msg("console_msg".to_string(), "node_name".to_string());
assert_eq!(info.node_name, "node_name".to_string());
assert_eq!(info.console_msg, "console_msg".to_string());
assert_eq!(info.err, None);
}
}

View File

@@ -1,83 +0,0 @@
use chrono::{DateTime, Utc};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
#[derive(Debug, Clone)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}
/// Server log entry structure
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LogEntry {
pub timestamp: DateTime<Utc>, // Log timestamp
pub level: SerializableLevel, // Log Level
pub message: String, // Log messages
pub source: String, // Log source (such as module name)
pub target: Option<String>, // Log target
pub request_id: Option<String>, // Request ID (Common Server Fields)
pub user_id: Option<String>, // User ID (Common Server Fields)
pub fields: Vec<(String, String)>, // Attached fields (key value pairs)
}
impl LogEntry {
/// Create a new LogEntry
pub fn new(
level: Level,
message: String,
source: String,
target: Option<String>,
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Self {
LogEntry {
timestamp: Utc::now(),
level: SerializableLevel::from(level),
message,
source,
target,
request_id,
user_id,
fields,
}
}
}

View File

@@ -1,3 +1,143 @@
pub(crate) mod args;
pub(crate) mod audit;
pub(crate) mod base;
pub(crate) mod log;
pub(crate) mod unified;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// ObjectVersion is used across multiple modules
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectVersion {
#[serde(rename = "name")]
pub object_name: String,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl ObjectVersion {
/// Create a new ObjectVersion object
pub fn new() -> Self {
ObjectVersion {
object_name: String::new(),
version_id: None,
}
}
/// Create a new ObjectVersion with object name
pub fn new_with_object_name(object_name: String) -> Self {
ObjectVersion {
object_name,
version_id: None,
}
}
/// Set the object name
pub fn set_object_name(mut self, object_name: String) -> Self {
self.object_name = object_name;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
}
/// Log kind/level enum
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum LogKind {
#[serde(rename = "INFO")]
Info,
#[serde(rename = "WARNING")]
Warning,
#[serde(rename = "ERROR")]
Error,
#[serde(rename = "FATAL")]
Fatal,
}
impl Default for LogKind {
fn default() -> Self {
LogKind::Info
}
}
/// Trait for types that can be serialized to JSON and have a timestamp
/// This trait is used by `ServerLogEntry` to convert the log entry to JSON
/// and get the timestamp of the log entry
/// This trait is implemented by `ServerLogEntry`
///
/// # Example
/// ```
/// use rustfs_obs::LogRecord;
/// use chrono::{DateTime, Utc};
/// use rustfs_obs::ServerLogEntry;
/// use tracing_core::Level;
///
/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string());
/// let json = log_entry.to_json();
/// let timestamp = log_entry.get_timestamp();
/// ```
pub trait LogRecord {
fn to_json(&self) -> String;
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
/// for `ServerLogEntry`
/// This is necessary because `tracing_core::Level` does not implement `Serialize`
/// and `Deserialize`
/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized
/// using `serde`
///
/// # Example
/// ```
/// use rustfs_obs::SerializableLevel;
/// use tracing_core::Level;
///
/// let level = Level::INFO;
/// let serializable_level = SerializableLevel::from(level);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}

View File

@@ -0,0 +1,279 @@
use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing_core::Level;
/// Server log entry with structured fields
/// ServerLogEntry is used to log structured log entries from the server
///
/// The `ServerLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `source` - the source of the log entry
/// - `user_id` - the user ID
/// - `fields` - the structured fields of the log entry
///
/// The `ServerLogEntry` structure contains the following methods:
/// - `new` - create a new `ServerLogEntry` with specified level and source
/// - `with_base` - set the base log entry
/// - `user_id` - set the user ID
/// - `fields` - set the fields
/// - `add_field` - add a field
///
/// # Example
/// ```
/// use rustfs_obs::ServerLogEntry;
/// use tracing_core::Level;
///
/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string())
/// .user_id(Some("user-456".to_string()))
/// .add_field("operation".to_string(), "login".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: SerializableLevel,
pub source: String,
#[serde(rename = "userId", skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub fields: Vec<(String, String)>,
}
impl ServerLogEntry {
/// Create a new ServerLogEntry with specified level and source
pub fn new(level: Level, source: String) -> Self {
ServerLogEntry {
base: BaseLogEntry::new(),
level: SerializableLevel(level),
source,
user_id: None,
fields: Vec::new(),
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the user ID
pub fn user_id(mut self, user_id: Option<String>) -> Self {
self.user_id = user_id;
self
}
/// Set fields
pub fn fields(mut self, fields: Vec<(String, String)>) -> Self {
self.fields = fields;
self
}
/// Add a field
pub fn add_field(mut self, key: String, value: String) -> Self {
self.fields.push((key, value));
self
}
}
impl LogRecord for ServerLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Console log entry structure
/// ConsoleLogEntry is used to log console log entries
/// The `ConsoleLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `console_msg` - the console message
/// - `node_name` - the node name
/// - `err` - the error message
/// The `ConsoleLogEntry` structure contains the following methods:
/// - `new` - create a new `ConsoleLogEntry`
/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name
/// - `with_base` - set the base log entry
/// - `set_level` - set the log level
/// - `set_node_name` - set the node name
/// - `set_console_msg` - set the console message
/// - `set_err` - set the error message
/// # Example
/// ```
/// use rustfs_obs::ConsoleLogEntry;
///
/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: LogKind,
pub console_msg: String,
pub node_name: String,
#[serde(skip)]
pub err: Option<String>,
}
impl ConsoleLogEntry {
/// Create a new ConsoleLogEntry
pub fn new() -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg: String::new(),
node_name: String::new(),
err: None,
}
}
/// Create a new ConsoleLogEntry with console message and node name
pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg,
node_name,
err: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the log level
pub fn set_level(mut self, level: LogKind) -> Self {
self.level = level;
self
}
/// Set the node name
pub fn set_node_name(mut self, node_name: String) -> Self {
self.node_name = node_name;
self
}
/// Set the console message
pub fn set_console_msg(mut self, console_msg: String) -> Self {
self.console_msg = console_msg;
self
}
/// Set the error message
pub fn set_err(mut self, err: Option<String>) -> Self {
self.err = err;
self
}
}
impl LogRecord for ConsoleLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Unified log entry type
/// UnifiedLogEntry is used to log different types of log entries
///
/// The `UnifiedLogEntry` enum contains the following variants:
/// - `Server` - a server log entry
/// - `Audit` - an audit log entry
/// - `Console` - a console log entry
///
/// The `UnifiedLogEntry` enum contains the following methods:
/// - `to_json` - convert the log entry to JSON
/// - `get_timestamp` - get the timestamp of the log entry
///
/// # Example
/// ```
/// use rustfs_obs::{UnifiedLogEntry, ServerLogEntry};
/// use tracing_core::Level;
///
/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string());
/// let unified = UnifiedLogEntry::Server(server_entry);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UnifiedLogEntry {
#[serde(rename = "server")]
Server(ServerLogEntry),
#[serde(rename = "audit")]
Audit(AuditLogEntry),
#[serde(rename = "console")]
Console(ConsoleLogEntry),
}
impl LogRecord for UnifiedLogEntry {
fn to_json(&self) -> String {
match self {
UnifiedLogEntry::Server(entry) => entry.to_json(),
UnifiedLogEntry::Audit(entry) => entry.to_json(),
UnifiedLogEntry::Console(entry) => entry.to_json(),
}
}
fn get_timestamp(&self) -> DateTime<Utc> {
match self {
UnifiedLogEntry::Server(entry) => entry.get_timestamp(),
UnifiedLogEntry::Audit(entry) => entry.get_timestamp(),
UnifiedLogEntry::Console(entry) => entry.get_timestamp(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base_log_entry() {
let base = BaseLogEntry::new()
.request_id(Some("req-123".to_string()))
.message(Some("Test message".to_string()));
assert_eq!(base.request_id, Some("req-123".to_string()));
assert_eq!(base.message, Some("Test message".to_string()));
}
#[test]
fn test_server_log_entry() {
let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string())
.user_id(Some("user-456".to_string()))
.add_field("operation".to_string(), "login".to_string());
assert_eq!(entry.level.0, Level::INFO);
assert_eq!(entry.source, "test_module");
assert_eq!(entry.user_id, Some("user-456".to_string()));
assert_eq!(entry.fields.len(), 1);
assert_eq!(entry.fields[0], ("operation".to_string(), "login".to_string()));
}
#[test]
fn test_unified_log_entry_json() {
let server_entry = ServerLogEntry::new(Level::INFO, "test_source".to_string());
let unified = UnifiedLogEntry::Server(server_entry);
let json = unified.to_json();
assert!(json.contains("test_source"));
}
}

View File

@@ -3,6 +3,30 @@
/// `obs` is a logging and observability library for Rust.
/// It provides a simple and easy-to-use interface for logging and observability.
/// It is built on top of the `log` crate and `opentelemetry` crate.
///
/// ## Features
/// - Structured logging
/// - Distributed tracing
/// - Metrics collection
/// - Log processing worker
/// - Multiple sinks
/// - Configuration-based setup
/// - Telemetry guard
/// - Global logger
/// - Log levels
/// - Log entry types
/// - Log record
/// - Object version
/// - Local IP address
///
/// ## Usage
///
/// ```rust
/// use rustfs_obs::{AppConfig, init_obs};
///
/// let config = AppConfig::default();
/// let (logger, guard) = init_obs(config);
/// ```
mod config;
mod entry;
mod logger;
@@ -13,9 +37,11 @@ mod worker;
pub use config::load_config;
pub use config::{AppConfig, OtelConfig};
pub use entry::audit::{ApiDetails, AuditEntry};
pub use entry::base::{Args, Entry, Info, LogKind, ObjectVersion, Trace, API};
pub use entry::log::{LogEntry, SerializableLevel};
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
pub use logger::start_logger;
pub use logger::{
ensure_logger_initialized, get_global_logger, init_global_logger, locked_logger, log_debug, log_error, log_info, log_trace,

View File

@@ -1,4 +1,4 @@
use crate::{AppConfig, LogEntry, SerializableLevel, Sink};
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, ServerLogEntry, Sink, UnifiedLogEntry};
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{Mutex, OnceCell};
@@ -10,14 +10,14 @@ static GLOBAL_LOGGER: OnceCell<Arc<Mutex<Logger>>> = OnceCell::const_new();
/// Server log processor
#[derive(Debug)]
pub struct Logger {
sender: Sender<LogEntry>, // Log sending channel
sender: Sender<UnifiedLogEntry>, // Log sending channel
queue_capacity: usize,
}
impl Logger {
/// Create a new Logger instance
/// Returns Logger and corresponding Receiver
pub fn new(config: &AppConfig) -> (Self, Receiver<LogEntry>) {
pub fn new(config: &AppConfig) -> (Self, Receiver<UnifiedLogEntry>) {
// Get queue capacity from configuration, or use default values 10000
let queue_capacity = config.logger.queue_capacity.unwrap_or(10000);
let (sender, receiver) = mpsc::channel(queue_capacity);
@@ -39,83 +39,80 @@ impl Logger {
self.queue_capacity
}
/// Asynchronous logging of server logs
/// Attach the log to the current Span and generate a separate Tracing Event
#[tracing::instrument(skip(self), fields(log_source = "logger"))]
pub async fn log(&self, entry: LogEntry) -> Result<(), LogError> {
// Log messages to the current Span
tracing::Span::current()
.record("log_message", &entry.message)
.record("source", &entry.source);
println!("target start is {:?}", &entry.target);
let target = if let Some(target) = &entry.target {
target.clone()
} else {
"server_logs".to_string()
};
/// Log a server entry
#[tracing::instrument(skip(self), fields(log_source = "logger_server"))]
pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), LogError> {
self.log_entry(UnifiedLogEntry::Server(entry)).await
}
println!("target end is {:?}", target);
// Generate independent Tracing Events with full LogEntry information
// Generate corresponding events according to level
match entry.level {
SerializableLevel(Level::ERROR) => {
tracing::error!(
target = %target.clone(),
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
/// Log an audit entry
#[tracing::instrument(skip(self), fields(log_source = "logger_audit"))]
pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), LogError> {
self.log_entry(UnifiedLogEntry::Audit(entry)).await
}
/// Log a console entry
#[tracing::instrument(skip(self), fields(log_source = "logger_console"))]
pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), LogError> {
self.log_entry(UnifiedLogEntry::Console(entry)).await
}
/// Asynchronous logging of unified log entries
#[tracing::instrument(skip(self), fields(log_source = "logger"))]
pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), LogError> {
// Extract information for tracing based on entry type
match &entry {
UnifiedLogEntry::Server(server) => {
tracing::Span::current()
.record("log_level", &server.level.0.as_str())
.record("log_message", &server.base.message.as_deref().unwrap_or(""))
.record("source", &server.source);
// Generate tracing event based on log level
match server.level.0 {
Level::ERROR => {
tracing::error!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
}
Level::WARN => {
tracing::warn!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
}
Level::INFO => {
tracing::info!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
}
Level::DEBUG => {
tracing::debug!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
}
Level::TRACE => {
tracing::trace!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
}
}
}
SerializableLevel(Level::WARN) => {
tracing::warn!(
target = %target.clone(),
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(Level::INFO) => {
UnifiedLogEntry::Audit(audit) => {
tracing::info!(
target = %target.clone(),
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
target: "audit_logs",
event = %audit.event,
api = %audit.api.name.as_deref().unwrap_or("unknown"),
message = %audit.base.message.as_deref().unwrap_or("")
);
}
SerializableLevel(Level::DEBUG) => {
tracing::debug!(
target = %target.clone(),
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
);
}
SerializableLevel(Level::TRACE) => {
tracing::trace!(
target = %target.clone(),
timestamp = %entry.timestamp,
message = %entry.message,
source = %entry.source,
request_id = ?entry.request_id,
user_id = ?entry.user_id,
fields = ?entry.fields
UnifiedLogEntry::Console(console) => {
let level_str = match console.level {
crate::LogKind::Info => "INFO",
crate::LogKind::Warning => "WARN",
crate::LogKind::Error => "ERROR",
crate::LogKind::Fatal => "FATAL",
};
tracing::info!(
target: "console_logs",
level = %level_str,
node = %console.node_name,
message = %console.console_msg
);
}
}
// Send logs to asynchronous queues to improve error handling
// Send logs to async queue with improved error handling
match self.sender.try_send(entry) {
Ok(_) => Ok(()),
Err(mpsc::error::TrySendError::Full(entry)) => {
@@ -150,28 +147,25 @@ impl Logger {
/// use rustfs_obs::Logger;
///
/// async fn example(logger: &Logger) {
/// let _ = logger.write_with_context("This is an information message", "example",Level::INFO, Some("target".to_string()),Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
/// let _ = logger.write_with_context("This is an information message", "example",Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
/// }
pub async fn write_with_context(
&self,
message: &str,
source: &str,
level: Level,
target: Option<String>,
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Result<(), LogError> {
self.log(LogEntry::new(
level,
message.to_string(),
source.to_string(),
target,
request_id,
user_id,
fields,
))
.await
let base = BaseLogEntry::new().message(Some(message.to_string())).request_id(request_id);
let server_entry = ServerLogEntry::new(level, source.to_string())
.user_id(user_id)
.fields(fields)
.with_base(base);
self.log_server_entry(server_entry).await
}
/// Write log
@@ -194,16 +188,7 @@ impl Logger {
/// }
/// ```
pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), LogError> {
self.log(LogEntry::new(
level,
message.to_string(),
source.to_string(),
None,
None,
None,
Vec::new(),
))
.await
self.write_with_context(message, source, level, None, None, Vec::new()).await
}
/// Shutdown the logger
@@ -458,7 +443,6 @@ pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> {
/// - `message`: Message to be logged
/// - `source`: Source of the log
/// - `level`: Log level
/// - `target`: Log target
/// - `request_id`: Request ID
/// - `user_id`: User ID
/// - `fields`: Additional fields
@@ -470,14 +454,13 @@ pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> {
/// use rustfs_obs::log_with_context;
///
/// async fn example() {
/// let _ = log_with_context("This is an information message", "example", Level::INFO, Some("target".to_string()), Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
/// let _ = log_with_context("This is an information message", "example", Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
/// }
/// ```
pub async fn log_with_context(
message: &str,
source: &str,
level: Level,
target: Option<String>,
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
@@ -485,6 +468,6 @@ pub async fn log_with_context(
get_global_logger()
.lock()
.await
.write_with_context(message, source, level, target, request_id, user_id, fields)
.write_with_context(message, source, level, request_id, user_id, fields)
.await
}

View File

@@ -1,4 +1,4 @@
use crate::{AppConfig, LogEntry};
use crate::{AppConfig, LogRecord, UnifiedLogEntry};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::fs::OpenOptions;
@@ -8,7 +8,7 @@ use tokio::io::AsyncWriteExt;
/// Sink Trait definition, asynchronously write logs
#[async_trait]
pub trait Sink: Send + Sync {
async fn write(&self, entry: &LogEntry);
async fn write(&self, entry: &UnifiedLogEntry);
}
#[cfg(feature = "kafka")]
@@ -18,7 +18,7 @@ pub struct KafkaSink {
topic: String,
batch_size: usize,
batch_timeout_ms: u64,
entries: Arc<tokio::sync::Mutex<Vec<LogEntry>>>,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
}
@@ -64,7 +64,7 @@ impl KafkaSink {
async fn periodic_flush(
producer: rdkafka::producer::FutureProducer,
topic: String,
entries: Arc<tokio::sync::Mutex<Vec<LogEntry>>>,
entries: Arc<tokio::sync::Mutex<Vec<UnifiedLogEntry>>>,
last_flush: Arc<std::sync::atomic::AtomicU64>,
timeout_ms: u64,
) {
@@ -88,7 +88,7 @@ impl KafkaSink {
}
}
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<LogEntry>) {
async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec<UnifiedLogEntry>) {
for entry in entries {
let payload = match serde_json::to_string(&entry) {
Ok(p) => p,
@@ -98,7 +98,7 @@ impl KafkaSink {
}
};
let span_id = entry.timestamp.to_rfc3339();
let span_id = entry.get_timestamp().to_rfc3339();
let _ = producer
.send(
@@ -113,7 +113,7 @@ impl KafkaSink {
#[cfg(feature = "kafka")]
#[async_trait]
impl Sink for KafkaSink {
async fn write(&self, entry: &LogEntry) {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut batch = self.entries.lock().await;
batch.push(entry.clone());
@@ -129,7 +129,7 @@ impl Sink for KafkaSink {
if should_flush_by_size || should_flush_by_time {
// Existing flush logic
let entries_to_send: Vec<LogEntry> = batch.drain(..).collect();
let entries_to_send: Vec<UnifiedLogEntry> = batch.drain(..).collect();
let producer = self.producer.clone();
let topic = self.topic.clone();
@@ -203,7 +203,7 @@ impl WebhookSink {
#[cfg(feature = "webhook")]
#[async_trait]
impl Sink for WebhookSink {
async fn write(&self, entry: &LogEntry) {
async fn write(&self, entry: &UnifiedLogEntry) {
let mut retries = 0;
let url = self.url.clone();
let entry_clone = entry.clone();
@@ -327,12 +327,17 @@ impl FileSink {
#[cfg(feature = "file")]
#[async_trait]
impl Sink for FileSink {
async fn write(&self, entry: &LogEntry) {
async fn write(&self, entry: &UnifiedLogEntry) {
let line = format!("{:?}\n", entry);
let mut writer = self.writer.lock().await;
if let Err(e) = writer.write_all(line.as_bytes()).await {
eprintln!("Failed to write log to file {}: {}", self.path, e);
eprintln!(
"Failed to write log to file {}: {},entry timestamp:{:?}",
self.path,
e,
entry.get_timestamp()
);
return;
}

View File

@@ -1,9 +1,9 @@
use crate::{entry::log::LogEntry, sink::Sink};
use crate::{sink::Sink, UnifiedLogEntry};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
/// Start the log processing worker thread
pub async fn start_worker(receiver: Receiver<LogEntry>, sinks: Vec<Arc<dyn Sink>>) {
pub async fn start_worker(receiver: Receiver<UnifiedLogEntry>, sinks: Vec<Arc<dyn Sink>>) {
let mut receiver = receiver;
while let Some(entry) = receiver.recv().await {
for sink in &sinks {