diff --git a/.docker/observability/grafana/dashboards/rustfs.json b/.docker/observability/grafana/dashboards/rustfs.json index 81d45a38..31e8fcc4 100644 --- a/.docker/observability/grafana/dashboards/rustfs.json +++ b/.docker/observability/grafana/dashboards/rustfs.json @@ -2850,6 +2850,1061 @@ ], "title": "Replication_Bandwidth_Limit", "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 88 + }, + "id": 300, + "panels": [], + "title": "Observability", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 0.05 + }, + { + "color": "red", + "value": 0.2 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 89 + }, + "id": 301, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": 
"prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_run_failures_total{job=~\"$job\"}[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_runs_total{job=~\"$job\"}[$__rate_interval])), 1e-9)", + "legendFormat": "failure ratio", + "range": true, + "refId": "A" + } + ], + "title": "Cleanup Failure Ratio", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 0.01 + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 89 + }, + "id": 302, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": 
"sum(rate(rustfs_log_cleaner_rotation_failures_total{job=~\"$job\"}[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_rotation_total{job=~\"$job\"}[$__rate_interval])), 1e-9)", + "legendFormat": "rotation failure ratio", + "range": true, + "refId": "A" + } + ], + "title": "Rotation Failure Ratio", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "s" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "P50" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "P95" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "yellow", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "P99" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 97 + }, + "id": 303, + "options": 
{ + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum by (le) (rate(rustfs_log_cleaner_rotation_duration_seconds_bucket{job=~\"$job\"}[$__rate_interval])))", + "legendFormat": "P50", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by (le) (rate(rustfs_log_cleaner_rotation_duration_seconds_bucket{job=~\"$job\"}[$__rate_interval])))", + "legendFormat": "P95", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum by (le) (rate(rustfs_log_cleaner_rotation_duration_seconds_bucket{job=~\"$job\"}[$__rate_interval])))", + "legendFormat": "P99", + "range": true, + "refId": "C" + } + ], + "title": "Rotation Duration Percentiles", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "fixedColor": "green", + "mode": "fixed" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 97 + }, + "id": 304, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + 
"pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(increase(rustfs_log_cleaner_freed_bytes_total{job=~\"$job\"}[$__range]))", + "legendFormat": "bytes freed", + "range": true, + "refId": "A" + } + ], + "title": "Log Space Freed (range total)", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "from": 0, + "result": { + "color": "red", + "index": 0, + "text": "INACTIVE" + }, + "to": 1e-9 + }, + "type": "range" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "green", + "value": 1e-10 + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 105 + }, + "id": 305, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_runs_total{job=~\"$job\"}[15m]))", + "legendFormat": "runs/s", + "range": true, + "refId": "A" + } + ], + "title": "Cleanup Activity", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + 
"fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 105 + }, + "id": 306, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_freed_bytes_total{job=~\"$job\"}[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_deleted_files_total{job=~\"$job\"}[$__rate_interval])), 1e-9)", + "legendFormat": "bytes/file", + "range": true, + "refId": "A" + } + ], + "title": "Compression Efficiency (bytes/file)", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 113 + }, + "id": 200, + "panels": [], + "title": "Log Cleaner", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 114 + }, + "id": 201, + "options": { + "legend": { + 
"calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_runs_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "runs/s", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_run_failures_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "failures/s", + "range": true, + "refId": "B" + } + ], + "title": "Cleanup Runs / Failures", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "Bps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 114 + }, + "id": 202, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_freed_bytes_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "bytes/s", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_deleted_files_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "files/s", + "range": true, + "refId": "B" + } + ], + "title": "Freed Bytes / Deleted Files", + "type": "timeseries" + 
}, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 122 + }, + "id": 203, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket{job=~\"$job\"}[$__rate_interval])) by (le))", + "legendFormat": "p95", + "range": true, + "refId": "A" + } + ], + "title": "Compression P95 Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 122 + }, + "id": 204, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "sum(rate(rustfs_log_cleaner_rotation_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "rotation/s", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": 
"sum(rate(rustfs_log_cleaner_rotation_failures_total{job=~\"$job\"}[$__rate_interval]))", + "legendFormat": "rotation_failures/s", + "range": true, + "refId": "B" + } + ], + "title": "Rotation Success / Failure", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 130 + }, + "id": 205, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "max(rustfs_log_cleaner_steal_success_rate{job=~\"$job\"})", + "legendFormat": "ratio", + "range": true, + "refId": "A" + } + ], + "title": "Steal Success Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 130 + }, + "id": 206, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "code", + "expr": "max(rustfs_log_cleaner_active_file_size_bytes{job=~\"$job\"})", + "legendFormat": 
"bytes", + "range": true, + "refId": "A" + } + ], + "title": "Active File Size", + "type": "timeseries" } ], "preload": false, @@ -2993,5 +4048,5 @@ "timezone": "browser", "title": "RustFS", "uid": "rustfs-s3", - "version": 10 + "version": 11 } diff --git a/Cargo.lock b/Cargo.lock index 63b7f755..e3a85062 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7764,10 +7764,14 @@ dependencies = [ name = "rustfs-obs" version = "0.0.5" dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", "flate2", "glob", "jiff", "metrics", + "num_cpus", "nvml-wrapper", "opentelemetry", "opentelemetry-appender-tracing", @@ -7779,6 +7783,7 @@ dependencies = [ "rustfs-config", "rustfs-utils", "serde", + "serde_json", "sysinfo", "temp-env", "tempfile", @@ -7789,6 +7794,7 @@ dependencies = [ "tracing-error", "tracing-opentelemetry", "tracing-subscriber", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 82b2f691..2fb58c88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -197,6 +197,9 @@ const-str = { version = "1.1.0", features = ["std", "proc"] } convert_case = "0.11.0" criterion = { version = "0.8", features = ["html_reports"] } crossbeam-queue = "0.3.12" +crossbeam-channel = "0.5.15" +crossbeam-deque = "0.8.6" +crossbeam-utils = "0.8.21" datafusion = "52.3.0" derive_builder = "0.20.2" enumset = "1.1.10" diff --git a/crates/config/src/observability/mod.rs b/crates/config/src/observability/mod.rs index 46cef701..1ce6fae5 100644 --- a/crates/config/src/observability/mod.rs +++ b/crates/config/src/observability/mod.rs @@ -48,6 +48,12 @@ pub const ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_TOTAL_SIZ pub const ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES"; pub const ENV_OBS_LOG_COMPRESS_OLD_FILES: &str = "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES"; pub const ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL"; +pub const ENV_OBS_LOG_COMPRESSION_ALGORITHM: &str = 
"RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM"; +pub const ENV_OBS_LOG_PARALLEL_COMPRESS: &str = "RUSTFS_OBS_LOG_PARALLEL_COMPRESS"; +pub const ENV_OBS_LOG_PARALLEL_WORKERS: &str = "RUSTFS_OBS_LOG_PARALLEL_WORKERS"; +pub const ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL"; +pub const ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: &str = "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP"; +pub const ENV_OBS_LOG_ZSTD_WORKERS: &str = "RUSTFS_OBS_LOG_ZSTD_WORKERS"; pub const ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: &str = "RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS"; pub const ENV_OBS_LOG_EXCLUDE_PATTERNS: &str = "RUSTFS_OBS_LOG_EXCLUDE_PATTERNS"; pub const ENV_OBS_LOG_DELETE_EMPTY_FILES: &str = "RUSTFS_OBS_LOG_DELETE_EMPTY_FILES"; @@ -61,13 +67,24 @@ pub const DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES: u64 = 2 * 1024 * 1024 * 1024; // pub const DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: u64 = 0; // No single file limit pub const DEFAULT_OBS_LOG_COMPRESS_OLD_FILES: bool = true; pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL: u32 = 6; +pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP: &str = "gzip"; +pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD: &str = "zstd"; +pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM: &str = DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD; +pub const DEFAULT_OBS_LOG_PARALLEL_COMPRESS: bool = true; +pub const DEFAULT_OBS_LOG_PARALLEL_WORKERS: usize = 6; +pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL: i32 = 8; +pub const DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: bool = true; +pub const DEFAULT_OBS_LOG_ZSTD_WORKERS: usize = 1; pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION: &str = "gz"; pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION); +pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION: &str = "zst"; +pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION); pub const 
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: u64 = 30; // Retain compressed files for 30 days pub const DEFAULT_OBS_LOG_DELETE_EMPTY_FILES: bool = true; pub const DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS: u64 = 3600; // 1 hour pub const DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS: u64 = 1800; // 0.5 hours pub const DEFAULT_OBS_LOG_DRY_RUN: bool = false; +pub const DEFAULT_OBS_LOG_MATCH_MODE_PREFIX: &str = "prefix"; pub const DEFAULT_OBS_LOG_MATCH_MODE: &str = "suffix"; /// Default values for observability configuration @@ -113,6 +130,12 @@ mod tests { assert_eq!(ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES"); assert_eq!(ENV_OBS_LOG_COMPRESS_OLD_FILES, "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES"); assert_eq!(ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL"); + assert_eq!(ENV_OBS_LOG_COMPRESSION_ALGORITHM, "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM"); + assert_eq!(ENV_OBS_LOG_PARALLEL_COMPRESS, "RUSTFS_OBS_LOG_PARALLEL_COMPRESS"); + assert_eq!(ENV_OBS_LOG_PARALLEL_WORKERS, "RUSTFS_OBS_LOG_PARALLEL_WORKERS"); + assert_eq!(ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL"); + assert_eq!(ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP"); + assert_eq!(ENV_OBS_LOG_ZSTD_WORKERS, "RUSTFS_OBS_LOG_ZSTD_WORKERS"); assert_eq!( ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, "RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS" @@ -131,6 +154,10 @@ mod tests { assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development"); assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test"); assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging"); + assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, "gzip"); + assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, "zstd"); + assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, "prefix"); assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE, "suffix"); + assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, "zstd"); } } diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 
cb8310f2..85fe9a65 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -40,6 +40,10 @@ flate2 = { workspace = true } glob = { workspace = true } jiff = { workspace = true } metrics = { workspace = true } +crossbeam-channel = { workspace = true } +crossbeam-deque = { workspace = true } +crossbeam-utils = { workspace = true } +num_cpus = { workspace = true } nvml-wrapper = { workspace = true, optional = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] } @@ -56,6 +60,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", " tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] } sysinfo = { workspace = true } thiserror = { workspace = true } +zstd = { workspace = true, features = ["zstdmt"] } [target.'cfg(unix)'.dependencies] pyroscope = { workspace = true, features = ["backend-pprof-rs"] } @@ -65,3 +70,4 @@ pyroscope = { workspace = true, features = ["backend-pprof-rs"] } tokio = { workspace = true, features = ["full"] } tempfile = { workspace = true } temp-env = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/obs/README.md b/crates/obs/README.md index 13c6a456..59c07905 100644 --- a/crates/obs/README.md +++ b/crates/obs/README.md @@ -14,7 +14,7 @@ logging, distributed tracing, metrics via OpenTelemetry, and continuous profilin | **Distributed tracing** | OTLP/HTTP export to Jaeger, Tempo, or any OTel collector | | **Metrics** | OTLP/HTTP export, bridged from the `metrics` crate facade | | **Continuous Profiling** | CPU/Memory profiling export to Pyroscope | -| **Log cleanup** | Background task: size limits, gzip compression, retention policies | +| **Log cleanup** | Background task: size limits, zstd/gzip compression, retention policies | | **GPU metrics** *(optional)* | Enable with the `gpu` feature flag | --- @@ -148,9 
+148,15 @@ All configuration is read from environment variables at startup. |-------------------------------------------------|--------------|-------------------------------------------------------------| | `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | `2147483648` | Hard cap on total log directory size (2 GiB) | | `RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES` | `0` | Per-file size cap; `0` = unlimited | -| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Gzip-compress files before deleting | +| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Compress files before deleting | | `RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL` | `6` | Gzip level `1` (fastest) – `9` (best) | -| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` archives older than N days; `0` = keep forever | +| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` | Compression codec: `zstd` or `gzip` | +| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | `true` | Enable work-stealing parallel compression | +| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | `6` | Number of cleaner worker threads | +| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | `8` | Zstd level `1` (fastest) – `21` (best ratio) | +| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | `true` | Fallback to gzip when zstd compression fails | +| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | `1` | zstdmt worker threads per compression task | +| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` / `.zst` archives older than N days; `0` = keep forever | | `RUSTFS_OBS_LOG_EXCLUDE_PATTERNS` | _(empty)_ | Comma-separated glob patterns to never clean up | | `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | `true` | Remove zero-byte files | | `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | `3600` | Minimum file age (seconds) before cleanup | @@ -159,6 +165,159 @@ All configuration is read from environment variables at startup. 
--- +## Cleaner & Rotation Metrics + +The log rotation and cleanup pipeline emits these metrics (via the `metrics` facade): + +| Metric | Type | Description | +|---|---|---| +| `rustfs.log_cleaner.deleted_files_total` | counter | Number of files deleted per cleanup pass | +| `rustfs.log_cleaner.freed_bytes_total` | counter | Bytes reclaimed by deletion | +| `rustfs.log_cleaner.compress_duration_seconds` | histogram | Compression stage duration | +| `rustfs.log_cleaner.steal_success_rate` | gauge | Work-stealing success ratio in parallel mode | +| `rustfs.log_cleaner.runs_total` | counter | Successful cleanup loop runs | +| `rustfs.log_cleaner.run_failures_total` | counter | Failed or panicked cleanup loop runs | +| `rustfs.log_cleaner.rotation_total` | counter | Successful file rotations | +| `rustfs.log_cleaner.rotation_failures_total` | counter | Failed file rotations | +| `rustfs.log_cleaner.rotation_duration_seconds` | histogram | Rotation latency | +| `rustfs.log_cleaner.active_file_size_bytes` | gauge | Current active log file size | + +These metrics cover compression, cleanup, and file rotation end-to-end. + +### Metric Semantics + +- `deleted_files_total` and `freed_bytes_total` are emitted after each cleanup pass and include both normal log cleanup and expired compressed archive cleanup. +- `compress_duration_seconds` measures compression stage wall-clock time for both serial and parallel modes. +- `steal_success_rate` is updated by the parallel work-stealing path and remains at the last computed value. +- `rotation_*` metrics are emitted by `RollingAppender` and include retries; a failed final rotation increments `rotation_failures_total`. +- `active_file_size_bytes` is sampled on writes and after successful roll, so dashboards can track current active file growth. + +### Grafana Dashboard JSON Draft (Ready to Import) + +> Save this as `rustfs-log-cleaner-dashboard.json`, then import from Grafana UI. 
+> For Prometheus datasources, metric names are usually normalized to underscores, +> so `rustfs.log_cleaner.deleted_files_total` becomes `rustfs_log_cleaner_deleted_files_total`. +> +> The same panels are now checked in at: +> `.docker/observability/grafana/dashboards/rustfs.json` +> (row title: `Log Cleaner`). + +```json +{ + "uid": "rustfs-log-cleaner", + "title": "RustFS Log Cleaner", + "timezone": "browser", + "schemaVersion": 39, + "version": 1, + "refresh": "10s", + "tags": ["rustfs", "observability", "log-cleaner"], + "time": { + "from": "now-6h", + "to": "now" + }, + "panels": [ + { + "id": 1, + "title": "Cleanup Runs / Failures", + "type": "timeseries", + "targets": [ + { "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_runs_total[5m]))", "legendFormat": "runs/s" }, + { "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_run_failures_total[5m]))", "legendFormat": "failures/s" } + ], + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 } + }, + { + "id": 2, + "title": "Freed Bytes / Deleted Files", + "type": "timeseries", + "targets": [ + { "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_freed_bytes_total[15m]))", "legendFormat": "bytes/s" }, + { "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_deleted_files_total[15m]))", "legendFormat": "files/s" } + ], + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 } + }, + { + "id": 3, + "title": "Compression P95 Latency", + "type": "timeseries", + "targets": [ + { + "refId": "A", + "expr": "histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[5m])) by (le))", + "legendFormat": "p95" + } + ], + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 } + }, + { + "id": 4, + "title": "Rotation Success / Failure", + "type": "timeseries", + "targets": [ + { "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_rotation_total[5m]))", "legendFormat": "rotation/s" }, + { "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_rotation_failures_total[5m]))", "legendFormat": "rotation_failures/s" } + ], + 
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 } + }, + { + "id": 5, + "title": "Steal Success Rate", + "type": "timeseries", + "targets": [ + { "refId": "A", "expr": "max(rustfs_log_cleaner_steal_success_rate)", "legendFormat": "ratio" } + ], + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 } + }, + { + "id": 6, + "title": "Active File Size", + "type": "timeseries", + "targets": [ + { "refId": "A", "expr": "max(rustfs_log_cleaner_active_file_size_bytes)", "legendFormat": "bytes" } + ], + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 16 } + } + ] +} +``` + +### PromQL Templates + +Use these templates directly in Grafana panels/alerts. + +- **Cleanup run rate** + - `sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval]))` +- **Cleanup failure rate** + - `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval]))` +- **Cleanup failure ratio** + - `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval])), 1e-9)` +- **Freed bytes throughput** + - `sum(rate(rustfs_log_cleaner_freed_bytes_total[$__rate_interval]))` +- **Deleted files throughput** + - `sum(rate(rustfs_log_cleaner_deleted_files_total[$__rate_interval]))` +- **Compression p95 latency** + - `histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[$__rate_interval])) by (le))` +- **Rotation failure ratio** + - `sum(rate(rustfs_log_cleaner_rotation_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_rotation_total[$__rate_interval])), 1e-9)` +- **Work-stealing efficiency (latest)** + - `max(rustfs_log_cleaner_steal_success_rate)` +- **Active file size (latest)** + - `max(rustfs_log_cleaner_active_file_size_bytes)` + +### Suggested Alerts + +- **CleanupFailureRatioHigh**: failure ratio > 0.05 for 10m. +- **CompressionLatencyP95High**: p95 above your baseline SLO for 15m. +- **RotationFailuresDetected**: rotation failure rate > 0 for 3 consecutive windows. 
+- **NoCleanupActivity**: runs rate == 0 for expected active environments. + +### Metrics Compatibility + +The project is currently in active development. Metric names and labels are updated directly when architecture evolves, and no backward-compatibility shim is maintained for old names. +Use the metric names documented in this README as the current source of truth. + +--- + ## Examples ### Stdout-only (development default) @@ -207,6 +366,19 @@ export RUSTFS_OBS_LOG_DRY_RUN=true # Observe log output — no files will actually be deleted. ``` +### Parallel zstd cleanup (recommended production profile) + +```bash +export RUSTFS_OBS_LOG_DIRECTORY=/var/log/rustfs +export RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd +export RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true +export RUSTFS_OBS_LOG_PARALLEL_WORKERS=6 +export RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL=8 +export RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP=true +export RUSTFS_OBS_LOG_ZSTD_WORKERS=1 +./rustfs +``` + --- ## Module Structure @@ -229,9 +401,9 @@ rustfs-obs/src/ │ ├── cleaner/ # Background log-file cleanup subsystem │ ├── mod.rs # LogCleaner public API + tests -│ ├── types.rs # FileInfo shared type +│ ├── types.rs # Shared cleaner types (match mode, compression codec, FileInfo) │ ├── scanner.rs # Filesystem discovery -│ ├── compress.rs # Gzip compression helper +│ ├── compress.rs # Gzip/Zstd compression helper │ └── core.rs # Selection, compression, deletion logic │ └── system/ # Host metrics (CPU, memory, disk, GPU) diff --git a/crates/obs/src/cleaner/README.md b/crates/obs/src/cleaner/README.md index 7f59a2b7..70e32fed 100644 --- a/crates/obs/src/cleaner/README.md +++ b/crates/obs/src/cleaner/README.md @@ -1,71 +1,130 @@ # Log Cleaner Subsystem -The `cleaner` module provides a robust, background log-file lifecycle manager for RustFS. It is designed to run periodically to enforce retention policies, compress old logs, and prevent disk exhaustion. 
+The `cleaner` module is a production-focused background lifecycle manager for RustFS log archives. +It periodically discovers rolled files, applies retention constraints, compresses candidates, and then deletes sources safely. -## Architecture +The subsystem is designed to be conservative by default: -The cleaner operates as a pipeline: +- it never touches the currently active log file; +- it refuses symlink deletion during the destructive phase; +- it keeps compression and source deletion as separate steps; +- it supports a full dry-run mode for policy verification. -1. **Discovery (`scanner.rs`)**: Scans the configured log directory for eligible files. - * **Non-recursive**: Only scans the top-level directory for safety. - * **Filtering**: Ignores the currently active log file, files matching exclude patterns, and files that do not match the configured prefix/suffix pattern. - * **Performance**: Uses `std::fs::read_dir` directly to minimize overhead and syscalls. +## Execution Pipeline -2. **Selection (`core.rs`)**: Applies retention policies to select files for deletion. - * **Keep Count**: Ensures at least `N` recent files are kept. - * **Total Size**: Deletes oldest files if the total size exceeds the limit. - * **Single File Size**: Deletes individual files that exceed a size limit (e.g., runaway logs). +1. **Discovery (`scanner.rs`)** + - Performs a shallow `read_dir` scan (no recursion) for predictable latency. + - Excludes the active log file, exclusion-pattern matches, and files younger than the age threshold. + - Classifies regular logs and compressed archives (`.gz` / `.zst`) in one pass. -3. **Action (`core.rs` / `compress.rs`)**: - * **Compression**: Optionally compresses selected files using Gzip (level 1-9) before deletion. - * **Deletion**: Removes the original file (and eventually the compressed archive based on retention days). +2. **Selection (`core.rs`)** + - Enforces keep-count first. 
+ - Applies total-size and single-file-size constraints to oldest files. + - Produces an ordered list of files to process. -## Configuration +3. **Compression + Deletion (`core.rs` + `compress.rs`)** + - Supports `zstd` and `gzip` codecs. + - Uses parallel work stealing when enabled (`Injector + Worker::new_fifo + Stealer`). + - Always deletes source files in a serial pass after compression to minimize file-lock race issues. -The cleaner is configured via `LogCleanerBuilder`. When initialized via `rustfs-obs::init_obs`, it reads from environment variables. +4. **Archive Expiry (`core.rs`)** + - Applies a separate retention window to already-compressed files. + - Keeps archive expiration independent from plain-log keep-count logic. -| Parameter | Env Var | Description | -|-----------|---------|-------------| -| `log_dir` | `RUSTFS_OBS_LOG_DIRECTORY` | The directory to scan. | -| `file_pattern` | `RUSTFS_OBS_LOG_FILENAME` | The base filename pattern (e.g., `rustfs.log`). | -| `active_filename` | (Derived) | The exact name of the currently active log file, excluded from cleanup. | -| `match_mode` | `RUSTFS_OBS_LOG_MATCH_MODE` | `prefix` or `suffix`. Determines how `file_pattern` is matched against filenames. | -| `keep_files` | `RUSTFS_OBS_LOG_KEEP_FILES` | Minimum number of rolling log files to keep. | -| `max_total_size_bytes` | `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | Maximum aggregate size of all log files. Oldest files are deleted to satisfy this. | -| `compress_old_files` | `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | If `true`, files selected for removal are first gzipped. | -| `compressed_file_retention_days` | `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Age in days after which `.gz` files are deleted. | +## Compression Modes -## Timestamp Format & Rotation +- **Primary codec**: `zstd` (default) for better ratio and faster decompression. +- **Fallback codec**: `gzip` when zstd fallback is enabled. 
+- **Dry-run**: reports planned compression/deletion operations without touching filesystem state. -The cleaner works in tandem with the `RollingAppender` in `telemetry/rolling.rs`. +## Safety Model -* **Rotation**: Logs are rotated based on time (Daily/Hourly/Minutely) or Size. -* **Naming**: Archived logs use a high-precision timestamp format: `YYYYMMDDHHMMSS.uuuuuu` (microseconds), plus a unique counter to prevent collisions. - * **Suffix Mode**: `-.` (e.g., `20231027103001.123456-0.rustfs.log`) - * **Prefix Mode**: `.-` (e.g., `rustfs.log.20231027103001.123456-0`) +- **No recursive traversal**: the scanner only inspects the immediate log directory. +- **No symlink following**: filesystem metadata is collected with `symlink_metadata`. +- **Idempotent archives**: an existing `*.gz` or `*.zst` target means the file is treated as already compressed. +- **Best-effort cleanup**: individual file failures are logged and do not abort the whole maintenance pass. -This high-precision naming ensures that files sort chronologically by name, and collisions are virtually impossible even under high load. +## Work-Stealing Strategy -## Usage Example +The parallel path in `core.rs` uses this fixed lookup sequence per worker: + +1. `local_worker.pop()` +2. `injector.steal_batch_and_pop(&local_worker)` +3. randomized victim polling via `Steal::from_iter(...)` + +This strategy keeps local cache affinity while still balancing stragglers. 
+ +## Metrics and Tracing + +The cleaner emits tracing events and runtime metrics: + +- `rustfs.log_cleaner.deleted_files_total` (counter) +- `rustfs.log_cleaner.freed_bytes_total` (counter) +- `rustfs.log_cleaner.compress_duration_seconds` (histogram) +- `rustfs.log_cleaner.steal_success_rate` (gauge) +- `rustfs.log_cleaner.rotation_total` (counter) +- `rustfs.log_cleaner.rotation_failures_total` (counter) +- `rustfs.log_cleaner.rotation_duration_seconds` (histogram) +- `rustfs.log_cleaner.active_file_size_bytes` (gauge) + +These values can be wired into dashboards and alert rules for cleanup health. + +## Retention Decision Order + +For regular logs, the cleaner evaluates candidates in this order: + +1. keep at least `keep_files` newest matching generations; +2. remove older files if total retained size still exceeds `max_total_size_bytes`; +3. remove any file whose individual size exceeds `max_single_file_size_bytes`; +4. if compression is enabled, archive before deletion; +5. delete the original file only after successful compression. 
+ +## Key Environment Variables + +| Env Var | Meaning | +|---|---| +| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` or `gzip` | +| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | Enable work-stealing compression | +| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | Worker count for parallel compressor | +| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | Zstd level (1-21) | +| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | Fallback switch on zstd failure | +| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | zstdmt worker threads per compression task | +| `RUSTFS_OBS_LOG_DRY_RUN` | Dry-run mode | +| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Retention window for `*.gz` / `*.zst` archives | +| `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | Remove zero-byte regular log files during scanning | +| `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | Minimum age for regular log eligibility | + +## Builder Example ```rust use rustfs_obs::LogCleaner; -use rustfs_obs::types::FileMatchMode; +use rustfs_obs::types::{CompressionAlgorithm, FileMatchMode}; use std::path::PathBuf; let cleaner = LogCleaner::builder( PathBuf::from("/var/log/rustfs"), - "rustfs.log.".to_string(), + "rustfs.log".to_string(), "rustfs.log".to_string(), ) -.match_mode(FileMatchMode::Prefix) -.keep_files(10) -.max_total_size_bytes(1024 * 1024 * 100) // 100 MB +.match_mode(FileMatchMode::Suffix) +.keep_files(30) +.max_total_size_bytes(2 * 1024 * 1024 * 1024) .compress_old_files(true) +.compression_algorithm(CompressionAlgorithm::Zstd) +.parallel_compress(true) +.parallel_workers(6) +.zstd_compression_level(8) +.zstd_fallback_to_gzip(true) +.zstd_workers(1) +.dry_run(false) .build(); -// Run cleanup (blocking operation, spawn in a background task) -if let Ok((deleted, freed)) = cleaner.cleanup() { - println!("Cleaned up {} files, freed {} bytes", deleted, freed); -} +let _ = cleaner.cleanup(); ``` + +## Operational Notes + +- Prefer `FileMatchMode::Suffix` when rotations prepend timestamps to the filename. 
+- Prefer `FileMatchMode::Prefix` when rotations append counters or timestamps after a stable base name. +- Keep `parallel_workers` modest when `zstd_workers` is greater than `1`, because each compression task may already use internal codec threads. + diff --git a/crates/obs/src/cleaner/compress.rs b/crates/obs/src/cleaner/compress.rs index 2810c80e..26c44a0a 100644 --- a/crates/obs/src/cleaner/compress.rs +++ b/crates/obs/src/cleaner/compress.rs @@ -12,66 +12,205 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Gzip compression helper for old log files. +//! Compression helpers for old log files. //! -//! Files are compressed in place: `` → `.gz`. The original file -//! is **not** deleted here — deletion is handled by the caller after -//! compression succeeds. +//! This module performs compression only. Source-file deletion is intentionally +//! handled by the caller in a separate step to avoid platform-specific file +//! locking issues (especially on Windows). +//! +//! The separation is important for operational safety: a failed archive write +//! must never result in premature source deletion, and an already-existing +//! archive should make repeated cleanup passes idempotent. +use super::types::CompressionAlgorithm; use flate2::Compression; use flate2::write::GzEncoder; -use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION; +use std::ffi::OsString; use std::fs::File; use std::io::{BufReader, BufWriter, Write}; -use std::path::Path; -use tracing::{debug, info}; +use std::path::{Path, PathBuf}; +use tracing::{debug, info, warn}; -/// Compress `path` to `.gz` using gzip. +/// Compression options shared by serial and parallel cleaner paths. /// -/// If a `.gz` file for the given path already exists the function returns -/// `Ok(())` immediately without overwriting the existing archive. -/// -/// # Arguments -/// * `path` - Path to the uncompressed log file. 
-/// * `level` - Gzip compression level (`1`–`9`); clamped automatically. -/// * `dry_run` - When `true`, log what would be done without writing anything. -/// -/// # Errors -/// Propagates any I/O error encountered while opening, reading, writing, or -/// flushing files. -pub(super) fn compress_file(path: &Path, level: u32, dry_run: bool) -> Result<(), std::io::Error> { - let gz_path = path.with_extension(DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION); +/// The core cleaner prepares this immutable bundle once per cleanup pass and +/// then hands it to each worker so all files in that run use the same policy. +#[derive(Debug, Clone)] +pub(super) struct CompressionOptions { + /// Preferred compression codec for the current cleanup pass. + pub algorithm: CompressionAlgorithm, + /// Gzip level (1..=9). + pub gzip_level: u32, + /// Zstd level (1..=21). + pub zstd_level: i32, + /// Internal zstd worker threads used by zstdmt. + pub zstd_workers: usize, + /// If true, fallback to gzip when zstd encoding fails. + pub zstd_fallback_to_gzip: bool, + /// Dry-run mode reports planned actions without writing files. + pub dry_run: bool, +} - if gz_path.exists() { - debug!("Compressed file already exists, skipping: {:?}", gz_path); - return Ok(()); +/// Compression output metadata used for metrics and deletion gating. +/// +/// Callers inspect the output size and archive path before deciding whether it +/// is safe to remove the source log file. +#[derive(Debug, Clone)] +pub(super) struct CompressionOutput { + /// Final path of the compressed archive file. + pub archive_path: PathBuf, + /// Codec actually used to produce the output. + pub algorithm_used: CompressionAlgorithm, + /// Input bytes before compression. + pub input_bytes: u64, + /// Compressed output bytes on disk. + pub output_bytes: u64, +} + +/// Compress a single source file with the requested codec and fallback policy. 
+///
+/// This function centralizes the fallback behavior so the orchestration layer
+/// does not need per-codec branching.
+pub(super) fn compress_file(path: &Path, options: &CompressionOptions) -> Result<CompressionOutput, std::io::Error> {
+    match options.algorithm {
+        CompressionAlgorithm::Gzip => compress_gzip(path, options.gzip_level, options.dry_run),
+        CompressionAlgorithm::Zstd => match compress_zstd(path, options.zstd_level, options.zstd_workers, options.dry_run) {
+            Ok(output) => Ok(output),
+            Err(err) if options.zstd_fallback_to_gzip => {
+                warn!(
+                    file = ?path,
+                    error = %err,
+                    "zstd compression failed, fallback to gzip"
+                );
+                compress_gzip(path, options.gzip_level, options.dry_run)
+            }
+            Err(err) => Err(err),
+        },
+    }
+}
+
+/// Compress a file to `*.gz` using the configured gzip level.
+fn compress_gzip(path: &Path, level: u32, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
+    let archive_path = archive_path(path, CompressionAlgorithm::Gzip);
+    compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Gzip, |reader, writer| {
+        let mut encoder = GzEncoder::new(writer, Compression::new(level.clamp(1, 9)));
+        let written = std::io::copy(reader, &mut encoder)?;
+        let mut out = encoder.finish()?;
+        out.flush()?;
+        Ok(written)
+    })
+}
+
+/// Compress a file to `*.zst` using multi-threaded zstd when available.
+fn compress_zstd(path: &Path, level: i32, workers: usize, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
+    let archive_path = archive_path(path, CompressionAlgorithm::Zstd);
+    compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Zstd, |reader, writer| {
+        let mut encoder = zstd::stream::Encoder::new(writer, level.clamp(1, 21))?;
+        encoder.multithread(workers.max(1) as u32)?;
+        let written = std::io::copy(reader, &mut encoder)?;
+        let mut out = encoder.finish()?;
+        out.flush()?;
+        Ok(written)
+    })
+}
+
+fn compress_with_writer<F>(
+    path: &Path,
+    archive_path: &Path,
+    dry_run: bool,
+    algorithm_used: CompressionAlgorithm,
+    mut writer_fn: F,
+) -> Result<CompressionOutput, std::io::Error>
+where
+    F: FnMut(&mut BufReader<File>, BufWriter<File>) -> Result<u64, std::io::Error>,
+{
+    // Keep idempotent behavior: existing archive means this file has already
+    // been handled in a previous cleanup pass.
+    if archive_path.exists() {
+        debug!(file = ?archive_path, "compressed archive already exists, skipping");
+        let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
+        let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0);
+        return Ok(CompressionOutput {
+            archive_path: archive_path.to_path_buf(),
+            algorithm_used,
+            input_bytes,
+            output_bytes,
+        });
     }
 
