From c3ecfeae6cd8a6ea2a3998122e30df513e7523f3 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 18 Mar 2025 16:36:23 +0800 Subject: [PATCH] improve logger entry for Observability --- .docker/observability/README.md | 0 .docker/observability/docker-compose.yml | 65 +++ .docker/observability/jaeger-config.yaml | 88 +++ .docker/observability/loki-config.yaml | 63 +++ .../observability/otel-collector-config.yaml | 57 ++ .docker/observability/prometheus.yml | 11 + config/obs.example.toml | 31 ++ packages/obs/examples/server.rs | 30 +- packages/obs/src/config.rs | 65 ++- packages/obs/src/entry/args.rs | 74 +++ packages/obs/src/entry/audit.rs | 275 ++++----- packages/obs/src/entry/base.rs | 526 +++--------------- packages/obs/src/entry/log.rs | 83 --- packages/obs/src/entry/mod.rs | 142 ++++- packages/obs/src/entry/unified.rs | 279 ++++++++++ packages/obs/src/lib.rs | 32 +- packages/obs/src/logger.rs | 177 +++--- packages/obs/src/sink.rs | 27 +- packages/obs/src/worker.rs | 4 +- 19 files changed, 1208 insertions(+), 821 deletions(-) create mode 100644 .docker/observability/README.md create mode 100644 .docker/observability/docker-compose.yml create mode 100644 .docker/observability/jaeger-config.yaml create mode 100644 .docker/observability/loki-config.yaml create mode 100644 .docker/observability/otel-collector-config.yaml create mode 100644 .docker/observability/prometheus.yml create mode 100644 config/obs.example.toml create mode 100644 packages/obs/src/entry/args.rs delete mode 100644 packages/obs/src/entry/log.rs create mode 100644 packages/obs/src/entry/unified.rs diff --git a/.docker/observability/README.md b/.docker/observability/README.md new file mode 100644 index 00000000..e69de29b diff --git a/.docker/observability/docker-compose.yml b/.docker/observability/docker-compose.yml new file mode 100644 index 00000000..d6bd1932 --- /dev/null +++ b/.docker/observability/docker-compose.yml @@ -0,0 +1,65 @@ +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:0.120.0 + environment: + - TZ=Asia/Shanghai + volumes: + - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml + ports: + - 1888:1888 + - 8888:8888 + - 8889:8889 + - 13133:13133 + - 4317:4317 + - 4318:4318 + - 55679:55679 + networks: + - otel-network + jaeger: + image: jaegertracing/jaeger:2.3.0 + environment: + - TZ=Asia/Shanghai + ports: + - "16686:16686" + - "14317:4317" + - "14318:4318" + networks: + - otel-network + prometheus: + image: prom/prometheus:v3.2.0 + environment: + - TZ=Asia/Shanghai + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - "9090:9090" + networks: + - otel-network + loki: + image: grafana/loki:3.4.2 + environment: + - TZ=Asia/Shanghai + volumes: + - ./loki-config.yaml:/etc/loki/local-config.yaml + ports: + - "3100:3100" + command: -config.file=/etc/loki/local-config.yaml + networks: + - otel-network + grafana: + image: grafana/grafana:11.5.2 + ports: + - "3000:3000" # Web UI + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - TZ=Asia/Shanghai + networks: + - otel-network + + +networks: + otel-network: + driver: bridge + name: "network_otel_config" + driver_opts: + com.docker.network.enable_ipv6: "true" diff --git a/.docker/observability/jaeger-config.yaml b/.docker/observability/jaeger-config.yaml new file mode 100644 index 00000000..31b5a9d6 --- /dev/null +++ b/.docker/observability/jaeger-config.yaml @@ -0,0 +1,88 @@ + +service: + extensions: [jaeger_storage, jaeger_query, remote_sampling, healthcheckv2] + pipelines: + traces: + receivers: [otlp, jaeger, zipkin] + processors: [batch, adaptive_sampling] + exporters: [jaeger_storage_exporter] + telemetry: + resource: + service.name: jaeger + metrics: + level: detailed + readers: + - pull: + exporter: + prometheus: + host: 0.0.0.0 + port: 8888 + logs: + level: debug + # TODO Initialize telemetry tracer once OTEL released new feature. + # https://github.com/open-telemetry/opentelemetry-collector/issues/10663 + +extensions: + healthcheckv2: + use_v2: true + http: + + # pprof: + # endpoint: 0.0.0.0:1777 + # zpages: + # endpoint: 0.0.0.0:55679 + + jaeger_query: + storage: + traces: some_store + traces_archive: another_store + ui: + config_file: ./cmd/jaeger/config-ui.json + # The maximum duration that is considered for clock skew adjustments. + # Defaults to 0 seconds, which means it's disabled. + max_clock_skew_adjust: 0s + + jaeger_storage: + backends: + some_store: + memory: + max_traces: 100000 + another_store: + memory: + max_traces: 100000 + + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + sampling_store: some_store + initial_sampling_probability: 0.1 + http: + grpc: + +receivers: + otlp: + protocols: + grpc: + http: + + jaeger: + protocols: + grpc: + thrift_binary: + thrift_compact: + thrift_http: + + zipkin: + +processors: + batch: + # Adaptive Sampling Processor is required to support adaptive sampling. + # It expects remote_sampling extension with `adaptive:` config to be enabled. + adaptive_sampling: + +exporters: + jaeger_storage_exporter: + trace_storage: some_store + diff --git a/.docker/observability/loki-config.yaml b/.docker/observability/loki-config.yaml new file mode 100644 index 00000000..4aff8772 --- /dev/null +++ b/.docker/observability/loki-config.yaml @@ -0,0 +1,63 @@ +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + log_level: debug + grpc_server_max_concurrent_streams: 1000 + +common: + instance_addr: 127.0.0.1 + path_prefix: /tmp/loki + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + +limits_config: + metric_aggregation_enabled: true + +schema_config: + configs: + - from: 2020-10-24 + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + +pattern_ingester: + enabled: true + metric_aggregation: + loki_address: localhost:3100 + +ruler: + alertmanager_url: http://localhost:9093 + +frontend: + encoding: protobuf + +# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration +# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ +# +# Statistics help us better understand how Loki is used, and they show us performance +# levels for most users. This helps us prioritize features and documentation. +# For more information on what's sent, look at +# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go +# Refer to the buildReport method to see what goes into a report. +# +# If you would like to disable reporting, uncomment the following lines: +#analytics: +# reporting_enabled: false diff --git a/.docker/observability/otel-collector-config.yaml b/.docker/observability/otel-collector-config.yaml new file mode 100644 index 00000000..9e27c4e2 --- /dev/null +++ b/.docker/observability/otel-collector-config.yaml @@ -0,0 +1,57 @@ +receivers: + otlp: + protocols: + grpc: # OTLP gRPC 接收器 + endpoint: 0.0.0.0:4317 + http: # OTLP HTTP 接收器 + endpoint: 0.0.0.0:4318 + +processors: + batch: # 批处理处理器,提升吞吐量 + timeout: 5s + send_batch_size: 1000 + memory_limiter: + check_interval: 1s + limit_mib: 512 + +exporters: + otlp/traces: # OTLP 导出器,用于跟踪数据 + endpoint: "jaeger:4317" # Jaeger 的 OTLP gRPC 端点 + tls: + insecure: true # 开发环境禁用 TLS,生产环境需配置证书 + prometheus: # Prometheus 导出器,用于指标数据 + endpoint: "0.0.0.0:8889" # Prometheus 刮取端点 + namespace: "otel" # 指标前缀 + send_timestamps: true # 发送时间戳 + # enable_open_metrics: true + loki: # Loki 导出器,用于日志数据 + # endpoint: "http://loki:3100/otlp/v1/logs" + endpoint: "http://loki:3100/loki/api/v1/push" + tls: + insecure: true +extensions: + health_check: + pprof: + zpages: +service: + extensions: [health_check, pprof, zpages] # 启用扩展 + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter,batch] + exporters: [otlp/traces] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] + logs: + receivers: [otlp] + processors: [batch] + exporters: [loki] + telemetry: + logs: + level: "info" # Collector 日志级别 + metrics: + address: "0.0.0.0:8888" # Collector 自身指标暴露 + + diff --git a/.docker/observability/prometheus.yml b/.docker/observability/prometheus.yml new file mode 100644 index 00000000..8d3a031e --- /dev/null +++ b/.docker/observability/prometheus.yml @@ -0,0 +1,11 @@ +global: + scrape_interval: 5s # 刮取间隔 + +scrape_configs: + - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:8888'] # 从 Collector 刮取指标 + - job_name: 'otel-metrics' + static_configs: + - targets: ['otel-collector:8889'] # 应用指标 + diff --git a/config/obs.example.toml b/config/obs.example.toml new file mode 100644 index 00000000..59bd0f34 --- /dev/null +++ b/config/obs.example.toml @@ -0,0 +1,31 @@ +[observability] +endpoint = "" # Default is "http://localhost:4317" if not specified +use_stdout = false +sample_ratio = 0.5 +meter_interval = 30 +service_name = "rustfs_obs_service" +service_version = "0.1.0" +deployment_environment = "develop" + +[sinks] +[sinks.kafka] # Kafka sink is disabled by default +enabled = false +bootstrap_servers = "localhost:9092" +topic = "logs" +batch_size = 100 # Default is 100 if not specified +batch_timeout_ms = 1000 # Default is 1000ms if not specified + +[sinks.webhook] +enabled = false +url = "http://localhost:8080/webhook" +batch_size = 100 # Default is 3 if not specified +batch_timeout_ms = 1000 # Default is 100ms if not specified + +[sinks.file] +enabled = true +path = "logs/app.log" +batch_size = 100 +batch_timeout_ms = 1000 # Default is 8192 bytes if not specified + +[logger] +queue_capacity = 10000 \ No newline at end of file diff --git a/packages/obs/examples/server.rs b/packages/obs/examples/server.rs index 60b275de..2d35c8d4 100644 --- a/packages/obs/examples/server.rs +++ b/packages/obs/examples/server.rs @@ -1,12 +1,13 @@ use opentelemetry::global; -use rustfs_obs::{get_logger, init_obs, load_config, log_info, LogEntry}; +use rustfs_obs::{get_logger, init_obs, load_config, log_info, ServerLogEntry}; use std::time::{Duration, SystemTime}; use tracing::{info, instrument}; use tracing_core::Level; #[tokio::main] async fn main() { - let config = load_config(Some("packages/obs/examples/config".to_string())); + let obs_conf = Some("packages/obs/examples/config.toml".to_string()); + let config = load_config(obs_conf); let (_logger, _guard) = init_obs(config.clone()).await; // Simulate the operation tokio::time::sleep(Duration::from_millis(100)).await; @@ -33,24 +34,13 @@ async fn run(bucket: String, object: String, user: String, service_name: String) &[opentelemetry::KeyValue::new("operation", "run")], ); - let result = get_logger() - .lock() - .await - .log(LogEntry::new( - Level::INFO, - "Process user requests".to_string(), - "api_handler".to_string(), - Some("demo-audit".to_string()), - Some("req-12345".to_string()), - Some(user), - vec![ - ("endpoint".to_string(), "/api/v1/data".to_string()), - ("method".to_string(), "GET".to_string()), - ("bucket".to_string(), bucket), - ("object-length".to_string(), object.len().to_string()), - ], - )) - .await; + let server_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string()) + .user_id(Some(user.clone())) + .add_field("operation".to_string(), "login".to_string()) + .add_field("bucket".to_string(), bucket.clone()) + .add_field("object".to_string(), object.clone()); + + let result = get_logger().lock().await.log_server_entry(server_entry).await; info!("Logging is completed {:?}", result); put_object("bucket".to_string(), "object".to_string(), "user".to_string()).await; info!("Logging is completed"); diff --git a/packages/obs/src/config.rs b/packages/obs/src/config.rs index c6e57c42..744c0ec1 100644 --- a/packages/obs/src/config.rs +++ b/packages/obs/src/config.rs @@ -3,6 +3,11 @@ use serde::Deserialize; use std::env; /// OpenTelemetry Configuration +/// Add service name, service version, deployment environment +/// Add interval time for metric collection +/// Add sample ratio for trace sampling +/// Add endpoint for metric collection +/// Add use_stdout for output to stdout #[derive(Debug, Deserialize, Clone, Default)] pub struct OtelConfig { pub endpoint: String, @@ -58,6 +63,19 @@ pub struct LoggerConfig { } /// Overall application configuration +/// Add observability, sinks, and logger configuration +/// +/// Observability: OpenTelemetry configuration +/// Sinks: Kafka, Webhook, File sink configuration +/// Logger: Logger configuration +/// +/// # Example +/// ``` +/// use rustfs_obs::AppConfig; +/// use rustfs_obs::load_config; +/// +/// let config = load_config(None); +/// ``` #[derive(Debug, Deserialize, Clone, Default)] pub struct AppConfig { pub observability: OtelConfig, @@ -65,19 +83,48 @@ pub struct AppConfig { pub logger: LoggerConfig, } +const DEFAULT_CONFIG_FILE: &str = "obs"; + /// Loading the configuration file /// Supports TOML, YAML and .env formats, read in order by priority +/// +/// # Parameters +/// - `config_dir`: Configuration file path +/// +/// # Returns +/// Configuration information +/// +/// # Example +/// ``` +/// use rustfs_obs::AppConfig; +/// use rustfs_obs::load_config; +/// +/// let config = load_config(None); +/// ``` pub fn load_config(config_dir: Option) -> AppConfig { - let config_dir = config_dir.unwrap_or_else(|| { - env::current_dir() - .map(|path| path.to_string_lossy().to_string()) - .unwrap_or_else(|_| { - eprintln!("Warning: Failed to get current directory, using empty path"); - String::new() - }) - }); + let config_dir = if let Some(path) = config_dir { + // Use the provided path + let path = std::path::Path::new(&path); + if path.extension().is_some() { + // If path has extension, use it as is (extension will be added by Config::builder) + path.with_extension("").to_string_lossy().into_owned() + } else { + // If path is a directory, append the default config file name + path.to_string_lossy().into_owned() + } + } else { + // If no path provided, use current directory + default config file + match env::current_dir() { + Ok(dir) => dir.join(DEFAULT_CONFIG_FILE).to_string_lossy().into_owned(), + Err(_) => { + eprintln!("Warning: Failed to get current directory, using default config file"); + DEFAULT_CONFIG_FILE.to_string() + } + } + }; - println!("config_dir: {}", config_dir); + // Log using proper logging instead of println when possible + println!("Using config file base: {}", config_dir); let config = Config::builder() .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml)) diff --git a/packages/obs/src/entry/args.rs b/packages/obs/src/entry/args.rs new file mode 100644 index 00000000..1099398a --- /dev/null +++ b/packages/obs/src/entry/args.rs @@ -0,0 +1,74 @@ +use crate::entry::ObjectVersion; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Args - defines the arguments for API operations +/// Args is used to define the arguments for API operations. +/// +/// # Example +/// ``` +/// use rustfs_obs::Args; +/// use std::collections::HashMap; +/// +/// let args = Args::new() +/// .set_bucket(Some("my-bucket".to_string())) +/// .set_object(Some("my-object".to_string())) +/// .set_version_id(Some("123".to_string())) +/// .set_metadata(Some(HashMap::new())); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)] +pub struct Args { + #[serde(rename = "bucket", skip_serializing_if = "Option::is_none")] + pub bucket: Option, + #[serde(rename = "object", skip_serializing_if = "Option::is_none")] + pub object: Option, + #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] + pub version_id: Option, + #[serde(rename = "objects", skip_serializing_if = "Option::is_none")] + pub objects: Option>, + #[serde(rename = "metadata", skip_serializing_if = "Option::is_none")] + pub metadata: Option>, +} + +impl Args { + /// Create a new Args object + pub fn new() -> Self { + Args { + bucket: None, + object: None, + version_id: None, + objects: None, + metadata: None, + } + } + + /// Set the bucket + pub fn set_bucket(mut self, bucket: Option) -> Self { + self.bucket = bucket; + self + } + + /// Set the object + pub fn set_object(mut self, object: Option) -> Self { + self.object = object; + self + } + + /// Set the version ID + pub fn set_version_id(mut self, version_id: Option) -> Self { + self.version_id = version_id; + self + } + + /// Set the objects + pub fn set_objects(mut self, objects: Option>) -> Self { + self.objects = objects; + self + } + + /// Set the metadata + pub fn set_metadata(mut self, metadata: Option>) -> Self { + self.metadata = metadata; + self + } +} diff --git a/packages/obs/src/entry/audit.rs b/packages/obs/src/entry/audit.rs index 8f8b1e34..0aba2bac 100644 --- a/packages/obs/src/entry/audit.rs +++ b/packages/obs/src/entry/audit.rs @@ -1,11 +1,64 @@ -use crate::ObjectVersion; +use crate::{BaseLogEntry, LogRecord, ObjectVersion}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; /// API details structure -#[derive(Debug, Serialize, Deserialize, Clone, Default)] +/// ApiDetails is used to define the details of an API operation +/// +/// The `ApiDetails` structure contains the following fields: +/// - `name` - the name of the API operation +/// - `bucket` - the bucket name +/// - `object` - the object name +/// - `objects` - the list of objects +/// - `status` - the status of the API operation +/// - `status_code` - the status code of the API operation +/// - `input_bytes` - the input bytes +/// - `output_bytes` - the output bytes +/// - `header_bytes` - the header bytes +/// - `time_to_first_byte` - the time to first byte +/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds +/// - `time_to_response` - the time to response +/// - `time_to_response_in_ns` - the time to response in nanoseconds +/// +/// The `ApiDetails` structure contains the following methods: +/// - `new` - create a new `ApiDetails` with default values +/// - `set_name` - set the name +/// - `set_bucket` - set the bucket +/// - `set_object` - set the object +/// - `set_objects` - set the objects +/// - `set_status` - set the status +/// - `set_status_code` - set the status code +/// - `set_input_bytes` - set the input bytes +/// - `set_output_bytes` - set the output bytes +/// - `set_header_bytes` - set the header bytes +/// - `set_time_to_first_byte` - set the time to first byte +/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds +/// - `set_time_to_response` - set the time to response +/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds +/// +/// # Example +/// ``` +/// use rustfs_obs::ApiDetails; +/// use rustfs_obs::ObjectVersion; +/// +/// let api = ApiDetails::new() +/// .set_name(Some("GET".to_string())) +/// .set_bucket(Some("my-bucket".to_string())) +/// .set_object(Some("my-object".to_string())) +/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())]) +/// .set_status(Some("OK".to_string())) +/// .set_status_code(Some(200)) +/// .set_input_bytes(100) +/// .set_output_bytes(200) +/// .set_header_bytes(Some(50)) +/// .set_time_to_first_byte(Some("100ms".to_string())) +/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string())) +/// .set_time_to_response(Some("200ms".to_string())) +/// .set_time_to_response_in_ns(Some("200000000ns".to_string())); +/// ``` +#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)] pub struct ApiDetails { #[serde(rename = "name", skip_serializing_if = "Option::is_none")] pub name: Option, @@ -135,22 +188,85 @@ impl ApiDetails { } /// Entry - audit entry logs +/// AuditLogEntry is used to define the structure of an audit log entry +/// +/// The `AuditLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `version` - the version of the audit log entry +/// - `deployment_id` - the deployment ID +/// - `event` - the event +/// - `entry_type` - the type of audit message +/// - `api` - the API details +/// - `remote_host` - the remote host +/// - `user_agent` - the user agent +/// - `req_path` - the request path +/// - `req_host` - the request host +/// - `req_claims` - the request claims +/// - `req_query` - the request query +/// - `req_header` - the request header +/// - `resp_header` - the response header +/// - `access_key` - the access key +/// - `parent_user` - the parent user +/// - `error` - the error +/// +/// The `AuditLogEntry` structure contains the following methods: +/// - `new` - create a new `AuditEntry` with default values +/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details +/// - `with_base` - set the base log entry +/// - `set_version` - set the version +/// - `set_deployment_id` - set the deployment ID +/// - `set_event` - set the event +/// - `set_entry_type` - set the entry type +/// - `set_api` - set the API details +/// - `set_remote_host` - set the remote host +/// - `set_user_agent` - set the user agent +/// - `set_req_path` - set the request path +/// - `set_req_host` - set the request host +/// - `set_req_claims` - set the request claims +/// - `set_req_query` - set the request query +/// - `set_req_header` - set the request header +/// - `set_resp_header` - set the response header +/// - `set_access_key` - set the access key +/// - `set_parent_user` - set the parent user +/// - `set_error` - set the error +/// +/// # Example +/// ``` +/// use rustfs_obs::AuditLogEntry; +/// use rustfs_obs::ApiDetails; +/// use std::collections::HashMap; +/// +/// let entry = AuditLogEntry::new() +/// .set_version("1.0".to_string()) +/// .set_deployment_id(Some("123".to_string())) +/// .set_event("event".to_string()) +/// .set_entry_type(Some("type".to_string())) +/// .set_api(ApiDetails::new()) +/// .set_remote_host(Some("remote-host".to_string())) +/// .set_user_agent(Some("user-agent".to_string())) +/// .set_req_path(Some("req-path".to_string())) +/// .set_req_host(Some("req-host".to_string())) +/// .set_req_claims(Some(HashMap::new())) +/// .set_req_query(Some(HashMap::new())) +/// .set_req_header(Some(HashMap::new())) +/// .set_resp_header(Some(HashMap::new())) +/// .set_access_key(Some("access-key".to_string())) +/// .set_parent_user(Some("parent-user".to_string())) +/// .set_error(Some("error".to_string())); #[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct AuditEntry { +pub struct AuditLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, pub version: String, #[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")] pub deployment_id: Option, - pub time: DateTime, pub event: String, - // Class of audit message - S3, admin ops, bucket management #[serde(rename = "type", skip_serializing_if = "Option::is_none")] pub entry_type: Option, pub api: ApiDetails, #[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")] pub remote_host: Option, - #[serde(rename = "requestID", skip_serializing_if = "Option::is_none")] - pub request_id: Option, #[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")] pub user_agent: Option, #[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")] @@ -165,8 +281,6 @@ pub struct AuditEntry { pub req_header: Option>, #[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")] pub resp_header: Option>, - #[serde(rename = "tags", skip_serializing_if = "Option::is_none")] - pub tags: Option>, #[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")] pub access_key: Option, #[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")] @@ -175,18 +289,17 @@ pub struct AuditEntry { pub error: Option, } -impl AuditEntry { +impl AuditLogEntry { /// Create a new `AuditEntry` with default values pub fn new() -> Self { - AuditEntry { + AuditLogEntry { + base: BaseLogEntry::new(), version: String::new(), deployment_id: None, - time: Utc::now(), event: String::new(), entry_type: None, api: ApiDetails::new(), remote_host: None, - request_id: None, user_agent: None, req_path: None, req_host: None, @@ -194,47 +307,25 @@ impl AuditEntry { req_query: None, req_header: None, resp_header: None, - tags: None, access_key: None, parent_user: None, error: None, } } - /// Create a new `AuditEntry` with version and time event and api details - /// # Arguments - /// * `version` - Version of the audit entry - /// * `time` - Time of the audit entry - /// * `event` - Event of the audit entry - /// * `api` - API details of the audit entry - /// # Returns - /// * `AuditEntry` with the given values - /// # Example - /// ``` - /// use chrono::Utc; - /// use rustfs_obs::{ApiDetails, AuditEntry}; - /// let entry = AuditEntry::new_with_values( - /// "v1".to_string(), - /// Utc::now(), - /// "event".to_string(), - /// ApiDetails::new(), - /// ); - /// ``` - /// # Remarks - /// This is a convenience method to create an `AuditEntry` with the given values - /// without having to set each field individually - /// This is useful when you want to create an `AuditEntry` with the given values - /// without having to set each field individually + /// Create a new `AuditEntry` with version, time, event and api details pub fn new_with_values(version: String, time: DateTime, event: String, api: ApiDetails) -> Self { - AuditEntry { + let mut base = BaseLogEntry::new(); + base.timestamp = time; + + AuditLogEntry { + base, version, deployment_id: None, - time, event, entry_type: None, api, remote_host: None, - request_id: None, user_agent: None, req_path: None, req_host: None, @@ -242,13 +333,18 @@ impl AuditEntry { req_query: None, req_header: None, resp_header: None, - tags: None, access_key: None, parent_user: None, error: None, } } + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + /// Set the version pub fn set_version(mut self, version: String) -> Self { self.version = version; @@ -261,12 +357,6 @@ impl AuditEntry { self } - /// Set the time - pub fn set_time(mut self, time: DateTime) -> Self { - self.time = time; - self - } - /// Set the event pub fn set_event(mut self, event: String) -> Self { self.event = event; @@ -291,12 +381,6 @@ impl AuditEntry { self } - /// Set the request ID - pub fn set_request_id(mut self, request_id: Option) -> Self { - self.request_id = request_id; - self - } - /// Set the user agent pub fn set_user_agent(mut self, user_agent: Option) -> Self { self.user_agent = user_agent; @@ -339,12 +423,6 @@ impl AuditEntry { self } - /// Set the tags - pub fn set_tags(mut self, tags: Option>) -> Self { - self.tags = tags; - self - } - /// Set the access key pub fn set_access_key(mut self, access_key: Option) -> Self { self.access_key = access_key; @@ -364,79 +442,12 @@ impl AuditEntry { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_audit_entry() { - let entry = AuditEntry::new() - .set_version("v1".to_string()) - .set_deployment_id(Some("12345".to_string())) - .set_time(Utc::now()) - .set_event("event".to_string()) - .set_entry_type(Some("type".to_string())) - .set_api(ApiDetails::new()) - .set_remote_host(Some("localhost".to_string())) - .set_request_id(Some("req-12345".to_string())) - .set_user_agent(Some("user-agent".to_string())) - .set_req_path(Some("/path".to_string())) - .set_req_host(Some("localhost".to_string())) - .set_req_claims(Some(HashMap::new())) - .set_req_query(Some(HashMap::new())) - .set_req_header(Some(HashMap::new())) - .set_resp_header(Some(HashMap::new())) - .set_tags(Some(HashMap::new())) - .set_access_key(Some("access-key".to_string())) - .set_parent_user(Some("parent-user".to_string())) - .set_error(Some("error".to_string())); - - assert_eq!(entry.version, "v1"); - assert_eq!(entry.deployment_id, Some("12345".to_string())); - assert_eq!(entry.event, "event"); - assert_eq!(entry.entry_type, Some("type".to_string())); - assert_eq!(entry.remote_host, Some("localhost".to_string())); - assert_eq!(entry.request_id, Some("req-12345".to_string())); - assert_eq!(entry.user_agent, Some("user-agent".to_string())); - assert_eq!(entry.req_path, Some("/path".to_string())); - assert_eq!(entry.req_host, Some("localhost".to_string())); - assert_eq!(entry.access_key, Some("access-key".to_string())); - assert_eq!(entry.parent_user, Some("parent-user".to_string())); - assert_eq!(entry.error, Some("error".to_string())); +impl LogRecord for AuditLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) } - #[test] - fn test_api_details() { - let api = ApiDetails::new() - .set_name(Some("name".to_string())) - .set_bucket(Some("bucket".to_string())) - .set_object(Some("object".to_string())) - .set_objects(vec![ObjectVersion { - object_name: "object".to_string(), - version_id: Some("12345".to_string()), - }]) - .set_status(Some("status".to_string())) - .set_status_code(Some(200)) - .set_input_bytes(100) - .set_output_bytes(200) - .set_header_bytes(Some(300)) - .set_time_to_first_byte(Some("100ms".to_string())) - .set_time_to_first_byte_in_ns(Some("100ns".to_string())) - .set_time_to_response(Some("200ms".to_string())) - .set_time_to_response_in_ns(Some("200ns".to_string())); - - assert_eq!(api.name, Some("name".to_string())); - assert_eq!(api.bucket, Some("bucket".to_string())); - assert_eq!(api.object, Some("object".to_string())); - assert_eq!(api.objects.len(), 1); - assert_eq!(api.status, Some("status".to_string())); - assert_eq!(api.status_code, Some(200)); - assert_eq!(api.input_bytes, 100); - assert_eq!(api.output_bytes, 200); - assert_eq!(api.header_bytes, Some(300)); - assert_eq!(api.time_to_first_byte, Some("100ms".to_string())); - assert_eq!(api.time_to_first_byte_in_ns, Some("100ns".to_string())); - assert_eq!(api.time_to_response, Some("200ms".to_string())); - assert_eq!(api.time_to_response_in_ns, Some("200ns".to_string())); + fn get_timestamp(&self) -> DateTime { + self.base.timestamp } } diff --git a/packages/obs/src/entry/base.rs b/packages/obs/src/entry/base.rs index c629c74d..422646e0 100644 --- a/packages/obs/src/entry/base.rs +++ b/packages/obs/src/entry/base.rs @@ -3,490 +3,90 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -/// ObjectVersion object version key/versionId +/// Base log entry structure shared by all log types +/// This structure is used to serialize log entries to JSON +/// and send them to the log sinks +/// This structure is also used to deserialize log entries from JSON +/// This structure is also used to store log entries in the database +/// This structure is also used to query log entries from the database +/// +/// The `BaseLogEntry` structure contains the following fields: +/// - `timestamp` - the timestamp of the log entry +/// - `request_id` - the request ID of the log entry +/// - `message` - the message of the log entry +/// - `tags` - the tags of the log entry +/// +/// The `BaseLogEntry` structure contains the following methods: +/// - `new` - create a new `BaseLogEntry` with default values +/// - `message` - set the message +/// - `request_id` - set the request ID +/// - `tags` - set the tags +/// - `timestamp` - set the timestamp +/// +/// # Example +/// ``` +/// use rustfs_obs::BaseLogEntry; +/// use chrono::{DateTime, Utc}; +/// use std::collections::HashMap; +/// +/// let timestamp = Utc::now(); +/// let request = Some("req-123".to_string()); +/// let message = Some("This is a log message".to_string()); +/// let tags = Some(HashMap::new()); +/// +/// let entry = BaseLogEntry::new() +/// .timestamp(timestamp) +/// .request_id(request) +/// .message(message) +/// .tags(tags); +/// ``` #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] -pub struct ObjectVersion { - #[serde(rename = "objectName")] - pub object_name: String, - #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] - pub version_id: Option, -} - -impl ObjectVersion { - /// Create a new ObjectVersion object - pub fn new() -> Self { - ObjectVersion { - object_name: String::new(), - version_id: None, - } - } - - /// Create a new ObjectVersion object with object name - pub fn new_with_object_name(object_name: String) -> Self { - ObjectVersion { - object_name, - version_id: None, - } - } - /// Set the object name - pub fn set_object_name(mut self, object_name: String) -> Self { - self.object_name = object_name; - self - } - - /// Set the version ID - pub fn set_version_id(mut self, version_id: Option) -> Self { - self.version_id = version_id; - self - } -} - -/// Args - defines the arguments for the API -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -pub struct Args { - #[serde(rename = "bucket", skip_serializing_if = "Option::is_none")] - pub bucket: Option, - #[serde(rename = "object", skip_serializing_if = "Option::is_none")] - pub object: Option, - #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] - pub version_id: Option, - #[serde(rename = "objects", skip_serializing_if = "Option::is_none")] - pub objects: Option>, - #[serde(rename = "metadata", skip_serializing_if = "Option::is_none")] - pub metadata: Option>, -} - -impl Args { - /// Create a new Args object - pub fn new() -> Self { - Args { - bucket: None, - object: None, - version_id: None, - objects: None, - metadata: None, - } - } - - /// Set the bucket - pub fn set_bucket(mut self, bucket: Option) -> Self { - self.bucket = bucket; - self - } - - /// Set the object - pub fn set_object(mut self, object: Option) -> Self { - self.object = object; - self - } - - /// Set the version ID - pub fn set_version_id(mut self, version_id: Option) -> Self { - self.version_id = version_id; - self - } - - /// Set the objects - pub fn set_objects(mut self, objects: Option>) -> Self { - self.objects = objects; - self - } - - /// Set the metadata - pub fn set_metadata(mut self, metadata: Option>) -> Self { - self.metadata = metadata; - self - } -} - -/// Trace - defines the trace -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -pub struct Trace { - #[serde(rename = "message", skip_serializing_if = "Option::is_none")] - pub message: Option, - #[serde(rename = "source", skip_serializing_if = "Option::is_none")] - pub source: Option>, - #[serde(rename = "variables", skip_serializing_if = "Option::is_none")] - pub variables: Option>, -} - -impl Trace { - /// Create a new Trace object - pub fn new() -> Self { - Trace { - message: None, - source: None, - variables: None, - } - } - - /// Set the message - pub fn set_message(mut self, message: Option) -> Self { - self.message = message; - self - } - - /// Set the source - pub fn set_source(mut self, source: Option>) -> Self { - self.source = source; - self - } - - /// Set the variables - pub fn set_variables(mut self, variables: Option>) -> Self { - self.variables = variables; - self - } -} - -/// API - defines the api type and its args -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -pub struct API { - #[serde(rename = "name", skip_serializing_if = "Option::is_none")] - pub name: Option, - #[serde(rename = "args", skip_serializing_if = "Option::is_none")] - pub args: Option, -} - -impl API { - /// Create a new API object - pub fn new() -> Self { - API { name: None, args: None } - } - - /// Set the name - pub fn set_name(mut self, name: Option) -> Self { - self.name = name; - self - } - - /// Set the args - pub fn set_args(mut self, args: Option) -> Self { - self.args = args; - self - } -} - -/// Log kind/level enum -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum LogKind { - #[serde(rename = "INFO")] - Info, - #[serde(rename = "WARNING")] - Warning, - #[serde(rename = "ERROR")] - Error, - #[serde(rename = "FATAL")] - Fatal, -} - -impl Default for LogKind { - fn default() -> Self { - LogKind::Info - } -} - -/// Entry - defines fields and values of each log entry -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -pub struct Entry { - #[serde(rename = "site", skip_serializing_if = "Option::is_none")] - pub site: Option, - - #[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")] - pub deployment_id: Option, - - pub level: LogKind, - - #[serde(rename = "errKind", skip_serializing_if = "Option::is_none")] - pub log_kind: Option, // Deprecated Jan 2024 - - pub time: DateTime, - - #[serde(rename = "api", skip_serializing_if = "Option::is_none")] - pub api: Option, - - #[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")] - pub remote_host: Option, - - #[serde(rename = "host", skip_serializing_if = "Option::is_none")] - pub host: Option, +pub struct BaseLogEntry { + #[serde(rename = "time")] + pub timestamp: DateTime, #[serde(rename = "requestID", skip_serializing_if = "Option::is_none")] pub request_id: Option, - #[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")] - pub user_agent: Option, - #[serde(rename = "message", skip_serializing_if = "Option::is_none")] pub message: Option, - #[serde(rename = "error", skip_serializing_if = "Option::is_none")] - pub trace: Option, + #[serde(rename = "tags", skip_serializing_if = "Option::is_none")] + pub tags: Option>, } -impl Entry { - /// Create a new Entry object with default values +impl BaseLogEntry { + /// Create a new BaseLogEntry with default values pub fn new() -> Self { - Entry { - site: None, - deployment_id: None, - level: LogKind::Info, - log_kind: None, - time: Utc::now(), - api: None, - remote_host: None, - host: None, + BaseLogEntry { + timestamp: Utc::now(), request_id: None, - user_agent: None, message: None, - trace: None, + tags: None, } } - /// Set the site - pub fn set_site(mut self, site: Option) -> Self { - self.site = site; - self - } - - /// Set the deployment ID - pub fn set_deployment_id(mut self, deployment_id: Option) -> Self { - self.deployment_id = deployment_id; - self - } - - /// Set the level - pub fn set_level(mut self, level: LogKind) -> Self { - self.level = level; - self - } - - /// Set the log kind - pub fn set_log_kind(mut self, log_kind: Option) -> Self { - self.log_kind = log_kind; - self - } - - /// Set the time - pub fn set_time(mut self, time: DateTime) -> Self { - self.time = time; - self - } - - /// Set the API - pub fn set_api(mut self, api: Option) -> Self { - self.api = api; - self - } - - /// Set the remote host - pub fn set_remote_host(mut self, remote_host: Option) -> Self { - self.remote_host = remote_host; - self - } - - /// Set the host - pub fn set_host(mut self, host: Option) -> Self { - self.host = host; - self - } - - /// Set the request ID - pub fn set_request_id(mut self, request_id: Option) -> Self { - self.request_id = request_id; - self - } - - /// Set the user agent - pub fn set_user_agent(mut self, user_agent: Option) -> Self { - self.user_agent = user_agent; - self - } - /// Set the message - pub fn set_message(mut self, message: Option) -> Self { + pub fn message(mut self, message: Option) -> Self { self.message = message; self } - /// Set the trace - pub fn set_trace(mut self, trace: Option) -> Self { - self.trace = trace; + /// Set the request ID + pub fn request_id(mut self, request_id: Option) -> Self { + self.request_id = request_id; + self + } + + /// Set the tags + pub fn tags(mut self, tags: Option>) -> Self { + self.tags = tags; + self + } + + /// Set the timestamp + pub fn timestamp(mut self, timestamp: DateTime) -> Self { + self.timestamp = timestamp; self } } - -/// Info holds console log messages -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Info { - #[serde(flatten)] - pub entry: Entry, - - pub console_msg: String, - - #[serde(rename = "node")] - pub node_name: String, - - #[serde(skip)] - pub err: Option, -} - -impl Info { - /// Create a new Info object with default values - pub fn new() -> Self { - Info { - entry: Entry::new(), - console_msg: String::new(), - node_name: String::new(), - err: None, - } - } - - /// Create a new Info object with console message and node name - pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self { - Info { - entry: Entry::new(), - console_msg, - node_name, - err: None, - } - } - - /// Set the node name - pub fn set_node_name(&mut self, node_name: String) { - self.node_name = node_name; - } - - /// Set the entry - pub fn set_entry(&mut self, entry: Entry) { - self.entry = entry; - } - - /// Set the console message - pub fn set_console_msg(&mut self, console_msg: String) { - self.console_msg = console_msg; - } - - /// Set the error message - pub fn set_err(&mut self, err: Option) { - self.err = err; - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_object_version() { - let mut object_version = ObjectVersion::new(); - object_version = object_version.clone().set_object_name("object".to_string()); - object_version = object_version.clone().set_version_id(Some("version".to_string())); - assert_eq!(object_version.object_name, "object".to_string()); - assert_eq!(object_version.version_id, Some("version".to_string())); - } - - #[test] - fn test_object_version_with_object_name() { - let object_version = ObjectVersion::new_with_object_name("object".to_string()); - assert_eq!(object_version.object_name, "object".to_string()); - assert_eq!(object_version.version_id, None); - } - - #[test] - fn test_args() { - let mut obj = ObjectVersion::new(); - obj.object_name = "object".to_string(); - obj.version_id = Some("version".to_string()); - let objs = vec![obj]; - let args = Args::new() - .set_bucket(Some("bucket".to_string())) - .set_object(Some("object".to_string())) - .set_version_id(Some("version".to_string())) - .set_objects(Some(objs.clone())) - .set_metadata(Some(HashMap::new())); - - assert_eq!(args.bucket, Some("bucket".to_string())); - assert_eq!(args.object, Some("object".to_string())); - assert_eq!(args.version_id, Some("version".to_string())); - assert_eq!(args.objects, Some(objs)); - assert_eq!(args.metadata, Some(HashMap::new())); - } - - #[test] - fn test_trace() { - let trace = Trace::new() - .set_message(Some("message".to_string())) - .set_source(Some(vec!["source".to_string()])) - .set_variables(Some(HashMap::new())); - - assert_eq!(trace.message, Some("message".to_string())); - assert_eq!(trace.source, Some(vec!["source".to_string()])); - assert_eq!(trace.variables, Some(HashMap::new())); - } - - #[test] - fn test_api() { - let api = API::new().set_name(Some("name".to_string())).set_args(Some(Args::new())); - - assert_eq!(api.name, Some("name".to_string())); - assert_eq!(api.args, Some(Args::new())); - } - - #[test] - fn test_log_kind() { - assert_eq!(LogKind::default(), LogKind::Info); - } - - #[test] - fn test_entry() { - let entry = Entry::new() - .set_site(Some("site".to_string())) - .set_deployment_id(Some("deployment_id".to_string())) - .set_level(LogKind::Info) - .set_log_kind(Some(LogKind::Info)) - .set_time(Utc::now()) - .set_api(Some(API::new())) - .set_remote_host(Some("remote_host".to_string())) - .set_host(Some("host".to_string())) - .set_request_id(Some("request_id".to_string())) - .set_user_agent(Some("user_agent".to_string())) - .set_message(Some("message".to_string())) - .set_trace(Some(Trace::new())); - - assert_eq!(entry.site, Some("site".to_string())); - assert_eq!(entry.deployment_id, Some("deployment_id".to_string())); - assert_eq!(entry.level, LogKind::Info); - assert_eq!(entry.log_kind, Some(LogKind::Info)); - assert_eq!(entry.api, Some(API::new())); - assert_eq!(entry.remote_host, Some("remote_host".to_string())); - assert_eq!(entry.host, Some("host".to_string())); - assert_eq!(entry.request_id, Some("request_id".to_string())); - assert_eq!(entry.user_agent, Some("user_agent".to_string())); - assert_eq!(entry.message, Some("message".to_string())); - assert_eq!(entry.trace, Some(Trace::new())); - } - - #[test] - fn test_info() { - let mut info = Info::new(); - info.set_node_name("node_name".to_string()); - info.set_entry(Entry::new()); - info.set_console_msg("console_msg".to_string()); - info.set_err(Some("err".to_string())); - - assert_eq!(info.node_name, "node_name".to_string()); - // assert_eq!(info.entry, Entry::new()); - assert_eq!(info.console_msg, "console_msg".to_string()); - assert_eq!(info.err, Some("err".to_string())); - } - - #[test] - fn test_info_with_console_msg() { - let info = Info::new_with_console_msg("console_msg".to_string(), "node_name".to_string()); - - assert_eq!(info.node_name, "node_name".to_string()); - assert_eq!(info.console_msg, "console_msg".to_string()); - assert_eq!(info.err, None); - } -} diff --git a/packages/obs/src/entry/log.rs b/packages/obs/src/entry/log.rs deleted file mode 100644 index 6960fef2..00000000 --- a/packages/obs/src/entry/log.rs +++ /dev/null @@ -1,83 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tracing_core::Level; - -/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize` -#[derive(Debug, Clone)] -pub struct SerializableLevel(pub Level); - -impl From for SerializableLevel { - fn from(level: Level) -> Self { - SerializableLevel(level) - } -} - -impl From for Level { - fn from(serializable_level: SerializableLevel) -> Self { - serializable_level.0 - } -} - -impl Serialize for SerializableLevel { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(self.0.as_str()) - } -} - -impl<'de> Deserialize<'de> for SerializableLevel { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - match s.as_str() { - "TRACE" => Ok(SerializableLevel(Level::TRACE)), - "DEBUG" => Ok(SerializableLevel(Level::DEBUG)), - "INFO" => Ok(SerializableLevel(Level::INFO)), - "WARN" => Ok(SerializableLevel(Level::WARN)), - "ERROR" => Ok(SerializableLevel(Level::ERROR)), - _ => Err(D::Error::custom("unknown log level")), - } - } -} - -/// Server log entry structure -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct LogEntry { - pub timestamp: DateTime, // Log timestamp - pub level: SerializableLevel, // Log Level - pub message: String, // Log messages - pub source: String, // Log source (such as module name) - pub target: Option, // Log target - pub request_id: Option, // Request ID (Common Server Fields) - pub user_id: Option, // User ID (Common Server Fields) - pub fields: Vec<(String, String)>, // Attached fields (key value pairs) -} - -impl LogEntry { - /// Create a new LogEntry - pub fn new( - level: Level, - message: String, - source: String, - target: Option, - request_id: Option, - user_id: Option, - fields: Vec<(String, String)>, - ) -> Self { - LogEntry { - timestamp: Utc::now(), - level: SerializableLevel::from(level), - message, - source, - target, - request_id, - user_id, - fields, - } - } -} diff --git a/packages/obs/src/entry/mod.rs b/packages/obs/src/entry/mod.rs index 51ba1584..72dae261 100644 --- a/packages/obs/src/entry/mod.rs +++ b/packages/obs/src/entry/mod.rs @@ -1,3 +1,143 @@ +pub(crate) mod args; pub(crate) mod audit; pub(crate) mod base; -pub(crate) mod log; +pub(crate) mod unified; + +use serde::de::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use tracing_core::Level; + +/// ObjectVersion is used across multiple modules +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct ObjectVersion { + #[serde(rename = "name")] + pub object_name: String, + #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] + pub version_id: Option, +} + +impl ObjectVersion { + /// Create a new ObjectVersion object + pub fn new() -> Self { + ObjectVersion { + object_name: String::new(), + version_id: None, + } + } + + /// Create a new ObjectVersion with object name + pub fn new_with_object_name(object_name: String) -> Self { + ObjectVersion { + object_name, + version_id: None, + } + } + + /// Set the object name + pub fn set_object_name(mut self, object_name: String) -> Self { + self.object_name = object_name; + self + } + + /// Set the version ID + pub fn set_version_id(mut self, version_id: Option) -> Self { + self.version_id = version_id; + self + } +} + +/// Log kind/level enum +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum LogKind { + #[serde(rename = "INFO")] + Info, + #[serde(rename = "WARNING")] + Warning, + #[serde(rename = "ERROR")] + Error, + #[serde(rename = "FATAL")] + Fatal, +} + +impl Default for LogKind { + fn default() -> Self { + LogKind::Info + } +} + +/// Trait for types that can be serialized to JSON and have a timestamp +/// This trait is used by `ServerLogEntry` to convert the log entry to JSON +/// and get the timestamp of the log entry +/// This trait is implemented by `ServerLogEntry` +/// +/// # Example +/// ``` +/// use rustfs_obs::LogRecord; +/// use chrono::{DateTime, Utc}; +/// use rustfs_obs::ServerLogEntry; +/// use tracing_core::Level; +/// +/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string()); +/// let json = log_entry.to_json(); +/// let timestamp = log_entry.get_timestamp(); +/// ``` +pub trait LogRecord { + fn to_json(&self) -> String; + fn get_timestamp(&self) -> chrono::DateTime; +} + +/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize` +/// for `ServerLogEntry` +/// This is necessary because `tracing_core::Level` does not implement `Serialize` +/// and `Deserialize` +/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized +/// using `serde` +/// +/// # Example +/// ``` +/// use rustfs_obs::SerializableLevel; +/// use tracing_core::Level; +/// +/// let level = Level::INFO; +/// let serializable_level = SerializableLevel::from(level); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SerializableLevel(pub Level); + +impl From for SerializableLevel { + fn from(level: Level) -> Self { + SerializableLevel(level) + } +} + +impl From for Level { + fn from(serializable_level: SerializableLevel) -> Self { + serializable_level.0 + } +} + +impl Serialize for SerializableLevel { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(self.0.as_str()) + } +} + +impl<'de> Deserialize<'de> for SerializableLevel { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + match s.as_str() { + "TRACE" => Ok(SerializableLevel(Level::TRACE)), + "DEBUG" => Ok(SerializableLevel(Level::DEBUG)), + "INFO" => Ok(SerializableLevel(Level::INFO)), + "WARN" => Ok(SerializableLevel(Level::WARN)), + "ERROR" => Ok(SerializableLevel(Level::ERROR)), + _ => Err(D::Error::custom("unknown log level")), + } + } +} diff --git a/packages/obs/src/entry/unified.rs b/packages/obs/src/entry/unified.rs new file mode 100644 index 00000000..598ebe79 --- /dev/null +++ b/packages/obs/src/entry/unified.rs @@ -0,0 +1,279 @@ +use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tracing_core::Level; + +/// Server log entry with structured fields +/// ServerLogEntry is used to log structured log entries from the server +/// +/// The `ServerLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `level` - the log level +/// - `source` - the source of the log entry +/// - `user_id` - the user ID +/// - `fields` - the structured fields of the log entry +/// +/// The `ServerLogEntry` structure contains the following methods: +/// - `new` - create a new `ServerLogEntry` with specified level and source +/// - `with_base` - set the base log entry +/// - `user_id` - set the user ID +/// - `fields` - set the fields +/// - `add_field` - add a field +/// +/// # Example +/// ``` +/// use rustfs_obs::ServerLogEntry; +/// use tracing_core::Level; +/// +/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string()) +/// .user_id(Some("user-456".to_string())) +/// .add_field("operation".to_string(), "login".to_string()); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ServerLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, + + pub level: SerializableLevel, + pub source: String, + + #[serde(rename = "userId", skip_serializing_if = "Option::is_none")] + pub user_id: Option, + + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub fields: Vec<(String, String)>, +} + +impl ServerLogEntry { + /// Create a new ServerLogEntry with specified level and source + pub fn new(level: Level, source: String) -> Self { + ServerLogEntry { + base: BaseLogEntry::new(), + level: SerializableLevel(level), + source, + user_id: None, + fields: Vec::new(), + } + } + + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + + /// Set the user ID + pub fn user_id(mut self, user_id: Option) -> Self { + self.user_id = user_id; + self + } + + /// Set fields + pub fn fields(mut self, fields: Vec<(String, String)>) -> Self { + self.fields = fields; + self + } + + /// Add a field + pub fn add_field(mut self, key: String, value: String) -> Self { + self.fields.push((key, value)); + self + } +} + +impl LogRecord for ServerLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) + } + + fn get_timestamp(&self) -> DateTime { + self.base.timestamp + } +} + +/// Console log entry structure +/// ConsoleLogEntry is used to log console log entries +/// The `ConsoleLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `level` - the log level +/// - `console_msg` - the console message +/// - `node_name` - the node name +/// - `err` - the error message +/// The `ConsoleLogEntry` structure contains the following methods: +/// - `new` - create a new `ConsoleLogEntry` +/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name +/// - `with_base` - set the base log entry +/// - `set_level` - set the log level +/// - `set_node_name` - set the node name +/// - `set_console_msg` - set the console message +/// - `set_err` - set the error message +/// # Example +/// ``` +/// use rustfs_obs::ConsoleLogEntry; +/// +/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string()); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsoleLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, + + pub level: LogKind, + pub console_msg: String, + pub node_name: String, + + #[serde(skip)] + pub err: Option, +} + +impl ConsoleLogEntry { + /// Create a new ConsoleLogEntry + pub fn new() -> Self { + ConsoleLogEntry { + base: BaseLogEntry::new(), + level: LogKind::Info, + console_msg: String::new(), + node_name: String::new(), + err: None, + } + } + + /// Create a new ConsoleLogEntry with console message and node name + pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self { + ConsoleLogEntry { + base: BaseLogEntry::new(), + level: LogKind::Info, + console_msg, + node_name, + err: None, + } + } + + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + + /// Set the log level + pub fn set_level(mut self, level: LogKind) -> Self { + self.level = level; + self + } + + /// Set the node name + pub fn set_node_name(mut self, node_name: String) -> Self { + self.node_name = node_name; + self + } + + /// Set the console message + pub fn set_console_msg(mut self, console_msg: String) -> Self { + self.console_msg = console_msg; + self + } + + /// Set the error message + pub fn set_err(mut self, err: Option) -> Self { + self.err = err; + self + } +} + +impl LogRecord for ConsoleLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) + } + + fn get_timestamp(&self) -> DateTime { + self.base.timestamp + } +} + +/// Unified log entry type +/// UnifiedLogEntry is used to log different types of log entries +/// +/// The `UnifiedLogEntry` enum contains the following variants: +/// - `Server` - a server log entry +/// - `Audit` - an audit log entry +/// - `Console` - a console log entry +/// +/// The `UnifiedLogEntry` enum contains the following methods: +/// - `to_json` - convert the log entry to JSON +/// - `get_timestamp` - get the timestamp of the log entry +/// +/// # Example +/// ``` +/// use rustfs_obs::{UnifiedLogEntry, ServerLogEntry}; +/// use tracing_core::Level; +/// +/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string()); +/// let unified = UnifiedLogEntry::Server(server_entry); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum UnifiedLogEntry { + #[serde(rename = "server")] + Server(ServerLogEntry), + + #[serde(rename = "audit")] + Audit(AuditLogEntry), + + #[serde(rename = "console")] + Console(ConsoleLogEntry), +} + +impl LogRecord for UnifiedLogEntry { + fn to_json(&self) -> String { + match self { + UnifiedLogEntry::Server(entry) => entry.to_json(), + UnifiedLogEntry::Audit(entry) => entry.to_json(), + UnifiedLogEntry::Console(entry) => entry.to_json(), + } + } + + fn get_timestamp(&self) -> DateTime { + match self { + UnifiedLogEntry::Server(entry) => entry.get_timestamp(), + UnifiedLogEntry::Audit(entry) => entry.get_timestamp(), + UnifiedLogEntry::Console(entry) => entry.get_timestamp(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_base_log_entry() { + let base = BaseLogEntry::new() + .request_id(Some("req-123".to_string())) + .message(Some("Test message".to_string())); + + assert_eq!(base.request_id, Some("req-123".to_string())); + assert_eq!(base.message, Some("Test message".to_string())); + } + + #[test] + fn test_server_log_entry() { + let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string()) + .user_id(Some("user-456".to_string())) + .add_field("operation".to_string(), "login".to_string()); + + assert_eq!(entry.level.0, Level::INFO); + assert_eq!(entry.source, "test_module"); + assert_eq!(entry.user_id, Some("user-456".to_string())); + assert_eq!(entry.fields.len(), 1); + assert_eq!(entry.fields[0], ("operation".to_string(), "login".to_string())); + } + + #[test] + fn test_unified_log_entry_json() { + let server_entry = ServerLogEntry::new(Level::INFO, "test_source".to_string()); + let unified = UnifiedLogEntry::Server(server_entry); + + let json = unified.to_json(); + assert!(json.contains("test_source")); + } +} diff --git a/packages/obs/src/lib.rs b/packages/obs/src/lib.rs index 8d00c7b0..0db02853 100644 --- a/packages/obs/src/lib.rs +++ b/packages/obs/src/lib.rs @@ -3,6 +3,30 @@ /// `obs` is a logging and observability library for Rust. /// It provides a simple and easy-to-use interface for logging and observability. /// It is built on top of the `log` crate and `opentelemetry` crate. +/// +/// ## Features +/// - Structured logging +/// - Distributed tracing +/// - Metrics collection +/// - Log processing worker +/// - Multiple sinks +/// - Configuration-based setup +/// - Telemetry guard +/// - Global logger +/// - Log levels +/// - Log entry types +/// - Log record +/// - Object version +/// - Local IP address +/// +/// ## Usage +/// +/// ```rust +/// use rustfs_obs::{AppConfig, init_obs}; +/// +/// let config = AppConfig::default(); +/// let (logger, guard) = init_obs(config); +/// ``` mod config; mod entry; mod logger; @@ -13,9 +37,11 @@ mod worker; pub use config::load_config; pub use config::{AppConfig, OtelConfig}; -pub use entry::audit::{ApiDetails, AuditEntry}; -pub use entry::base::{Args, Entry, Info, LogKind, ObjectVersion, Trace, API}; -pub use entry::log::{LogEntry, SerializableLevel}; +pub use entry::args::Args; +pub use entry::audit::{ApiDetails, AuditLogEntry}; +pub use entry::base::BaseLogEntry; +pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry}; +pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel}; pub use logger::start_logger; pub use logger::{ ensure_logger_initialized, get_global_logger, init_global_logger, locked_logger, log_debug, log_error, log_info, log_trace, diff --git a/packages/obs/src/logger.rs b/packages/obs/src/logger.rs index a3848cc3..22914a66 100644 --- a/packages/obs/src/logger.rs +++ b/packages/obs/src/logger.rs @@ -1,4 +1,4 @@ -use crate::{AppConfig, LogEntry, SerializableLevel, Sink}; +use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, ServerLogEntry, Sink, UnifiedLogEntry}; use std::sync::Arc; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{Mutex, OnceCell}; @@ -10,14 +10,14 @@ static GLOBAL_LOGGER: OnceCell>> = OnceCell::const_new(); /// Server log processor #[derive(Debug)] pub struct Logger { - sender: Sender, // Log sending channel + sender: Sender, // Log sending channel queue_capacity: usize, } impl Logger { /// Create a new Logger instance /// Returns Logger and corresponding Receiver - pub fn new(config: &AppConfig) -> (Self, Receiver) { + pub fn new(config: &AppConfig) -> (Self, Receiver) { // Get queue capacity from configuration, or use default values 10000 let queue_capacity = config.logger.queue_capacity.unwrap_or(10000); let (sender, receiver) = mpsc::channel(queue_capacity); @@ -39,83 +39,80 @@ impl Logger { self.queue_capacity } - /// Asynchronous logging of server logs - /// Attach the log to the current Span and generate a separate Tracing Event - #[tracing::instrument(skip(self), fields(log_source = "logger"))] - pub async fn log(&self, entry: LogEntry) -> Result<(), LogError> { - // Log messages to the current Span - tracing::Span::current() - .record("log_message", &entry.message) - .record("source", &entry.source); - println!("target start is {:?}", &entry.target); - let target = if let Some(target) = &entry.target { - target.clone() - } else { - "server_logs".to_string() - }; + /// Log a server entry + #[tracing::instrument(skip(self), fields(log_source = "logger_server"))] + pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), LogError> { + self.log_entry(UnifiedLogEntry::Server(entry)).await + } - println!("target end is {:?}", target); - // Generate independent Tracing Events with full LogEntry information - // Generate corresponding events according to level - match entry.level { - SerializableLevel(Level::ERROR) => { - tracing::error!( - target = %target.clone(), - timestamp = %entry.timestamp, - message = %entry.message, - source = %entry.source, - request_id = ?entry.request_id, - user_id = ?entry.user_id, - fields = ?entry.fields - ); + /// Log an audit entry + #[tracing::instrument(skip(self), fields(log_source = "logger_audit"))] + pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), LogError> { + self.log_entry(UnifiedLogEntry::Audit(entry)).await + } + + /// Log a console entry + #[tracing::instrument(skip(self), fields(log_source = "logger_console"))] + pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), LogError> { + self.log_entry(UnifiedLogEntry::Console(entry)).await + } + + /// Asynchronous logging of unified log entries + #[tracing::instrument(skip(self), fields(log_source = "logger"))] + pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), LogError> { + // Extract information for tracing based on entry type + match &entry { + UnifiedLogEntry::Server(server) => { + tracing::Span::current() + .record("log_level", &server.level.0.as_str()) + .record("log_message", &server.base.message.as_deref().unwrap_or("")) + .record("source", &server.source); + + // Generate tracing event based on log level + match server.level.0 { + Level::ERROR => { + tracing::error!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or("")); + } + Level::WARN => { + tracing::warn!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or("")); + } + Level::INFO => { + tracing::info!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or("")); + } + Level::DEBUG => { + tracing::debug!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or("")); + } + Level::TRACE => { + tracing::trace!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or("")); + } + } } - SerializableLevel(Level::WARN) => { - tracing::warn!( - target = %target.clone(), - timestamp = %entry.timestamp, - message = %entry.message, - source = %entry.source, - request_id = ?entry.request_id, - user_id = ?entry.user_id, - fields = ?entry.fields - ); - } - SerializableLevel(Level::INFO) => { + UnifiedLogEntry::Audit(audit) => { tracing::info!( - target = %target.clone(), - timestamp = %entry.timestamp, - message = %entry.message, - source = %entry.source, - request_id = ?entry.request_id, - user_id = ?entry.user_id, - fields = ?entry.fields + target: "audit_logs", + event = %audit.event, + api = %audit.api.name.as_deref().unwrap_or("unknown"), + message = %audit.base.message.as_deref().unwrap_or("") ); } - SerializableLevel(Level::DEBUG) => { - tracing::debug!( - target = %target.clone(), - timestamp = %entry.timestamp, - message = %entry.message, - source = %entry.source, - request_id = ?entry.request_id, - user_id = ?entry.user_id, - fields = ?entry.fields - ); - } - SerializableLevel(Level::TRACE) => { - tracing::trace!( - target = %target.clone(), - timestamp = %entry.timestamp, - message = %entry.message, - source = %entry.source, - request_id = ?entry.request_id, - user_id = ?entry.user_id, - fields = ?entry.fields + UnifiedLogEntry::Console(console) => { + let level_str = match console.level { + crate::LogKind::Info => "INFO", + crate::LogKind::Warning => "WARN", + crate::LogKind::Error => "ERROR", + crate::LogKind::Fatal => "FATAL", + }; + + tracing::info!( + target: "console_logs", + level = %level_str, + node = %console.node_name, + message = %console.console_msg ); } } - // Send logs to asynchronous queues to improve error handling + // Send logs to async queue with improved error handling match self.sender.try_send(entry) { Ok(_) => Ok(()), Err(mpsc::error::TrySendError::Full(entry)) => { @@ -150,28 +147,25 @@ impl Logger { /// use rustfs_obs::Logger; /// /// async fn example(logger: &Logger) { - /// let _ = logger.write_with_context("This is an information message", "example",Level::INFO, Some("target".to_string()),Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await; + /// let _ = logger.write_with_context("This is an information message", "example",Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await; /// } pub async fn write_with_context( &self, message: &str, source: &str, level: Level, - target: Option, request_id: Option, user_id: Option, fields: Vec<(String, String)>, ) -> Result<(), LogError> { - self.log(LogEntry::new( - level, - message.to_string(), - source.to_string(), - target, - request_id, - user_id, - fields, - )) - .await + let base = BaseLogEntry::new().message(Some(message.to_string())).request_id(request_id); + + let server_entry = ServerLogEntry::new(level, source.to_string()) + .user_id(user_id) + .fields(fields) + .with_base(base); + + self.log_server_entry(server_entry).await } /// Write log @@ -194,16 +188,7 @@ impl Logger { /// } /// ``` pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), LogError> { - self.log(LogEntry::new( - level, - message.to_string(), - source.to_string(), - None, - None, - None, - Vec::new(), - )) - .await + self.write_with_context(message, source, level, None, None, Vec::new()).await } /// Shutdown the logger @@ -458,7 +443,6 @@ pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> { /// - `message`: Message to be logged /// - `source`: Source of the log /// - `level`: Log level -/// - `target`: Log target /// - `request_id`: Request ID /// - `user_id`: User ID /// - `fields`: Additional fields @@ -470,14 +454,13 @@ pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> { /// use rustfs_obs::log_with_context; /// /// async fn example() { -/// let _ = log_with_context("This is an information message", "example", Level::INFO, Some("target".to_string()), Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await; +/// let _ = log_with_context("This is an information message", "example", Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await; /// } /// ``` pub async fn log_with_context( message: &str, source: &str, level: Level, - target: Option, request_id: Option, user_id: Option, fields: Vec<(String, String)>, @@ -485,6 +468,6 @@ pub async fn log_with_context( get_global_logger() .lock() .await - .write_with_context(message, source, level, target, request_id, user_id, fields) + .write_with_context(message, source, level, request_id, user_id, fields) .await } diff --git a/packages/obs/src/sink.rs b/packages/obs/src/sink.rs index 2363a69d..8eb1830d 100644 --- a/packages/obs/src/sink.rs +++ b/packages/obs/src/sink.rs @@ -1,4 +1,4 @@ -use crate::{AppConfig, LogEntry}; +use crate::{AppConfig, LogRecord, UnifiedLogEntry}; use async_trait::async_trait; use std::sync::Arc; use tokio::fs::OpenOptions; @@ -8,7 +8,7 @@ use tokio::io::AsyncWriteExt; /// Sink Trait definition, asynchronously write logs #[async_trait] pub trait Sink: Send + Sync { - async fn write(&self, entry: &LogEntry); + async fn write(&self, entry: &UnifiedLogEntry); } #[cfg(feature = "kafka")] @@ -18,7 +18,7 @@ pub struct KafkaSink { topic: String, batch_size: usize, batch_timeout_ms: u64, - entries: Arc>>, + entries: Arc>>, last_flush: Arc, } @@ -64,7 +64,7 @@ impl KafkaSink { async fn periodic_flush( producer: rdkafka::producer::FutureProducer, topic: String, - entries: Arc>>, + entries: Arc>>, last_flush: Arc, timeout_ms: u64, ) { @@ -88,7 +88,7 @@ impl KafkaSink { } } - async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec) { + async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec) { for entry in entries { let payload = match serde_json::to_string(&entry) { Ok(p) => p, @@ -98,7 +98,7 @@ impl KafkaSink { } }; - let span_id = entry.timestamp.to_rfc3339(); + let span_id = entry.get_timestamp().to_rfc3339(); let _ = producer .send( @@ -113,7 +113,7 @@ impl KafkaSink { #[cfg(feature = "kafka")] #[async_trait] impl Sink for KafkaSink { - async fn write(&self, entry: &LogEntry) { + async fn write(&self, entry: &UnifiedLogEntry) { let mut batch = self.entries.lock().await; batch.push(entry.clone()); @@ -129,7 +129,7 @@ impl Sink for KafkaSink { if should_flush_by_size || should_flush_by_time { // Existing flush logic - let entries_to_send: Vec = batch.drain(..).collect(); + let entries_to_send: Vec = batch.drain(..).collect(); let producer = self.producer.clone(); let topic = self.topic.clone(); @@ -203,7 +203,7 @@ impl WebhookSink { #[cfg(feature = "webhook")] #[async_trait] impl Sink for WebhookSink { - async fn write(&self, entry: &LogEntry) { + async fn write(&self, entry: &UnifiedLogEntry) { let mut retries = 0; let url = self.url.clone(); let entry_clone = entry.clone(); @@ -327,12 +327,17 @@ impl FileSink { #[cfg(feature = "file")] #[async_trait] impl Sink for FileSink { - async fn write(&self, entry: &LogEntry) { + async fn write(&self, entry: &UnifiedLogEntry) { let line = format!("{:?}\n", entry); let mut writer = self.writer.lock().await; if let Err(e) = writer.write_all(line.as_bytes()).await { - eprintln!("Failed to write log to file {}: {}", self.path, e); + eprintln!( + "Failed to write log to file {}: {},entry timestamp:{:?}", + self.path, + e, + entry.get_timestamp() + ); return; } diff --git a/packages/obs/src/worker.rs b/packages/obs/src/worker.rs index 1c40f47f..2d7ee2e1 100644 --- a/packages/obs/src/worker.rs +++ b/packages/obs/src/worker.rs @@ -1,9 +1,9 @@ -use crate::{entry::log::LogEntry, sink::Sink}; +use crate::{sink::Sink, UnifiedLogEntry}; use std::sync::Arc; use tokio::sync::mpsc::Receiver; /// Start the log processing worker thread -pub async fn start_worker(receiver: Receiver, sinks: Vec>) { +pub async fn start_worker(receiver: Receiver, sinks: Vec>) { let mut receiver = receiver; while let Some(entry) = receiver.recv().await { for sink in &sinks {