+    let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
     if dry_run {
-        info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, gz_path);
-        return Ok(());
+        info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, archive_path);
+        return Ok(CompressionOutput {
+            archive_path: archive_path.to_path_buf(),
+            algorithm_used,
+            input_bytes,
+            output_bytes: 0,
+        });
     }
 
+    // Create the output archive only after the dry-run short-circuit so this
+    // helper remains side-effect free when the caller is evaluating policy.
let input = File::open(path)?; - let output = File::create(&gz_path)?; + // Write to a temporary file first and then atomically rename it into place + // so callers never observe a partially written archive at `archive_path`. + let mut tmp_name = archive_path.as_os_str().to_owned(); + tmp_name.push(".tmp"); + let tmp_archive_path = std::path::PathBuf::from(tmp_name); + + let output = File::create(&tmp_archive_path)?; let mut reader = BufReader::new(input); - let mut writer = BufWriter::new(output); + let writer = BufWriter::new(output); - let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level.clamp(1, 9))); - std::io::copy(&mut reader, &mut encoder)?; - let compressed = encoder.finish()?; + if let Err(e) = writer_fn(&mut reader, writer) { + // Best-effort cleanup of the incomplete temporary archive. + let _ = std::fs::remove_file(&tmp_archive_path); + return Err(e); + } - writer.write_all(&compressed)?; - writer.flush()?; + // Preserve Unix mode bits so rotated archives keep the same access policy. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + if let Ok(src_meta) = std::fs::metadata(path) { + let mode = src_meta.permissions().mode(); + let _ = std::fs::set_permissions(&tmp_archive_path, std::fs::Permissions::from_mode(mode)); + } + } + + // Atomically move the fully written temp file into its final location. + std::fs::rename(&tmp_archive_path, archive_path)?; + + let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0); debug!( - "Compressed {:?} -> {:?} ({} bytes -> {} bytes)", - path, - gz_path, - std::fs::metadata(path).map(|m| m.len()).unwrap_or(0), - compressed.len() + file = ?path, + archive = ?archive_path, + input_bytes, + output_bytes, + algorithm = %algorithm_used, + "compression finished" ); - Ok(()) + Ok(CompressionOutput { + archive_path: archive_path.to_path_buf(), + algorithm_used, + input_bytes, + output_bytes, + }) +} + +/// Build `.` without replacing an existing extension. 
+/// +/// Rotated log filenames often already encode a generation number or suffix +/// (for example `server.log.3`). Appending the archive extension preserves that +/// information in the final artifact name (`server.log.3.gz`). +fn archive_path(path: &Path, algorithm: CompressionAlgorithm) -> PathBuf { + let ext = algorithm.extension(); + let mut file_name: OsString = path + .file_name() + .map_or_else(|| OsString::from("archive"), |n| n.to_os_string()); + file_name.push(format!(".{ext}")); + path.with_file_name(file_name) } diff --git a/crates/obs/src/cleaner/core.rs b/crates/obs/src/cleaner/core.rs index 6c8dbc81..69980c0c 100644 --- a/crates/obs/src/cleaner/core.rs +++ b/crates/obs/src/cleaner/core.rs @@ -14,67 +14,89 @@ //! Core log-file cleanup orchestration. //! -//! [`LogCleaner`] is the public entry point for the cleanup subsystem. -//! Construct it with [`LogCleaner::builder`] and call [`LogCleaner::cleanup`] -//! periodically (e.g. from a `tokio::spawn`-ed loop). -//! -//! Internally the cleaner delegates to: -//! - [`super::scanner`] — to discover which files exist and which are eligible, -//! - [`super::compress`] — to gzip-compress files before they are deleted, -//! - [`LogCleaner::select_files_to_delete`] — to apply count / size limits. +//! This module connects scanning, retention selection, compression, and safe +//! deletion into one reusable service object. The public surface is intentionally +//! small: callers configure a [`LogCleaner`] once and then trigger discrete +//! cleanup passes whenever log rotation or background maintenance requires it. 
-use super::compress::compress_file; +use super::compress::{CompressionOptions, compress_file}; use super::scanner::{LogScanResult, scan_log_directory}; -use super::types::{FileInfo, FileMatchMode}; +use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode, default_parallel_workers}; +use crate::global::{ + METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS, METRIC_LOG_CLEANER_DELETED_FILES_TOTAL, METRIC_LOG_CLEANER_FREED_BYTES_TOTAL, + METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE, +}; +use crossbeam_channel::bounded; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use crossbeam_utils::thread; +use metrics::{counter, gauge, histogram}; use rustfs_config::DEFAULT_LOG_KEEP_FILES; use std::path::PathBuf; -use std::time::SystemTime; -use tracing::{debug, error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant, SystemTime}; +use tracing::{debug, error, info, warn}; + +#[derive(Debug)] +struct CompressionTaskResult { + /// Original file metadata so successful workers can be deleted later. + file: FileInfo, + /// Whether compression completed successfully for this file. + compressed: bool, +} /// Log-file lifecycle manager. /// -/// Holds all cleanup policy parameters and exposes a single [`cleanup`] method -/// that performs one full cleanup pass. -/// -/// # Thread-safety -/// `LogCleaner` is `Send + Sync`. Multiple callers can share a reference -/// (e.g. via `Arc`) and call `cleanup` concurrently without data races, -/// because no mutable state is mutated after construction. +/// A cleaner instance is immutable after construction and therefore safe to +/// reuse across periodic background jobs. Each call to [`LogCleaner::cleanup`] +/// performs a fresh directory scan and applies the configured retention rules. pub struct LogCleaner { - /// Directory containing the managed log files. + /// Directory containing the active and rotated log files. 
pub(super) log_dir: PathBuf, - /// Pattern string to match files (used as prefix or suffix). + /// Pattern used to recognize relevant log generations. pub(super) file_pattern: String, - /// Exact name of the active log file (to exclude from cleanup). + /// The currently active log file that must never be touched. pub(super) active_filename: String, - /// Whether to match by prefix or suffix. + /// Whether `file_pattern` is interpreted as a prefix or suffix. pub(super) match_mode: FileMatchMode, - /// The cleaner will never delete files if doing so would leave fewer than - /// this many files in the directory. + /// Minimum number of regular log files to keep regardless of size. pub(super) keep_files: usize, - /// Hard ceiling on the total bytes of all managed files; `0` = no limit. + /// Optional cap for the cumulative size of regular logs. pub(super) max_total_size_bytes: u64, - /// Hard ceiling on a single file's size; `0` = no per-file limit. + /// Optional cap for an individual regular log file. pub(super) max_single_file_size_bytes: u64, - /// Compress eligible files with gzip before removing them. + /// Whether selected regular logs should be compressed before deletion. pub(super) compress_old_files: bool, - /// Gzip compression level (`1`–`9`, clamped on construction). + /// Gzip compression level used when gzip is selected or used as fallback. pub(super) gzip_compression_level: u32, - /// Delete compressed archives older than this many days; `0` = keep forever. + /// Retention window for already compressed archives, expressed in days. pub(super) compressed_file_retention_days: u64, - /// Compiled glob patterns for files that must never be cleaned up. + /// Glob patterns that are excluded before any cleanup decision is made. pub(super) exclude_patterns: Vec, - /// Delete zero-byte files even when they are younger than `min_file_age_seconds`. + /// Whether zero-byte regular logs may be removed during scanning. 
pub(super) delete_empty_files: bool, - /// Files younger than this threshold (in seconds) are never touched. + /// Minimum age a regular log must reach before it becomes eligible. pub(super) min_file_age_seconds: u64, - /// When `true`, log what would be done without performing any destructive - /// filesystem operations. + /// Dry-run mode reports intended actions without modifying files. pub(super) dry_run: bool, + // Parallel compression controls while keeping backward compatibility with + // the original serial cleaner behavior. + /// Preferred archive codec for compression-enabled cleanup passes. + pub(super) compression_algorithm: CompressionAlgorithm, + /// Enables the work-stealing compression path when compression is active. + pub(super) parallel_compress: bool, + /// Number of worker threads used by the parallel compressor. + pub(super) parallel_workers: usize, + /// Zstd compression level when zstd is selected. + pub(super) zstd_compression_level: i32, + /// Whether a failed zstd attempt should retry with gzip. + pub(super) zstd_fallback_to_gzip: bool, + /// Number of internal threads requested from the zstd encoder. + pub(super) zstd_workers: usize, } impl LogCleaner { - /// Create a builder to construct a `LogCleaner`. + /// Create a builder with the required path and filename matching inputs. pub fn builder( log_dir: impl Into, file_pattern: impl Into, @@ -84,19 +106,6 @@ impl LogCleaner { } /// Perform one full cleanup pass. - /// - /// Steps: - /// 1. Scan the log directory for managed files (excluding the active file). - /// 2. Apply count/size policies to select files for deletion. - /// 3. Optionally compress selected files, then delete them. - /// 4. Collect and delete expired compressed archives. - /// - /// # Returns - /// A tuple `(deleted_count, freed_bytes)` covering all deletions in this - /// pass (both regular files and expired compressed archives). 
- /// - /// # Errors - /// Returns an [`std::io::Error`] if the log directory cannot be read. pub fn cleanup(&self) -> Result<(usize, u64), std::io::Error> { if !self.log_dir.exists() { debug!("Log directory does not exist: {:?}", self.log_dir); @@ -106,8 +115,6 @@ impl LogCleaner { let mut total_deleted = 0usize; let mut total_freed = 0u64; - // ── 1. Discover active log files (Archives only) ────────────────────── - // We explicitly pass `active_filename` to exclude it from the list. let LogScanResult { mut logs, mut compressed_archives, @@ -122,7 +129,6 @@ impl LogCleaner { self.dry_run, )?; - // ── 2. Select + compress + delete (Regular Logs) ────────────────────── if !logs.is_empty() { logs.sort_by_key(|f| f.modified); let total_size: u64 = logs.iter().map(|f| f.size).sum(); @@ -134,16 +140,20 @@ impl LogCleaner { total_size as f64 / 1024.0 / 1024.0 ); + // Select the oldest files first, then additionally trim any files + // that still violate configured size constraints. let to_delete = self.select_files_to_process(&logs, total_size); - if !to_delete.is_empty() { - let (d, f) = self.compress_and_delete(&to_delete)?; - total_deleted += d; - total_freed += f; + let (deleted, freed) = if self.parallel_compress && self.compress_old_files { + self.parallel_stealing_compress(&to_delete)? + } else { + self.serial_compress_and_delete(&to_delete)? + }; + total_deleted += deleted; + total_freed += freed; } } - // ── 3. 
Remove expired compressed archives ───────────────────────────── if !compressed_archives.is_empty() && self.compressed_file_retention_days > 0 { let expired = self.select_expired_compressed(&mut compressed_archives); if !expired.is_empty() { @@ -154,6 +164,8 @@ impl LogCleaner { } if total_deleted > 0 || total_freed > 0 { + counter!(METRIC_LOG_CLEANER_DELETED_FILES_TOTAL).increment(total_deleted as u64); + counter!(METRIC_LOG_CLEANER_FREED_BYTES_TOTAL).increment(total_freed); info!( "Cleanup completed: deleted {} files, freed {} bytes ({:.2} MB)", total_deleted, @@ -165,51 +177,31 @@ impl LogCleaner { Ok((total_deleted, total_freed)) } - // ─── Selection ──────────────────────────────────────────────────────────── - - /// Choose which files from `files` (sorted oldest-first) should be deleted. + /// Choose regular log files that should be compressed and/or deleted. /// - /// The algorithm respects three constraints in order: - /// 1. Always keep at least `keep_files` files (archives). - /// 2. Delete old files while the total size exceeds `max_total_size_bytes`. - /// 3. Delete any file whose individual size exceeds `max_single_file_size_bytes`. + /// The `files` slice must already be sorted from oldest to newest. The + /// method first preserves the newest `keep_files` generations, then applies + /// total-size and per-file-size limits to the remaining tail. pub(super) fn select_files_to_process(&self, files: &[FileInfo], total_size: u64) -> Vec { let mut to_delete = Vec::new(); - if files.is_empty() { return to_delete; } - // Calculate how many files we *must* delete to satisfy keep_files. let must_delete_count = files.len().saturating_sub(self.keep_files); - let mut current_size = total_size; for (idx, file) in files.iter().enumerate() { - // Condition 1: Enforce keep_files. - // If we are in the range of files that exceed the count limit, delete them. 
if idx < must_delete_count { current_size = current_size.saturating_sub(file.size); to_delete.push(file.clone()); continue; } - // Condition 2: Enforce max_total_size_bytes. let over_total = self.max_total_size_bytes > 0 && current_size > self.max_total_size_bytes; - - // Condition 3: Enforce max_single_file_size_bytes. - // Note: Since active file is excluded, if an archive is > max_single, it means it - // was rotated out being too large (likely) or we lowered the limit. It should be deleted. let over_single = self.max_single_file_size_bytes > 0 && file.size > self.max_single_file_size_bytes; - if over_total { - current_size = current_size.saturating_sub(file.size); - to_delete.push(file.clone()); - } else if over_single { - debug!( - "Archive exceeds single-file size limit: {:?} ({} > {} bytes). Deleting.", - file.path, file.size, self.max_single_file_size_bytes - ); + if over_total || over_single { current_size = current_size.saturating_sub(file.size); to_delete.push(file.clone()); } @@ -218,9 +210,9 @@ impl LogCleaner { to_delete } - /// Select compressed files that have exceeded the retention period. + /// Select compressed archives whose age exceeds the archive retention window. fn select_expired_compressed(&self, files: &mut [FileInfo]) -> Vec { - let retention = std::time::Duration::from_secs(self.compressed_file_retention_days * 24 * 3600); + let retention = Duration::from_secs(self.compressed_file_retention_days * 24 * 3600); let now = SystemTime::now(); let mut expired = Vec::new(); @@ -235,22 +227,259 @@ impl LogCleaner { expired } - // ─── Compression + deletion ─────────────────────────────────────────────── - - /// Securely delete a file, preventing symlink attacks (TOCTOU). + /// Parallel compressor with work stealing. /// - /// This function verifies that the path is not a symlink before attempting deletion. 
- /// While strictly speaking a race condition is still theoretically possible between - /// `symlink_metadata` and `remove_file`, this check covers the vast majority of - /// privilege escalation vectors where a user replaces a log file with a symlink - /// to a system file. - fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> { - // 1. Lstat (symlink_metadata) - do not follow links - let meta = std::fs::symlink_metadata(path)?; + /// The flow is intentionally split into "parallel compression" followed by + /// "serial deletion" to reduce cross-platform file-locking failures. + /// Compression workers only decide whether an archive was created; the main + /// thread remains responsible for actual source removal so deletion policy + /// and error reporting stay deterministic. + fn parallel_stealing_compress(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> { + if files.len() <= 1 { + return self.serial_compress_and_delete(files); + } - // 2. Symlink Check - // If it's a symlink, we NEVER delete it. It might point to /etc/passwd. - // In a log directory, symlinks are unexpected and dangerous. 
+ let worker_count = self.parallel_workers.min(files.len()).max(1); + if worker_count <= 1 { + return self.serial_compress_and_delete(files); + } + + let compression_options = self.compression_options(); + let started_at = Instant::now(); + let injector = Arc::new(Injector::new()); + for file in files { + injector.push(file.clone()); + } + + let mut workers = Vec::with_capacity(worker_count); + let mut stealers = Vec::with_capacity(worker_count); + for _ in 0..worker_count { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); + workers.push(worker); + } + + let stealers = Arc::new(stealers); + let steal_attempts = Arc::new(AtomicU64::new(0)); + let steal_successes = Arc::new(AtomicU64::new(0)); + let (tx, rx) = bounded::(worker_count.saturating_mul(2).max(8)); + + // Spawn a fixed-size worker set in a scoped region so panics are + // contained and can be downgraded to a serial fallback instead of + // leaking detached threads. + let scope_result = thread::scope(|scope| { + for (worker_id, local_worker) in workers.into_iter().enumerate() { + let tx = tx.clone(); + let injector = Arc::clone(&injector); + let stealers = Arc::clone(&stealers); + let options = compression_options.clone(); + let attempts = Arc::clone(&steal_attempts); + let successes = Arc::clone(&steal_successes); + + scope.spawn(move |_| { + let mut seed = (worker_id as u64 + 1) + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + + loop { + // Search order: local FIFO -> global injector batch -> + // random victim stealers. 
+ let task = if let Some(file) = local_worker.pop() { + Some(file) + } else { + match injector.steal_batch_and_pop(&local_worker) { + Steal::Success(file) => { + attempts.fetch_add(1, Ordering::Relaxed); + successes.fetch_add(1, Ordering::Relaxed); + Some(file) + } + Steal::Retry => continue, + Steal::Empty => { + let stolen = Self::steal_from_victims( + worker_id, + &local_worker, + &stealers, + &attempts, + &successes, + &mut seed, + ); + // Exit only when all task sources are empty. + if stolen.is_none() + && injector.is_empty() + && local_worker.is_empty() + && stealers.iter().all(Stealer::is_empty) + { + break; + } + stolen + } + } + }; + + let Some(file) = task else { + std::thread::yield_now(); + continue; + }; + + let compressed = match compress_file(&file.path, &options) { + Ok(output) => { + debug!( + file = ?file.path, + archive = ?output.archive_path, + algorithm = %output.algorithm_used, + input_bytes = output.input_bytes, + output_bytes = output.output_bytes, + "parallel compression done" + ); + true + } + Err(err) => { + warn!(file = ?file.path, error = %err, "parallel compression failed"); + false + } + }; + + if tx.send(CompressionTaskResult { file, compressed }).is_err() { + break; + } + } + }); + } + }); + drop(tx); + + // Any worker panic triggers deterministic fallback behavior. 
+ if scope_result.is_err() { + warn!("parallel compression worker panicked, falling back to serial path"); + return self.serial_compress_and_delete(files); + } + + let mut deletable = Vec::with_capacity(files.len()); + for result in rx { + if result.compressed { + deletable.push(result.file); + } + } + + let (deleted, freed) = self.delete_files(&deletable)?; + let elapsed = started_at.elapsed().as_secs_f64(); + let attempts = steal_attempts.load(Ordering::Relaxed); + let successes = steal_successes.load(Ordering::Relaxed); + let success_rate = if attempts == 0 { + 0.0 + } else { + successes as f64 / attempts as f64 + }; + + // Emit post-run cleanup metrics for monitoring and alerting. + histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(elapsed); + gauge!(METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE).set(success_rate); + + info!( + workers = worker_count, + algorithm = %self.compression_algorithm, + deleted, + freed, + duration_seconds = elapsed, + steal_attempts = attempts, + steal_successes = successes, + steal_success_rate = success_rate, + "parallel cleanup finished" + ); + + Ok((deleted, freed)) + } + + /// Attempt to steal a task from peer workers using randomized victim order. + fn steal_from_victims( + worker_id: usize, + local_worker: &Worker, + stealers: &[Stealer], + attempts: &AtomicU64, + successes: &AtomicU64, + seed: &mut u64, + ) -> Option { + if stealers.len() <= 1 { + return None; + } + + // Xorshift step to randomize victim polling order and avoid convoying. 
+ *seed ^= *seed << 13; + *seed ^= *seed >> 7; + *seed ^= *seed << 17; + let start = (*seed as usize) % stealers.len(); + + let steal_result = Steal::from_iter((0..stealers.len()).map(|offset| { + let victim = (start + offset) % stealers.len(); + if victim == worker_id { + return Steal::Empty; + } + attempts.fetch_add(1, Ordering::Relaxed); + stealers[victim].steal_batch_and_pop(local_worker) + })); + + match steal_result { + Steal::Success(file) => { + successes.fetch_add(1, Ordering::Relaxed); + Some(file) + } + Steal::Retry | Steal::Empty => None, + } + } + + /// Serial fallback path and non-parallel baseline. + /// + /// This path is also used whenever the task set is too small to benefit + /// from worker orchestration. + fn serial_compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> { + let started_at = Instant::now(); + let mut deletable = Vec::with_capacity(files.len()); + + if self.compress_old_files { + let options = self.compression_options(); + for file in files { + match compress_file(&file.path, &options) { + Ok(output) => { + debug!( + file = ?file.path, + archive = ?output.archive_path, + algorithm = %output.algorithm_used, + input_bytes = output.input_bytes, + output_bytes = output.output_bytes, + "serial compression done" + ); + deletable.push(file.clone()); + } + Err(err) => { + warn!(file = ?file.path, error = %err, "serial compression failed, source kept"); + } + } + } + } else { + deletable.extend(files.iter().cloned()); + } + + let (deleted, freed) = self.delete_files(&deletable)?; + histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(started_at.elapsed().as_secs_f64()); + + Ok((deleted, freed)) + } + + /// Snapshot compression-related configuration for a single cleanup pass. 
+ fn compression_options(&self) -> CompressionOptions { + CompressionOptions { + algorithm: self.compression_algorithm, + gzip_level: self.gzip_compression_level, + zstd_level: self.zstd_compression_level, + zstd_workers: self.zstd_workers, + zstd_fallback_to_gzip: self.zstd_fallback_to_gzip, + dry_run: self.dry_run, + } + } + + /// Delete a file while refusing symlinks and accommodating platform quirks. + fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> { + let meta = std::fs::symlink_metadata(path)?; if meta.file_type().is_symlink() { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -258,59 +487,36 @@ impl LogCleaner { )); } - // 3. Perform Deletion - std::fs::remove_file(path) - } - - /// Optionally compress and then delete the given files. - /// - /// This function is synchronous and blocking. It should be called within a - /// `spawn_blocking` task if running in an async context. - fn compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> { - let mut total_deleted = 0; - let mut total_freed = 0; - - for f in files { - let mut deleted_size = 0; - if self.compress_old_files { - match compress_file(&f.path, self.gzip_compression_level, self.dry_run) { - Ok(_) => {} - Err(e) => { - tracing::warn!("Failed to compress {:?}: {}", f.path, e); + #[cfg(windows)] + { + // Retry removes to mitigate transient handle races from external + // scanners/AV software. 
+ let mut last_err: Option = None; + for _ in 0..3 { + match std::fs::remove_file(path) { + Ok(()) => return Ok(()), + Err(err) => { + last_err = Some(err); + std::thread::sleep(Duration::from_millis(20)); } } } - - // Now delete - if self.dry_run { - info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size); - deleted_size = f.size; - } else { - match self.secure_delete(&f.path) { - Ok(()) => { - debug!("Deleted: {:?}", f.path); - deleted_size = f.size; - } - Err(e) => { - error!("Failed to delete {:?}: {}", f.path, e); - } - } - } - if deleted_size > 0 { - total_deleted += 1; - total_freed += deleted_size; + if let Some(err) = last_err { + return Err(err); } + return Ok(()); } - Ok((total_deleted, total_freed)) + #[cfg(not(windows))] + { + std::fs::remove_file(path) + } } - /// Delete all files in `files`, logging each operation. + /// Delete the supplied files and return `(deleted_count, freed_bytes)`. /// - /// Errors on individual files are logged but do **not** abort the loop. - /// - /// # Returns - /// `(deleted_count, freed_bytes)`. + /// In dry-run mode the returned counters still reflect the projected work + /// so callers and metrics can report what would have happened. pub(super) fn delete_files(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> { let mut deleted = 0usize; let mut freed = 0u64; @@ -320,16 +526,17 @@ impl LogCleaner { info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size); deleted += 1; freed += f.size; - } else { - match self.secure_delete(&f.path) { - Ok(()) => { - debug!("Deleted: {:?}", f.path); - deleted += 1; - freed += f.size; - } - Err(e) => { - error!("Failed to delete {:?}: {}", f.path, e); - } + continue; + } + + match self.secure_delete(&f.path) { + Ok(()) => { + deleted += 1; + freed += f.size; + debug!("Deleted: {:?}", f.path); + } + Err(e) => { + error!("Failed to delete {:?}: {}", f.path, e); } } } @@ -339,6 +546,9 @@ impl LogCleaner { } /// Builder for [`LogCleaner`]. 
+/// +/// The builder keeps startup code readable when an application only needs to +/// override a subset of retention knobs. pub struct LogCleanerBuilder { log_dir: PathBuf, file_pattern: String, @@ -354,18 +564,22 @@ pub struct LogCleanerBuilder { delete_empty_files: bool, min_file_age_seconds: u64, dry_run: bool, + compression_algorithm: CompressionAlgorithm, + parallel_compress: bool, + parallel_workers: usize, + zstd_compression_level: i32, + zstd_fallback_to_gzip: bool, + zstd_workers: usize, } impl LogCleanerBuilder { + /// Create a builder with conservative defaults. pub fn new(log_dir: impl Into, file_pattern: impl Into, active_filename: impl Into) -> Self { Self { log_dir: log_dir.into(), file_pattern: file_pattern.into(), active_filename: active_filename.into(), match_mode: FileMatchMode::Prefix, - // Default to a safe non-zero value so that a builder created - // without an explicit `keep_files()` call does not immediately - // delete all matching log files. keep_files: DEFAULT_LOG_KEEP_FILES, max_total_size_bytes: 0, max_single_file_size_bytes: 0, @@ -376,64 +590,127 @@ impl LogCleanerBuilder { delete_empty_files: false, min_file_age_seconds: 0, dry_run: false, + compression_algorithm: CompressionAlgorithm::default(), + parallel_compress: true, + parallel_workers: default_parallel_workers(), + zstd_compression_level: 8, + zstd_fallback_to_gzip: true, + zstd_workers: 1, } } + /// Configure whether `file_pattern` is matched as a prefix or suffix. pub fn match_mode(mut self, match_mode: FileMatchMode) -> Self { self.match_mode = match_mode; self } + /// Preserve at least this many newest regular log files. pub fn keep_files(mut self, keep_files: usize) -> Self { self.keep_files = keep_files; self } + /// Cap the aggregate size of retained regular log files. pub fn max_total_size_bytes(mut self, max_total_size_bytes: u64) -> Self { self.max_total_size_bytes = max_total_size_bytes; self } + /// Cap the size of any individual regular log file. 
pub fn max_single_file_size_bytes(mut self, max_single_file_size_bytes: u64) -> Self { self.max_single_file_size_bytes = max_single_file_size_bytes; self } + /// Enable archival compression before deleting selected source logs. pub fn compress_old_files(mut self, compress_old_files: bool) -> Self { self.compress_old_files = compress_old_files; self } + /// Set the gzip compression level used for gzip output or gzip fallback. pub fn gzip_compression_level(mut self, gzip_compression_level: u32) -> Self { self.gzip_compression_level = gzip_compression_level; self } + /// Set how long compressed archives may remain on disk. pub fn compressed_file_retention_days(mut self, days: u64) -> Self { self.compressed_file_retention_days = days; self } + /// Exclude files matching these glob patterns from every cleanup pass. pub fn exclude_patterns(mut self, patterns: Vec) -> Self { self.exclude_patterns = patterns; self } + /// Allow the scanner to remove matching zero-byte regular logs immediately. pub fn delete_empty_files(mut self, delete_empty_files: bool) -> Self { self.delete_empty_files = delete_empty_files; self } + /// Require regular log files to be at least this old before processing. pub fn min_file_age_seconds(mut self, seconds: u64) -> Self { self.min_file_age_seconds = seconds; self } + /// Enable dry-run mode for scans, compression decisions, and deletion. pub fn dry_run(mut self, dry_run: bool) -> Self { self.dry_run = dry_run; self } + /// Set the preferred compression algorithm explicitly. + pub fn compression_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self { + self.compression_algorithm = algorithm; + self + } + + /// Parse and set the compression algorithm from configuration text. + pub fn compression_algorithm_str(mut self, algorithm: impl AsRef) -> Self { + self.compression_algorithm = CompressionAlgorithm::from_config_str(algorithm.as_ref()); + self + } + + /// Enable or disable the parallel work-stealing compression path. 
+ pub fn parallel_compress(mut self, enabled: bool) -> Self { + self.parallel_compress = enabled; + self + } + + /// Set the number of compression workers, clamped to at least one. + pub fn parallel_workers(mut self, workers: usize) -> Self { + self.parallel_workers = workers.max(1); + self + } + + /// Set the zstd compression level. + pub fn zstd_compression_level(mut self, level: i32) -> Self { + self.zstd_compression_level = level; + self + } + + /// Retry compression with gzip when zstd encoding fails. + pub fn zstd_fallback_to_gzip(mut self, enabled: bool) -> Self { + self.zstd_fallback_to_gzip = enabled; + self + } + + /// Set the number of internal worker threads requested from zstd. + pub fn zstd_workers(mut self, workers: usize) -> Self { + self.zstd_workers = workers.max(1); + self + } + + /// Finalize the builder into an immutable [`LogCleaner`]. + /// + /// Invalid glob patterns are ignored rather than failing construction, and + /// codec-related numeric values are clamped into safe ranges. pub fn build(self) -> LogCleaner { let patterns = self .exclude_patterns @@ -456,6 +733,12 @@ impl LogCleanerBuilder { delete_empty_files: self.delete_empty_files, min_file_age_seconds: self.min_file_age_seconds, dry_run: self.dry_run, + compression_algorithm: self.compression_algorithm, + parallel_compress: self.parallel_compress, + parallel_workers: self.parallel_workers.max(1), + zstd_compression_level: self.zstd_compression_level.clamp(1, 21), + zstd_fallback_to_gzip: self.zstd_fallback_to_gzip, + zstd_workers: self.zstd_workers.max(1), } } } diff --git a/crates/obs/src/cleaner/mod.rs b/crates/obs/src/cleaner/mod.rs index 4676bc59..b5707011 100644 --- a/crates/obs/src/cleaner/mod.rs +++ b/crates/obs/src/cleaner/mod.rs @@ -14,17 +14,32 @@ //! Log-file cleanup subsystem. //! -//! This module provides [`LogCleaner`], a configurable manager that -//! periodically removes, compresses, or archives old rolling log files. +//! 
This module exposes the high-level [`LogCleaner`] API used by the +//! observability layer to manage rotated log files after they are no longer +//! active. The implementation is intentionally split into small focused +//! sub-modules so each concern can evolve independently without turning the +//! cleaner into a monolithic state machine. +//! +//! At a high level, one cleanup pass follows this lifecycle: +//! +//! 1. scan the target directory and classify matching entries; +//! 2. decide which regular log files exceed retention constraints; +//! 3. optionally compress those files using the configured codec; +//! 4. delete only files that are safe to remove; +//! 5. separately expire already-compressed archives by age. +//! +//! The design favors operational safety over aggressive reclamation: +//! compression and deletion are decoupled, symlinks are rejected on removal, +//! and dry-run mode reports intent without mutating the filesystem. //! //! ## Sub-modules //! //! | Module | Responsibility | //! |-------------|----------------------------------------------------------| -//! | `types` | Shared data types (`FileInfo`) | -//! | `scanner` | Filesystem traversal — discovers eligible files | -//! | `compress` | Gzip compression helper | -//! | `core` | Core orchestration — selection, compression, deletion | +//! | `types` | Shared enums and metadata (`FileInfo`, match/compression choices) | +//! | `scanner` | Filesystem traversal, pattern matching, empty-file handling | +//! | `compress` | Archive creation helpers for gzip and zstd | +//! | `core` | Selection, parallel/serial processing, secure deletion | //! //! ## Usage //! @@ -60,6 +75,10 @@ mod core; mod scanner; pub mod types; +/// Primary entry point for the cleaner subsystem. +/// +/// Re-exported from [`core`] so callers can construct a cleaner without +/// knowing the internal module layout. 
pub use core::LogCleaner; #[cfg(test)] @@ -72,6 +91,7 @@ mod tests { use std::path::Path; use tempfile::TempDir; + /// Create a test log file with deterministic contents and size. fn create_log_file(dir: &Path, name: &str, size: usize) -> std::io::Result<()> { let path = dir.join(name); let mut f = File::create(path)?; diff --git a/crates/obs/src/cleaner/scanner.rs b/crates/obs/src/cleaner/scanner.rs index f4331da3..3a7f76e9 100644 --- a/crates/obs/src/cleaner/scanner.rs +++ b/crates/obs/src/cleaner/scanner.rs @@ -18,15 +18,23 @@ //! The one exception is zero-byte file removal — when `delete_empty_files` //! is enabled, `scan_log_directory` removes empty regular files as part of //! the scan so that they are not counted in retention calculations. +//! +//! The scanner is also the first safety boundary of the cleaner pipeline. It +//! performs a shallow directory walk, rejects symlinks by relying on +//! `symlink_metadata`, and separates plain logs from pre-compressed archives so +//! later stages can apply different retention rules without rescanning. -use super::types::{FileInfo, FileMatchMode}; -use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION; +use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode}; use std::fs; use std::path::Path; use std::time::SystemTime; use tracing::debug; /// Result of a single pass directory scan. +/// +/// Separating regular logs from compressed archives keeps the selection logic +/// straightforward: active retention limits apply to the former, while archive +/// expiry rules apply to the latter. pub(super) struct LogScanResult { /// Regular log files eligible for deletion/compression. pub logs: Vec, @@ -49,6 +57,7 @@ pub(super) struct LogScanResult { /// * `delete_empty_files` - When `true`, zero-byte regular files that match /// the pattern are deleted immediately inside this function and excluded /// from the returned [`LogScanResult`]. 
+/// * `dry_run` - When `true`, destructive actions are logged but not executed. #[allow(clippy::too_many_arguments)] pub(super) fn scan_log_directory( log_dir: &Path, @@ -121,13 +130,19 @@ pub(super) fn scan_log_directory( } // 3. Classify file type and check pattern match. - let is_compressed = filename.ends_with(DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION); + let matched_suffix = CompressionAlgorithm::compressed_suffixes() + .into_iter() + .find(|suffix| filename.ends_with(suffix)); + let is_compressed = matched_suffix.is_some(); - // For matching, we need the "base" name. - // If compressed: "foo.log.gz" -> check "foo.log" - // If regular: "foo.log" -> check "foo.log" - let name_to_match = if is_compressed { - &filename[..filename.len() - DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION.len()] + // Perform matching on the logical log filename. + // Examples: + // - regular log: `foo.log.1` -> `foo.log.1` + // - archive: `foo.log.1.gz` -> `foo.log.1` + // This allows the same include/exclude pattern configuration to apply + // to both raw and already-compressed generations. + let name_to_match = if let Some(suffix) = matched_suffix { + &filename[..filename.len() - suffix.len()] } else { filename }; @@ -149,11 +164,13 @@ pub(super) fn scan_log_directory( Err(_) => continue, // Skip files where we can't read modification time }; - // 5. Handle zero-byte files (Regular logs only). - // We generally don't delete empty compressed files implicitly, but let's stick to regular files logic. + // 5. Handle zero-byte files (regular logs only). + // Empty compressed artifacts are left alone here because they belong + // to the archive-retention path and should not disappear outside that + // explicit policy. 
if !is_compressed && file_size == 0 && delete_empty_files { if !dry_run { - if let Err(e) = std::fs::remove_file(&path) { + if let Err(e) = fs::remove_file(&path) { tracing::warn!("Failed to delete empty file {:?}: {}", path, e); } else { debug!("Deleted empty file: {:?}", path); @@ -164,8 +181,9 @@ pub(super) fn scan_log_directory( continue; } - // 6. Age Check (Regular logs only). - // Compressed files have their own retention check in the caller. + // 6. Age gate regular logs only. + // Compressed files deliberately bypass this check because archive + // expiry is driven by a dedicated retention horizon in the caller. if !is_compressed && let Ok(age) = now.duration_since(modified) && age.as_secs() < min_file_age_seconds diff --git a/crates/obs/src/cleaner/types.rs b/crates/obs/src/cleaner/types.rs index 0109b960..e0fc332e 100644 --- a/crates/obs/src/cleaner/types.rs +++ b/crates/obs/src/cleaner/types.rs @@ -13,7 +13,18 @@ // limitations under the License. //! Shared types used across the log-cleanup sub-modules. +//! +//! These types deliberately stay lightweight because they are passed between +//! the scanner, selector, compressor, and deletion stages. Keeping them small +//! and explicit makes the cleaner easier to reason about and cheaper to move +//! across worker threads in the parallel pipeline. +use rustfs_config::observability::{ + DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, + DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION, + DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, + DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION, +}; use std::fmt; use std::path::PathBuf; use std::time::SystemTime; @@ -33,8 +44,21 @@ impl FileMatchMode { /// Returns the string representation of the match mode. 
pub fn as_str(&self) -> &'static str { match self { - FileMatchMode::Prefix => "prefix", - FileMatchMode::Suffix => "suffix", + FileMatchMode::Prefix => DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, + FileMatchMode::Suffix => DEFAULT_OBS_LOG_MATCH_MODE, + } + } + + /// Parse a config value into a [`FileMatchMode`]. + /// + /// Any non-`prefix` value falls back to [`FileMatchMode::Suffix`] to keep + /// configuration handling permissive and aligned with the historical + /// cleaner default used by rolling log filenames. + pub fn from_config_str(value: &str) -> Self { + if value.trim().eq_ignore_ascii_case(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX) { + Self::Prefix + } else { + Self::Suffix } } } @@ -45,17 +69,135 @@ impl fmt::Display for FileMatchMode { } } +/// Compression algorithm used by the cleaner. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CompressionAlgorithm { + /// Gzip keeps backward compatibility with existing `.gz` archives. + Gzip, + /// Zstd provides better ratio and higher decompression throughput. + Zstd, +} + +impl CompressionAlgorithm { + /// Parse a normalized lowercase configuration token or extension alias. + fn parse_normalized(value: &str) -> Option { + if value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP || value == DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION { + Some(Self::Gzip) + } else if value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD || value == DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION { + Some(Self::Zstd) + } else { + None + } + } + + /// Parse from a user-facing configuration string. + /// + /// Supported values include both semantic names (`gzip`, `zstd`) and file + /// extension aliases (`gz`, `zst`). Unknown values intentionally fall back + /// to the crate default so observability startup remains resilient. + pub fn from_config_str(value: &str) -> Self { + let normalized = value.trim().to_ascii_lowercase(); + Self::parse_normalized(&normalized).unwrap_or_default() + } + + /// Archive suffix (without dot) used for this algorithm. 
+ /// + /// The returned value is suitable for appending to an existing filename, + /// rather than replacing the source extension. + pub fn extension(self) -> &'static str { + match self { + Self::Gzip => DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION.trim_start_matches('.'), + Self::Zstd => DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION.trim_start_matches('.'), + } + } + + /// Supported compressed suffixes used by scanner retention logic. + /// + /// The scanner uses this list to recognize already-archived files and to + /// keep them on a separate retention path from plain log files. + pub fn compressed_suffixes() -> [&'static str; 2] { + [ + DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION, + DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION, + ] + } + + /// Stable lowercase string form used in logs and configuration echoes. + pub fn as_str(self) -> &'static str { + match self { + Self::Gzip => DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, + Self::Zstd => DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, + } + } +} + +impl std::str::FromStr for CompressionAlgorithm { + type Err = &'static str; + + fn from_str(value: &str) -> Result { + let normalized = value.trim().to_ascii_lowercase(); + Self::parse_normalized(&normalized).ok_or("invalid compression algorithm") + } +} + +impl Default for CompressionAlgorithm { + fn default() -> Self { + Self::from_config_str(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM) + } +} + +impl fmt::Display for CompressionAlgorithm { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Worker-thread default used by parallel compressor. +/// +/// The worker count follows CPU capacity but stays within [4, 8] to keep +/// throughput stable and avoid oversubscription. The lower bound helps the +/// work-stealing path on small machines still exercise concurrency, while the +/// upper bound avoids swamping the host when each task may also use internal +/// codec threads. 
+pub fn default_parallel_workers() -> usize { + num_cpus::get().clamp(4, 8) +} + /// Metadata for a single log file discovered by the scanner. /// -/// Carries enough information to make cleanup decisions (sort by age, compare -/// size against limits, etc.) without re-reading filesystem metadata on every -/// operation. +/// This snapshot is intentionally immutable after discovery. The cleaner uses +/// it to sort candidates by age, evaluate retention constraints, and report +/// deletion metrics without re-reading metadata during every later stage. #[derive(Debug, Clone)] pub(super) struct FileInfo { - /// Absolute path to the file. + /// Absolute or scanner-produced path to the file on disk. pub path: PathBuf, /// File size in bytes at the time of discovery. + /// + /// This value is used for retention accounting and freed-byte metrics. pub size: u64, /// Last-modification timestamp from the filesystem. + /// + /// The selection phase sorts on this timestamp so the oldest files are + /// processed first. 
pub modified: SystemTime, } + +#[cfg(test)] +mod tests { + use super::CompressionAlgorithm; + + #[test] + fn compression_algorithm_accepts_full_names_and_aliases() { + assert_eq!(CompressionAlgorithm::from_config_str("gzip"), CompressionAlgorithm::Gzip); + assert_eq!(CompressionAlgorithm::from_config_str("GZ"), CompressionAlgorithm::Gzip); + assert_eq!(CompressionAlgorithm::from_config_str("zstd"), CompressionAlgorithm::Zstd); + assert_eq!(CompressionAlgorithm::from_config_str(" zst "), CompressionAlgorithm::Zstd); + } + + #[test] + fn compression_algorithm_defaults_or_errors_for_invalid_values() { + assert_eq!(CompressionAlgorithm::from_config_str("brotli"), CompressionAlgorithm::default()); + assert!("brotli".parse::().is_err()); + } +} diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 4f8a0e4b..9449f4ff 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -24,17 +24,20 @@ use rustfs_config::observability::{ DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES, - DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN, - DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, - DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, + DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, + DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, + DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, + DEFAULT_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL, + DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, DEFAULT_OBS_LOG_ZSTD_WORKERS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT, 
ENV_OBS_LOG_CLEANUP_INTERVAL_SECONDS, ENV_OBS_LOG_COMPRESS_OLD_FILES, ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, - ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN, ENV_OBS_LOG_ENDPOINT, - ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, ENV_OBS_LOG_KEEP_FILES, - ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES, - ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL, - ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_METRICS_EXPORT_ENABLED, - ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME, - ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT, + ENV_OBS_LOG_COMPRESSION_ALGORITHM, ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN, + ENV_OBS_LOG_ENDPOINT, ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, + ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES, + ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_PARALLEL_COMPRESS, ENV_OBS_LOG_PARALLEL_WORKERS, ENV_OBS_LOG_ROTATION_TIME, + ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, ENV_OBS_LOG_ZSTD_WORKERS, + ENV_OBS_LOGGER_LEVEL, ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, + ENV_OBS_METRICS_EXPORT_ENABLED, ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO, + ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT, }; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_FILENAME, @@ -80,6 +83,11 @@ use std::env; /// let config = 
OtelConfig::extract_otel_config_from_env( /// Some("http://otel-collector:4318".to_string()) /// ); +/// +/// // Compression related env mapping examples: +/// // RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd +/// // RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true +/// // RUSTFS_OBS_LOG_PARALLEL_WORKERS=6 /// ``` #[derive(Debug, Deserialize, Serialize, Clone)] pub struct OtelConfig { @@ -146,6 +154,18 @@ pub struct OtelConfig { pub log_compress_old_files: Option, /// Gzip compression level `1`–`9` (default: `6`). pub log_gzip_compression_level: Option, + /// Compression algorithm: `gzip` or `zstd` (default: `zstd`). + pub log_compression_algorithm: Option, + /// Enable crossbeam work-stealing parallel compression (default: `true`). + pub log_parallel_compress: Option, + /// Worker count for parallel compression (default: `6`). + pub log_parallel_workers: Option, + /// Zstd compression level `1`–`21` (default: `8`). + pub log_zstd_compression_level: Option, + /// Fallback to gzip when zstd compression fails (default: `true`). + pub log_zstd_fallback_to_gzip: Option, + /// Internal zstdmt workers per job (default: `1`). + pub log_zstd_workers: Option, /// Delete compressed archives older than this many days; `0` = keep forever /// (default: `30`). 
pub log_compressed_file_retention_days: Option, @@ -254,6 +274,21 @@ impl OtelConfig { ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL as u64, ) as u32), + log_compression_algorithm: Some(get_env_str( + ENV_OBS_LOG_COMPRESSION_ALGORITHM, + DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, + )), + log_parallel_compress: Some(get_env_bool(ENV_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS)), + log_parallel_workers: Some(get_env_usize(ENV_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_PARALLEL_WORKERS)), + log_zstd_compression_level: Some(get_env_u64( + ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, + DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL as u64, + ) as i32), + log_zstd_fallback_to_gzip: Some(get_env_bool( + ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, + DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, + )), + log_zstd_workers: Some(get_env_usize(ENV_OBS_LOG_ZSTD_WORKERS, DEFAULT_OBS_LOG_ZSTD_WORKERS)), log_compressed_file_retention_days: Some(get_env_u64( ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 1b90aa38..03b86442 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry}; +use crate::{AppConfig, GlobalError, OtelConfig, OtelGuard, SystemObserver, telemetry::init_telemetry}; use std::sync::{Arc, Mutex}; use tokio::sync::OnceCell; -use tracing::{error, info}; +use tracing::{error, info, warn}; /// Global guard for OpenTelemetry tracing static GLOBAL_GUARD: OnceCell>> = OnceCell::const_new(); @@ -23,11 +23,40 @@ static GLOBAL_GUARD: OnceCell>> = OnceCell::const_new(); /// Flag indicating if observability metric is enabled pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell = OnceCell::const_new(); +/// Namespaced metrics for cleaner and rolling logging. +pub(crate) const METRIC_LOG_CLEANER_DELETED_FILES_TOTAL: &str = "rustfs.log_cleaner.deleted_files_total"; +pub(crate) const METRIC_LOG_CLEANER_FREED_BYTES_TOTAL: &str = "rustfs.log_cleaner.freed_bytes_total"; +pub(crate) const METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS: &str = "rustfs.log_cleaner.compress_duration_seconds"; +pub(crate) const METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE: &str = "rustfs.log_cleaner.steal_success_rate"; +pub(crate) const METRIC_LOG_CLEANER_RUNS_TOTAL: &str = "rustfs.log_cleaner.runs_total"; +pub(crate) const METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL: &str = "rustfs.log_cleaner.run_failures_total"; +pub(crate) const METRIC_LOG_CLEANER_ROTATION_TOTAL: &str = "rustfs.log_cleaner.rotation_total"; +pub(crate) const METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL: &str = "rustfs.log_cleaner.rotation_failures_total"; +pub(crate) const METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS: &str = "rustfs.log_cleaner.rotation_duration_seconds"; +pub(crate) const METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES: &str = "rustfs.log_cleaner.active_file_size_bytes"; + /// Check whether Observability metric is enabled pub fn observability_metric_enabled() -> bool { OBSERVABILITY_METRIC_ENABLED.get().copied().unwrap_or(false) } +/// Set the global observability metrics flag once. 
+/// +/// When this function is called multiple times, only the first value is kept +/// and later values are ignored to preserve OnceCell semantics. +pub(crate) fn set_observability_metric_enabled(enabled: bool) { + if OBSERVABILITY_METRIC_ENABLED.set(enabled).is_err() + && let Some(current) = OBSERVABILITY_METRIC_ENABLED.get() + && *current != enabled + { + warn!( + current = *current, + requested = enabled, + "OBSERVABILITY_METRIC_ENABLED was already initialized; keeping original value" + ); + } +} + /// Initialize the observability module /// /// # Parameters @@ -51,22 +80,53 @@ pub fn observability_metric_enabled() -> bool { pub async fn init_obs(endpoint: Option) -> Result { // Load the configuration file let config = AppConfig::new_with_endpoint(endpoint); + init_obs_with_config(&config.observability).await +} - let otel_guard = init_telemetry(&config.observability)?; - // Server will be created per connection - this ensures isolation +/// Initialize the observability module with an explicit [`OtelConfig`]. +/// +/// This is the lower-level counterpart to [`init_obs`]: it accepts a fully +/// constructed [`OtelConfig`] rather than building one from an endpoint URL. +/// Useful when the config is already available (e.g., embedded in a larger +/// application config struct) or when unit-testing with custom settings. +/// +/// # Parameters +/// - `config`: A pre-built [`OtelConfig`] to drive all telemetry backends. +/// +/// # Returns +/// An [`OtelGuard`] that must be kept alive for the lifetime of the +/// application. Dropping it flushes and shuts down all providers. +/// +/// # Errors +/// Returns [`GlobalError`] when the underlying telemetry backend fails to +/// initialise (e.g., cannot create the log directory, or an OTLP exporter +/// cannot connect). 
+/// +/// # Example +/// ```no_run +/// use rustfs_obs::{AppConfig, init_obs_with_config}; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let config = AppConfig::new_with_endpoint(Some("http://localhost:4318".to_string())); +/// let _guard = init_obs_with_config(&config.observability) +/// .await +/// .expect("observability init failed"); +/// # } +/// ``` +pub async fn init_obs_with_config(config: &OtelConfig) -> Result { + let otel_guard = init_telemetry(config)?; tokio::spawn(async move { - // Record the PID-related metrics of the current process let obs_result = SystemObserver::init_process_observer().await; match obs_result { Ok(_) => { - info!(target: "rustfs::obs::system::metrics","Process observer initialized successfully"); + info!(target: "rustfs::obs::system::metrics", "Process observer initialized successfully"); } Err(e) => { - error!(target: "rustfs::obs::system::metrics","Failed to initialize process observer: {}", e); + error!(target: "rustfs::obs::system::metrics", "Failed to initialize process observer: {}", e); } } }); - Ok(otel_guard) } @@ -121,9 +181,79 @@ pub fn get_global_guard() -> Result>, GlobalError> { #[cfg(test)] mod tests { use super::*; + + const README: &str = include_str!("../README.md"); + const GRAFANA_DASHBOARD: &str = include_str!("../../../.docker/observability/grafana/dashboards/rustfs.json"); + #[tokio::test] async fn test_get_uninitialized_guard() { let result = get_global_guard(); assert!(matches!(result, Err(GlobalError::NotInitialized))); } + + #[test] + fn test_log_cleaner_metric_constants_use_rustfs_prefix() { + let metrics = [ + METRIC_LOG_CLEANER_DELETED_FILES_TOTAL, + METRIC_LOG_CLEANER_FREED_BYTES_TOTAL, + METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS, + METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE, + METRIC_LOG_CLEANER_RUNS_TOTAL, + METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL, + METRIC_LOG_CLEANER_ROTATION_TOTAL, + METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL, + METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS, + 
METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES, + ]; + + for metric in metrics { + assert!( + metric.starts_with("rustfs.log_cleaner."), + "metric '{metric}' should use rustfs.log_cleaner.* namespace" + ); + } + } + + #[test] + fn test_readme_contains_grafana_dashboard_draft_section() { + assert!(README.contains("## Cleaner & Rotation Metrics")); + assert!(README.contains("### Grafana Dashboard JSON Draft (Ready to Import)")); + assert!(README.contains("\"uid\": \"rustfs-log-cleaner\"")); + assert!(README.contains("rustfs_log_cleaner_runs_total")); + assert!(README.contains("rustfs_log_cleaner_rotation_failures_total")); + } + + #[test] + fn test_readme_contains_promql_templates_section() { + assert!(README.contains("### PromQL Templates")); + assert!(README.contains("Cleanup failure ratio")); + assert!(README.contains("histogram_quantile(0.95")); + assert!(README.contains("max(rustfs_log_cleaner_active_file_size_bytes)")); + } + + #[test] + fn test_grafana_dashboard_contains_log_cleaner_row() { + assert!(GRAFANA_DASHBOARD.contains("\"title\": \"Log Cleaner\"")); + assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_runs_total")); + assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_rotation_failures_total")); + assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_active_file_size_bytes")); + } + + #[test] + fn test_readme_mentions_deployed_dashboard_path() { + assert!(README.contains(".docker/observability/grafana/dashboards/rustfs.json")); + assert!(README.contains("row title: `Log Cleaner`")); + } + + #[test] + fn test_init_obs_with_config_signature_is_correct() { + // Compile-time check: verifies that `init_obs_with_config` is public and + // accepts `&OtelConfig`, matching the README documentation. + // The future is created but intentionally not polled — this avoids + // initialising the global tracing subscriber in a unit-test context. 
+ fn _type_check() { + let config = OtelConfig::default(); + let _future = init_obs_with_config(&config); + } + } } diff --git a/crates/obs/src/telemetry/local.rs b/crates/obs/src/telemetry/local.rs index 64569731..765bb5e8 100644 --- a/crates/obs/src/telemetry/local.rs +++ b/crates/obs/src/telemetry/local.rs @@ -25,20 +25,27 @@ //! //! The function [`init_local_logging`] is the single entry point for both //! cases; callers do **not** need to distinguish between stdout and file modes. +//! +//! The file-backed mode delegates retention and compression to +//! [`crate::cleaner`], which keeps the logging setup code focused on subscriber +//! construction while still allowing periodic housekeeping in the background. use super::guard::OtelGuard; use crate::TelemetryError; use crate::cleaner::LogCleaner; -use crate::cleaner::types::FileMatchMode; +use crate::cleaner::types::{CompressionAlgorithm, FileMatchMode}; use crate::config::OtelConfig; -use crate::global::OBSERVABILITY_METRIC_ENABLED; +use crate::global::{METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL, METRIC_LOG_CLEANER_RUNS_TOTAL, set_observability_metric_enabled}; use crate::telemetry::filter::build_env_filter; use crate::telemetry::rolling::{RollingAppender, Rotation}; use metrics::counter; use rustfs_config::observability::{ DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES, DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, - DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, - DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, + DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN, + DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, + DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS, + 
DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, + DEFAULT_OBS_LOG_ZSTD_WORKERS, }; use rustfs_config::{APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED}; use std::sync::Arc; @@ -111,6 +118,8 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo let env_filter = build_env_filter(logger_level, None); let (nb, guard) = tracing_appender::non_blocking(std::io::stdout()); + // Keep stdout formatting JSON-shaped even in local-only mode so operators + // can ship the same log schema to external collectors if needed. let fmt_layer = tracing_subscriber::fmt::layer() .with_timer(LocalTime::rfc_3339()) .with_target(true) @@ -131,7 +140,7 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo .with(fmt_layer) .init(); - OBSERVABILITY_METRIC_ENABLED.set(false).ok(); + set_observability_metric_enabled(false); counter!("rustfs.start.total").increment(1); info!("Init stdout logging (level: {})", logger_level); @@ -154,6 +163,10 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo /// Called by [`init_local_logging`] when a log directory is present. /// Handles directory creation, permission enforcement (Unix), file appender /// setup, optional stdout mirror, and log-cleanup task spawning. +/// +/// The function intentionally performs all fallible filesystem preparation +/// before registering the subscriber so startup failures are reported early and +/// do not leave partially initialized tracing state behind. 
fn init_file_logging_internal( config: &OtelConfig, log_directory: &str, @@ -181,11 +194,9 @@ fn init_file_logging_internal( .unwrap_or(DEFAULT_LOG_ROTATION_TIME) .to_lowercase(); - // Determine match mode from config, defaulting to Suffix - let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() { - Some("prefix") => FileMatchMode::Prefix, - _ => FileMatchMode::Suffix, - }; + // Match mode controls how the rolling filename is recognized later by the + // cleaner. Suffix mode fits timestamp-prefixed filenames especially well. + let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE)); let rotation = match rotation_str.as_str() { "minutely" => Rotation::Minutely, @@ -207,7 +218,8 @@ fn init_file_logging_internal( let env_filter = build_env_filter(logger_level, None); let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL }; - // File layer writes JSON without ANSI codes. + // File output stays machine-readable and free of ANSI sequences so the + // resulting files are safe to parse or ship to log processors. let file_layer = tracing_subscriber::fmt::layer() .with_timer(LocalTime::rfc_3339()) .with_target(true) @@ -223,7 +235,8 @@ fn init_file_logging_internal( .with_span_events(span_events.clone()); // Optional stdout mirror: enabled explicitly via `log_stdout_enabled`, or - // unconditionally in non-production environments. + // unconditionally in non-production environments so developers still see + // immediate terminal output while file rotation remains enabled. 
let (stdout_layer, stdout_guard) = if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production { let (stdout_nb, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); let enable_color = std::io::stdout().is_terminal(); @@ -255,7 +268,7 @@ fn init_file_logging_internal( .with(stdout_layer) .init(); - OBSERVABILITY_METRIC_ENABLED.set(false).ok(); + set_observability_metric_enabled(false); // ── 5. Start background cleanup task ───────────────────────────────────── let cleanup_handle = spawn_cleanup_task(config, log_directory, log_filename, keep_files); @@ -284,6 +297,8 @@ fn init_file_logging_internal( /// Tightens permissions to `0755` if the directory is more permissive. /// This prevents world-writable log directories from being a security hazard. /// No-ops if permissions are already `0755` or stricter. +/// +/// The function never broadens permissions; it is strictly a hardening step. #[cfg(unix)] pub fn ensure_dir_permissions(log_directory: &str) -> Result<(), TelemetryError> { use std::fs::Permissions; @@ -325,6 +340,10 @@ pub fn ensure_dir_permissions(log_directory: &str) -> Result<(), TelemetryError> /// Tokio runtime and should be aborted (via the returned `JoinHandle`) when /// the application shuts down. /// +/// The asynchronous loop itself remains lightweight: each cleanup pass is +/// delegated to `spawn_blocking` because directory traversal, compression, and +/// deletion are inherently blocking filesystem operations. +/// /// # Arguments /// * `config` - Observability config containing cleanup parameters. /// * `log_directory` - Directory path of the rolling log files. @@ -341,16 +360,14 @@ pub fn spawn_cleanup_task( keep_files: usize, ) -> tokio::task::JoinHandle<()> { let log_dir = std::path::PathBuf::from(log_directory); - // Use suffix matching for log files like "2026-03-01-06-21.rustfs.log" - // where "rustfs.log" is the suffix. 
+ // Use suffix matching for log files like `2026-03-01-06-21.rustfs.log` + // where `rustfs.log` is the stable suffix generated by the rolling appender. let file_pattern = config.log_filename.as_deref().unwrap_or(log_filename).to_string(); let active_filename = file_pattern.clone(); - // Determine match mode from config, defaulting to Suffix - let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() { - Some("prefix") => FileMatchMode::Prefix, - _ => FileMatchMode::Suffix, - }; + // Determine match mode from config, defaulting to the repository-wide + // observability setting when the caller leaves it unset. + let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE)); let max_total_size = config .log_max_total_size_bytes @@ -362,6 +379,21 @@ pub fn spawn_cleanup_task( let gzip_level = config .log_gzip_compression_level .unwrap_or(DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL); + let compression_algorithm = CompressionAlgorithm::from_config_str( + config + .log_compression_algorithm + .as_deref() + .unwrap_or(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM), + ); + let parallel_compress = config.log_parallel_compress.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_COMPRESS); + let parallel_workers = config.log_parallel_workers.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_WORKERS); + let zstd_level = config + .log_zstd_compression_level + .unwrap_or(DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL); + let zstd_fallback_to_gzip = config + .log_zstd_fallback_to_gzip + .unwrap_or(DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP); + let zstd_workers = config.log_zstd_workers.unwrap_or(DEFAULT_OBS_LOG_ZSTD_WORKERS); let retention_days = config .log_compressed_file_retention_days .unwrap_or(DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS); @@ -387,6 +419,14 @@ pub fn spawn_cleanup_task( .max_single_file_size_bytes(max_single_file_size) .compress_old_files(compress) .gzip_compression_level(gzip_level) + // Compression behavior stays 
fully config-driven, but the builder + // clamps unsafe numeric values and preserves sensible defaults. + .compression_algorithm(compression_algorithm) + .parallel_compress(parallel_compress) + .parallel_workers(parallel_workers) + .zstd_compression_level(zstd_level) + .zstd_fallback_to_gzip(zstd_fallback_to_gzip) + .zstd_workers(zstd_workers) .compressed_file_retention_days(retention_days) .exclude_patterns(exclude_patterns) .delete_empty_files(delete_empty) @@ -395,17 +435,37 @@ pub fn spawn_cleanup_task( .build(), ); + info!( + compression_algorithm = %compression_algorithm, + parallel_compress, + parallel_workers, + zstd_level, + zstd_fallback_to_gzip, + zstd_workers, + "log cleaner compression profile configured" + ); + tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(cleanup_interval)); loop { + // Wait for the next scheduled tick before dispatching another pass. + // The blocking filesystem work runs on a dedicated blocking thread. interval.tick().await; let cleaner_clone = cleaner.clone(); let result = tokio::task::spawn_blocking(move || cleaner_clone.cleanup()).await; match result { - Ok(Ok(_)) => {} // Success - Ok(Err(e)) => tracing::warn!("Log cleanup failed: {}", e), - Err(e) => tracing::warn!("Log cleanup task panicked: {}", e), + Ok(Ok(_)) => { + counter!(METRIC_LOG_CLEANER_RUNS_TOTAL).increment(1); + } + Ok(Err(e)) => { + counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1); + tracing::warn!("Log cleanup failed: {}", e); + } + Err(e) => { + counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1); + tracing::warn!("Log cleanup task panicked: {}", e); + } } } }) @@ -418,6 +478,7 @@ mod tests { use tempfile::tempdir; #[test] + /// Invalid file names should be reported as errors instead of panicking. 
fn test_init_file_logging_invalid_filename_does_not_panic() { let temp_dir = tempdir().expect("create temp dir"); let temp_path = temp_dir.path().to_str().expect("temp dir path is utf-8"); diff --git a/crates/obs/src/telemetry/otel.rs b/crates/obs/src/telemetry/otel.rs index 65f42c46..fb1acf28 100644 --- a/crates/obs/src/telemetry/otel.rs +++ b/crates/obs/src/telemetry/otel.rs @@ -31,10 +31,14 @@ //! //! All exporters use **HTTP binary** (Protobuf) encoding with **gzip** //! compression for efficiency over the wire. +//! +//! If log export is not configured, this module deliberately falls back to the +//! same rolling-file logging path used by the local backend so applications can +//! combine OTLP traces/metrics with on-disk logs. use crate::cleaner::types::FileMatchMode; use crate::config::OtelConfig; -use crate::global::OBSERVABILITY_METRIC_ENABLED; +use crate::global::set_observability_metric_enabled; use crate::telemetry::filter::build_env_filter; use crate::telemetry::guard::OtelGuard; // Import helper functions from local.rs (sibling module) @@ -53,7 +57,7 @@ use opentelemetry_sdk::{ metrics::{PeriodicReader, SdkMeterProvider}, trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; -use rustfs_config::observability::DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES; +use rustfs_config::observability::{DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES}; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOGS_EXPORT_ENABLED, DEFAULT_OBS_METRICS_EXPORT_ENABLED, DEFAULT_OBS_TRACES_EXPORT_ENABLED, METER_INTERVAL, SAMPLE_RATIO, @@ -97,6 +101,8 @@ pub(super) fn init_observability_http( is_production: bool, ) -> Result { // ── Resource & sampling ────────────────────────────────────────────────── + // Build the common resource once so all enabled signals report the same + // service identity and deployment metadata. 
let res = build_resource(config); let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned(); let use_stdout = config.use_stdout.unwrap_or(!is_production); @@ -134,8 +140,9 @@ pub(super) fn init_observability_http( } }); - // If log_endpoint is not explicitly set, fallback to root_ep/v1/logs ONLY if root_ep is present. - // If both are empty, log_ep is empty, which triggers the fallback to file logging logic. + // If `log_endpoint` is not explicitly set, fall back to `root_ep/v1/logs` + // only when a root endpoint exists. An empty result intentionally triggers + // the file-logging path below instead of silently disabling application logs. let log_ep: String = config .log_endpoint .as_deref() @@ -159,6 +166,8 @@ pub(super) fn init_observability_http( let profiling_agent = init_profiler(config); // ── Logger Logic ────────────────────────────────────────────────────────── + // Logging is the only signal that may intentionally route to either OTLP + // or local files depending on configuration completeness. let mut logger_provider: Option = None; let mut otel_bridge = None; let mut file_layer_opt = None; // File layer (File mode) @@ -179,11 +188,14 @@ pub(super) fn init_observability_http( .as_ref() .map(|p| OpenTelemetryTracingBridge::new(p).with_filter(build_env_filter(logger_level, None))); - // Note: We do NOT create a separate `fmt_layer_opt` here; stdout behavior is driven by the provider. + // No separate formatting layer is added here; when OTLP logging is + // active, the OpenTelemetry bridge is the authoritative sink for + // `tracing` events unless local file logging is needed as a fallback. } let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL }; // ── Case 2: File Logging - // Supplement: If log_directory is set and no OTLP log endpoint is configured, we enable file logging logic. 
+ // If a log directory is configured and OTLP log export is unavailable, use + // the same rolling-file behavior as the local-only telemetry backend. if let Some(log_directory) = config.log_directory.as_deref().filter(|s| !s.is_empty()) && logger_provider.is_none() { @@ -204,10 +216,7 @@ pub(super) fn init_observability_http( .as_deref() .unwrap_or(DEFAULT_LOG_ROTATION_TIME) .to_lowercase(); - let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() { - Some("prefix") => FileMatchMode::Prefix, - _ => FileMatchMode::Suffix, - }; + let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE)); let rotation = match rotation_str.as_str() { "minutely" => Rotation::Minutely, "hourly" => Rotation::Hourly, @@ -241,7 +250,8 @@ pub(super) fn init_observability_http( .with_filter(build_env_filter(logger_level, None)), ); - // Cleanup task + // The cleanup task keeps rotated files bounded while the OTLP trace and + // metric exporters continue to operate independently. cleanup_handle = Some(spawn_cleanup_task(config, log_directory, log_filename, keep_files)); info!( @@ -256,8 +266,9 @@ pub(super) fn init_observability_http( .map(|p| OpenTelemetryLayer::new(p.tracer(service_name.to_string()))); let metrics_layer = meter_provider.as_ref().map(|p| MetricsLayer::new(p.clone())); - // Optional stdout mirror (matching init_file_logging_internal logic) - // This is separate from OTLP stdout logic. If file logging is enabled, we honor its stdout rules. + // Optional stdout mirror (matching `init_file_logging_internal` logic). + // This is separate from OTLP stdout exporting; it only affects local human + // readable output for the tracing subscriber. 
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production { let (stdout_nb, stdout_g) = tracing_appender::non_blocking(std::io::stdout()); stdout_guard = Some(stdout_g); @@ -309,6 +320,8 @@ pub(super) fn init_observability_http( /// Build an optional [`SdkTracerProvider`] for the given trace endpoint. /// /// Returns `None` when the endpoint is empty or trace export is disabled. +/// When enabled, the provider is also registered as the global tracer provider +/// and installs the W3C trace-context propagator. fn build_tracer_provider( trace_ep: &str, config: &OtelConfig, @@ -344,6 +357,10 @@ fn build_tracer_provider( Ok(Some(provider)) } +/// Convert a configured sample ratio into the SDK sampler strategy. +/// +/// Invalid or non-finite ratios fall back to `AlwaysOn` so telemetry does not +/// disappear due to configuration mistakes. fn build_tracer_sampler(sample_ratio: f64) -> Sampler { if sample_ratio.is_finite() && (0.0..=1.0).contains(&sample_ratio) { Sampler::TraceIdRatioBased(sample_ratio) @@ -355,6 +372,8 @@ fn build_tracer_sampler(sample_ratio: f64) -> Sampler { /// Build an optional [`SdkMeterProvider`] for the given metrics endpoint. /// /// Returns `None` when the endpoint is empty or metric export is disabled. +/// The provider is paired with the crate's metrics recorder so `metrics` crate +/// instruments flow into OpenTelemetry readers. fn build_meter_provider( metric_ep: &str, config: &OtelConfig, @@ -394,13 +413,14 @@ fn build_meter_provider( global::set_meter_provider(provider.clone()); metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?; - OBSERVABILITY_METRIC_ENABLED.set(true).ok(); + set_observability_metric_enabled(true); Ok(Some(provider)) } /// Build an optional [`SdkLoggerProvider`] for the given log endpoint. /// /// Returns `None` when the endpoint is empty or log export is disabled. 
+/// The caller wraps the resulting provider in an OpenTelemetry tracing bridge. fn build_logger_provider( log_ep: &str, config: &OtelConfig, @@ -427,8 +447,10 @@ fn build_logger_provider( Ok(Some(builder.build())) } -/// Starts the Pyroscope continuous profiling agent if `ENV_OBS_PROFILING_ENDPOINT` is set. -/// No-op and returns None on non-unix platforms. +/// Start the Pyroscope continuous profiling agent when profiling is enabled. +/// +/// Returns `None` on non-Unix platforms, when the feature is disabled, or when +/// no usable profiling endpoint is configured. #[cfg(unix)] fn init_profiler(config: &OtelConfig) -> Option> { use pyroscope::backend::{BackendConfig, PprofConfig, pprof_backend}; @@ -468,6 +490,9 @@ fn init_profiler(config: &OtelConfig) -> Option PeriodicReader { PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()) .with_interval(Duration::from_secs(interval)) @@ -479,6 +504,7 @@ mod tests { use super::*; #[test] + /// Valid ratios should produce trace-id-ratio sampling. fn test_build_tracer_sampler_uses_trace_ratio_for_valid_values() { let sampler = build_tracer_sampler(0.0); assert!(format!("{sampler:?}").contains("TraceIdRatioBased")); @@ -491,6 +517,7 @@ mod tests { } #[test] + /// Invalid ratios should degrade to the safest non-dropping sampler. fn test_build_tracer_sampler_rejects_invalid_ratio_with_always_on() { let sampler = build_tracer_sampler(-0.1); assert!(format!("{sampler:?}").contains("AlwaysOn")); diff --git a/crates/obs/src/telemetry/rolling.rs b/crates/obs/src/telemetry/rolling.rs index 7d4b1749..45f297e4 100644 --- a/crates/obs/src/telemetry/rolling.rs +++ b/crates/obs/src/telemetry/rolling.rs @@ -19,7 +19,12 @@ //! log files do not grow indefinitely by rotating them when they exceed a configured size. 
use crate::cleaner::types::FileMatchMode; +use crate::global::{ + METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES, METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS, + METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL, METRIC_LOG_CLEANER_ROTATION_TOTAL, +}; use jiff::Zoned; +use metrics::{counter, gauge, histogram}; use std::fs::{self, File}; use std::io::{self, Write}; use std::path::{Path, PathBuf}; @@ -186,6 +191,7 @@ impl RollingAppender { } fn roll(&mut self) -> io::Result<()> { + let rotate_started = std::time::Instant::now(); // 1. Close current file first to ensure all buffers are flushed to OS (if any) // and handle released. if let Some(mut file) = self.file.take() @@ -250,6 +256,9 @@ impl RollingAppender { // This overrides whatever open_file() derived from mtime, ensuring // we stick to the logical rotation time. self.last_roll_ts = now.timestamp().as_second(); + counter!(METRIC_LOG_CLEANER_ROTATION_TOTAL).increment(1); + histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64()); + gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64); return Ok(()); } Err(e) => { @@ -281,6 +290,8 @@ impl RollingAppender { "RollingAppender: Failed to rotate log file after {} retries. 
Error: {:?}", MAX_RETRIES, last_error ); + counter!(METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL).increment(1); + histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64()); // Attempt to re-open existing active file to allow continued writing self.open_file()?; @@ -314,6 +325,7 @@ impl Write for RollingAppender { if let Some(file) = &mut self.file { let n = file.write(buf)?; self.size += n as u64; + gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64); Ok(n) } else { Err(io::Error::other("Failed to open log file")) diff --git a/scripts/inspect_dashboard.sh b/scripts/inspect_dashboard.sh new file mode 100644 index 00000000..8039ac72 --- /dev/null +++ b/scripts/inspect_dashboard.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +# +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -euo pipefail + +DASHBOARD_PATH=".docker/observability/grafana/dashboards/rustfs.json" + +if ! command -v jq >/dev/null 2>&1; then + echo "ERROR: jq is required by scripts/inspect_dashboard.sh" >&2 + exit 1 +fi + +if [[ ! -f "$DASHBOARD_PATH" ]]; then + echo "ERROR: dashboard file not found: $DASHBOARD_PATH" >&2 + exit 1 +fi + +jq -r ' + def pystr: + if . == null then "None" else tostring end; + def pyrepr: + if . 
== null then "None" + elif type == "string" then + "\u0027" + + (gsub("\\\\"; "\\\\\\\\") + | gsub("\u0027"; "\\\\\u0027") + | gsub("\n"; "\\\\n") + | gsub("\r"; "\\\\r") + | gsub("\t"; "\\\\t")) + + "\u0027" + else tostring + end; + + "Total panels: \(.panels | length)", + "Dashboard version: \(.version)", + "", + ( + .panels[] + | [ + (.id // 0), + (.type | pystr), + (.gridPos.y | pystr), + (.gridPos.h | pystr), + (.gridPos.w | pystr), + (.title | pyrepr) + ] + | @tsv + ) +' "$DASHBOARD_PATH" | { + IFS= read -r total_line + IFS= read -r version_line + IFS= read -r blank_line + + printf '%s\n' "$total_line" + printf '%s\n' "$version_line" + printf '%s\n' "$blank_line" + + while IFS=$'\t' read -r pid ptype y h w title; do + printf " id=%4d type=%-12s y=%-4s h=%-4s w=%-4s title=%s\n" \ + "$pid" "$ptype" "$y" "$h" "$w" "$title" + done +} +