mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
feat(obs): add init_obs_with_config API and signature guard test (#2175)
Signed-off-by: houseme <housemecn@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -7764,10 +7764,14 @@ dependencies = [
|
||||
name = "rustfs-obs"
|
||||
version = "0.0.5"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
"flate2",
|
||||
"glob",
|
||||
"jiff",
|
||||
"metrics",
|
||||
"num_cpus",
|
||||
"nvml-wrapper",
|
||||
"opentelemetry",
|
||||
"opentelemetry-appender-tracing",
|
||||
@@ -7779,6 +7783,7 @@ dependencies = [
|
||||
"rustfs-config",
|
||||
"rustfs-utils",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sysinfo",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -7789,6 +7794,7 @@ dependencies = [
|
||||
"tracing-error",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -197,6 +197,9 @@ const-str = { version = "1.1.0", features = ["std", "proc"] }
|
||||
convert_case = "0.11.0"
|
||||
criterion = { version = "0.8", features = ["html_reports"] }
|
||||
crossbeam-queue = "0.3.12"
|
||||
crossbeam-channel = "0.5.15"
|
||||
crossbeam-deque = "0.8.6"
|
||||
crossbeam-utils = "0.8.21"
|
||||
datafusion = "52.3.0"
|
||||
derive_builder = "0.20.2"
|
||||
enumset = "1.1.10"
|
||||
|
||||
@@ -48,6 +48,12 @@ pub const ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_TOTAL_SIZ
|
||||
pub const ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES";
|
||||
pub const ENV_OBS_LOG_COMPRESS_OLD_FILES: &str = "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES";
|
||||
pub const ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL";
|
||||
pub const ENV_OBS_LOG_COMPRESSION_ALGORITHM: &str = "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM";
|
||||
pub const ENV_OBS_LOG_PARALLEL_COMPRESS: &str = "RUSTFS_OBS_LOG_PARALLEL_COMPRESS";
|
||||
pub const ENV_OBS_LOG_PARALLEL_WORKERS: &str = "RUSTFS_OBS_LOG_PARALLEL_WORKERS";
|
||||
pub const ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL";
|
||||
pub const ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: &str = "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP";
|
||||
pub const ENV_OBS_LOG_ZSTD_WORKERS: &str = "RUSTFS_OBS_LOG_ZSTD_WORKERS";
|
||||
pub const ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: &str = "RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS";
|
||||
pub const ENV_OBS_LOG_EXCLUDE_PATTERNS: &str = "RUSTFS_OBS_LOG_EXCLUDE_PATTERNS";
|
||||
pub const ENV_OBS_LOG_DELETE_EMPTY_FILES: &str = "RUSTFS_OBS_LOG_DELETE_EMPTY_FILES";
|
||||
@@ -61,13 +67,24 @@ pub const DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES: u64 = 2 * 1024 * 1024 * 1024; //
|
||||
pub const DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: u64 = 0; // No single file limit
|
||||
pub const DEFAULT_OBS_LOG_COMPRESS_OLD_FILES: bool = true;
|
||||
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL: u32 = 6;
|
||||
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP: &str = "gzip";
|
||||
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD: &str = "zstd";
|
||||
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM: &str = DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD;
|
||||
pub const DEFAULT_OBS_LOG_PARALLEL_COMPRESS: bool = true;
|
||||
pub const DEFAULT_OBS_LOG_PARALLEL_WORKERS: usize = 6;
|
||||
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL: i32 = 8;
|
||||
pub const DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: bool = true;
|
||||
pub const DEFAULT_OBS_LOG_ZSTD_WORKERS: usize = 1;
|
||||
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION: &str = "gz";
|
||||
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION);
|
||||
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION: &str = "zst";
|
||||
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION);
|
||||
pub const DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: u64 = 30; // Retain compressed files for 30 days
|
||||
pub const DEFAULT_OBS_LOG_DELETE_EMPTY_FILES: bool = true;
|
||||
pub const DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS: u64 = 3600; // 1 hour
|
||||
pub const DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS: u64 = 1800; // 0.5 hours
|
||||
pub const DEFAULT_OBS_LOG_DRY_RUN: bool = false;
|
||||
pub const DEFAULT_OBS_LOG_MATCH_MODE_PREFIX: &str = "prefix";
|
||||
pub const DEFAULT_OBS_LOG_MATCH_MODE: &str = "suffix";
|
||||
|
||||
/// Default values for observability configuration
|
||||
@@ -113,6 +130,12 @@ mod tests {
|
||||
assert_eq!(ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES");
|
||||
assert_eq!(ENV_OBS_LOG_COMPRESS_OLD_FILES, "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES");
|
||||
assert_eq!(ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL");
|
||||
assert_eq!(ENV_OBS_LOG_COMPRESSION_ALGORITHM, "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM");
|
||||
assert_eq!(ENV_OBS_LOG_PARALLEL_COMPRESS, "RUSTFS_OBS_LOG_PARALLEL_COMPRESS");
|
||||
assert_eq!(ENV_OBS_LOG_PARALLEL_WORKERS, "RUSTFS_OBS_LOG_PARALLEL_WORKERS");
|
||||
assert_eq!(ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL");
|
||||
assert_eq!(ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP");
|
||||
assert_eq!(ENV_OBS_LOG_ZSTD_WORKERS, "RUSTFS_OBS_LOG_ZSTD_WORKERS");
|
||||
assert_eq!(
|
||||
ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
|
||||
"RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS"
|
||||
@@ -131,6 +154,10 @@ mod tests {
|
||||
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
|
||||
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");
|
||||
assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging");
|
||||
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, "gzip");
|
||||
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, "zstd");
|
||||
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, "prefix");
|
||||
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE, "suffix");
|
||||
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, "zstd");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,10 @@ flate2 = { workspace = true }
|
||||
glob = { workspace = true }
|
||||
jiff = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
crossbeam-channel = { workspace = true }
|
||||
crossbeam-deque = { workspace = true }
|
||||
crossbeam-utils = { workspace = true }
|
||||
num_cpus = { workspace = true }
|
||||
nvml-wrapper = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
|
||||
@@ -56,6 +60,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "
|
||||
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
|
||||
sysinfo = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
zstd = { workspace = true, features = ["zstdmt"] }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
pyroscope = { workspace = true, features = ["backend-pprof-rs"] }
|
||||
@@ -65,3 +70,4 @@ pyroscope = { workspace = true, features = ["backend-pprof-rs"] }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tempfile = { workspace = true }
|
||||
temp-env = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -14,7 +14,7 @@ logging, distributed tracing, metrics via OpenTelemetry, and continuous profilin
|
||||
| **Distributed tracing** | OTLP/HTTP export to Jaeger, Tempo, or any OTel collector |
|
||||
| **Metrics** | OTLP/HTTP export, bridged from the `metrics` crate facade |
|
||||
| **Continuous Profiling** | CPU/Memory profiling export to Pyroscope |
|
||||
| **Log cleanup** | Background task: size limits, gzip compression, retention policies |
|
||||
| **Log cleanup** | Background task: size limits, zstd/gzip compression, retention policies |
|
||||
| **GPU metrics** *(optional)* | Enable with the `gpu` feature flag |
|
||||
|
||||
---
|
||||
@@ -148,9 +148,15 @@ All configuration is read from environment variables at startup.
|
||||
|-------------------------------------------------|--------------|-------------------------------------------------------------|
|
||||
| `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | `2147483648` | Hard cap on total log directory size (2 GiB) |
|
||||
| `RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES` | `0` | Per-file size cap; `0` = unlimited |
|
||||
| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Gzip-compress files before deleting |
|
||||
| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Compress files before deleting |
|
||||
| `RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL` | `6` | Gzip level `1` (fastest) – `9` (best) |
|
||||
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` archives older than N days; `0` = keep forever |
|
||||
| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` | Compression codec: `zstd` or `gzip` |
|
||||
| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | `true` | Enable work-stealing parallel compression |
|
||||
| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | `6` | Number of cleaner worker threads |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | `8` | Zstd level `1` (fastest) – `21` (best ratio) |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | `true` | Fallback to gzip when zstd compression fails |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | `1` | zstdmt worker threads per compression task |
|
||||
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` / `.zst` archives older than N days; `0` = keep forever |
|
||||
| `RUSTFS_OBS_LOG_EXCLUDE_PATTERNS` | _(empty)_ | Comma-separated glob patterns to never clean up |
|
||||
| `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | `true` | Remove zero-byte files |
|
||||
| `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | `3600` | Minimum file age (seconds) before cleanup |
|
||||
@@ -159,6 +165,159 @@ All configuration is read from environment variables at startup.
|
||||
|
||||
---
|
||||
|
||||
## Cleaner & Rotation Metrics
|
||||
|
||||
The log rotation and cleanup pipeline emits these metrics (via the `metrics` facade):
|
||||
|
||||
| Metric | Type | Description |
|
||||
|---|---|---|
|
||||
| `rustfs.log_cleaner.deleted_files_total` | counter | Number of files deleted per cleanup pass |
|
||||
| `rustfs.log_cleaner.freed_bytes_total` | counter | Bytes reclaimed by deletion |
|
||||
| `rustfs.log_cleaner.compress_duration_seconds` | histogram | Compression stage duration |
|
||||
| `rustfs.log_cleaner.steal_success_rate` | gauge | Work-stealing success ratio in parallel mode |
|
||||
| `rustfs.log_cleaner.runs_total` | counter | Successful cleanup loop runs |
|
||||
| `rustfs.log_cleaner.run_failures_total` | counter | Failed or panicked cleanup loop runs |
|
||||
| `rustfs.log_cleaner.rotation_total` | counter | Successful file rotations |
|
||||
| `rustfs.log_cleaner.rotation_failures_total` | counter | Failed file rotations |
|
||||
| `rustfs.log_cleaner.rotation_duration_seconds` | histogram | Rotation latency |
|
||||
| `rustfs.log_cleaner.active_file_size_bytes` | gauge | Current active log file size |
|
||||
|
||||
These metrics cover compression, cleanup, and file rotation end-to-end.
|
||||
|
||||
### Metric Semantics
|
||||
|
||||
- `deleted_files_total` and `freed_bytes_total` are emitted after each cleanup pass and include both normal log cleanup and expired compressed archive cleanup.
|
||||
- `compress_duration_seconds` measures compression stage wall-clock time for both serial and parallel modes.
|
||||
- `steal_success_rate` is updated by the parallel work-stealing path and remains at the last computed value.
|
||||
- `rotation_*` metrics are emitted by `RollingAppender` and include retries; a failed final rotation increments `rotation_failures_total`.
|
||||
- `active_file_size_bytes` is sampled on writes and after successful roll, so dashboards can track current active file growth.
|
||||
|
||||
### Grafana Dashboard JSON Draft (Ready to Import)
|
||||
|
||||
> Save this as `rustfs-log-cleaner-dashboard.json`, then import from Grafana UI.
|
||||
> For Prometheus datasources, metric names are usually normalized to underscores,
|
||||
> so `rustfs.log_cleaner.deleted_files_total` becomes `rustfs_log_cleaner_deleted_files_total`.
|
||||
>
|
||||
> The same panels are now checked in at:
|
||||
> `.docker/observability/grafana/dashboards/rustfs.json`
|
||||
> (row title: `Log Cleaner`).
|
||||
|
||||
```json
|
||||
{
|
||||
"uid": "rustfs-log-cleaner",
|
||||
"title": "RustFS Log Cleaner",
|
||||
"timezone": "browser",
|
||||
"schemaVersion": 39,
|
||||
"version": 1,
|
||||
"refresh": "10s",
|
||||
"tags": ["rustfs", "observability", "log-cleaner"],
|
||||
"time": {
|
||||
"from": "now-6h",
|
||||
"to": "now"
|
||||
},
|
||||
"panels": [
|
||||
{
|
||||
"id": 1,
|
||||
"title": "Cleanup Runs / Failures",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_runs_total[5m]))", "legendFormat": "runs/s" },
|
||||
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_run_failures_total[5m]))", "legendFormat": "failures/s" }
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "Freed Bytes / Deleted Files",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_freed_bytes_total[15m]))", "legendFormat": "bytes/s" },
|
||||
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_deleted_files_total[15m]))", "legendFormat": "files/s" }
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"title": "Compression P95 Latency",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"expr": "histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[5m])) by (le))",
|
||||
"legendFormat": "p95"
|
||||
}
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"title": "Rotation Success / Failure",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_rotation_total[5m]))", "legendFormat": "rotation/s" },
|
||||
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_rotation_failures_total[5m]))", "legendFormat": "rotation_failures/s" }
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"title": "Steal Success Rate",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{ "refId": "A", "expr": "max(rustfs_log_cleaner_steal_success_rate)", "legendFormat": "ratio" }
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 }
|
||||
},
|
||||
{
|
||||
"id": 6,
|
||||
"title": "Active File Size",
|
||||
"type": "timeseries",
|
||||
"targets": [
|
||||
{ "refId": "A", "expr": "max(rustfs_log_cleaner_active_file_size_bytes)", "legendFormat": "bytes" }
|
||||
],
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 16 }
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### PromQL Templates
|
||||
|
||||
Use these templates directly in Grafana panels/alerts.
|
||||
|
||||
- **Cleanup run rate**
|
||||
- `sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval]))`
|
||||
- **Cleanup failure rate**
|
||||
- `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval]))`
|
||||
- **Cleanup failure ratio**
|
||||
- `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval])), 1e-9)`
|
||||
- **Freed bytes throughput**
|
||||
- `sum(rate(rustfs_log_cleaner_freed_bytes_total[$__rate_interval]))`
|
||||
- **Deleted files throughput**
|
||||
- `sum(rate(rustfs_log_cleaner_deleted_files_total[$__rate_interval]))`
|
||||
- **Compression p95 latency**
|
||||
- `histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[$__rate_interval])) by (le))`
|
||||
- **Rotation failure ratio**
|
||||
- `sum(rate(rustfs_log_cleaner_rotation_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_rotation_total[$__rate_interval])), 1e-9)`
|
||||
- **Work-stealing efficiency (latest)**
|
||||
- `max(rustfs_log_cleaner_steal_success_rate)`
|
||||
- **Active file size (latest)**
|
||||
- `max(rustfs_log_cleaner_active_file_size_bytes)`
|
||||
|
||||
### Suggested Alerts
|
||||
|
||||
- **CleanupFailureRatioHigh**: failure ratio > 0.05 for 10m.
|
||||
- **CompressionLatencyP95High**: p95 above your baseline SLO for 15m.
|
||||
- **RotationFailuresDetected**: rotation failure rate > 0 for 3 consecutive windows.
|
||||
- **NoCleanupActivity**: runs rate == 0 for expected active environments.
|
||||
|
||||
### Metrics Compatibility
|
||||
|
||||
The project is currently in active development. Metric names and labels are updated directly when architecture evolves, and no backward-compatibility shim is maintained for old names.
|
||||
Use the metric names documented in this README as the current source of truth.
|
||||
|
||||
---
|
||||
|
||||
## Examples
|
||||
|
||||
### Stdout-only (development default)
|
||||
@@ -207,6 +366,19 @@ export RUSTFS_OBS_LOG_DRY_RUN=true
|
||||
# Observe log output — no files will actually be deleted.
|
||||
```
|
||||
|
||||
### Parallel zstd cleanup (recommended production profile)
|
||||
|
||||
```bash
|
||||
export RUSTFS_OBS_LOG_DIRECTORY=/var/log/rustfs
|
||||
export RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd
|
||||
export RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true
|
||||
export RUSTFS_OBS_LOG_PARALLEL_WORKERS=6
|
||||
export RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL=8
|
||||
export RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP=true
|
||||
export RUSTFS_OBS_LOG_ZSTD_WORKERS=1
|
||||
./rustfs
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Module Structure
|
||||
@@ -229,9 +401,9 @@ rustfs-obs/src/
|
||||
│
|
||||
├── cleaner/ # Background log-file cleanup subsystem
|
||||
│ ├── mod.rs # LogCleaner public API + tests
|
||||
│ ├── types.rs # FileInfo shared type
|
||||
│ ├── types.rs # Shared cleaner types (match mode, compression codec, FileInfo)
|
||||
│ ├── scanner.rs # Filesystem discovery
|
||||
│ ├── compress.rs # Gzip compression helper
|
||||
│ ├── compress.rs # Gzip/Zstd compression helper
|
||||
│ └── core.rs # Selection, compression, deletion logic
|
||||
│
|
||||
└── system/ # Host metrics (CPU, memory, disk, GPU)
|
||||
|
||||
@@ -1,71 +1,130 @@
|
||||
# Log Cleaner Subsystem
|
||||
|
||||
The `cleaner` module provides a robust, background log-file lifecycle manager for RustFS. It is designed to run periodically to enforce retention policies, compress old logs, and prevent disk exhaustion.
|
||||
The `cleaner` module is a production-focused background lifecycle manager for RustFS log archives.
|
||||
It periodically discovers rolled files, applies retention constraints, compresses candidates, and then deletes sources safely.
|
||||
|
||||
## Architecture
|
||||
The subsystem is designed to be conservative by default:
|
||||
|
||||
The cleaner operates as a pipeline:
|
||||
- it never touches the currently active log file;
|
||||
- it refuses symlink deletion during the destructive phase;
|
||||
- it keeps compression and source deletion as separate steps;
|
||||
- it supports a full dry-run mode for policy verification.
|
||||
|
||||
1. **Discovery (`scanner.rs`)**: Scans the configured log directory for eligible files.
|
||||
* **Non-recursive**: Only scans the top-level directory for safety.
|
||||
* **Filtering**: Ignores the currently active log file, files matching exclude patterns, and files that do not match the configured prefix/suffix pattern.
|
||||
* **Performance**: Uses `std::fs::read_dir` directly to minimize overhead and syscalls.
|
||||
## Execution Pipeline
|
||||
|
||||
2. **Selection (`core.rs`)**: Applies retention policies to select files for deletion.
|
||||
* **Keep Count**: Ensures at least `N` recent files are kept.
|
||||
* **Total Size**: Deletes oldest files if the total size exceeds the limit.
|
||||
* **Single File Size**: Deletes individual files that exceed a size limit (e.g., runaway logs).
|
||||
1. **Discovery (`scanner.rs`)**
|
||||
- Performs a shallow `read_dir` scan (no recursion) for predictable latency.
|
||||
- Excludes the active log file, exclusion-pattern matches, and files younger than the age threshold.
|
||||
- Classifies regular logs and compressed archives (`.gz` / `.zst`) in one pass.
|
||||
|
||||
3. **Action (`core.rs` / `compress.rs`)**:
|
||||
* **Compression**: Optionally compresses selected files using Gzip (level 1-9) before deletion.
|
||||
* **Deletion**: Removes the original file (and eventually the compressed archive based on retention days).
|
||||
2. **Selection (`core.rs`)**
|
||||
- Enforces keep-count first.
|
||||
- Applies total-size and single-file-size constraints to oldest files.
|
||||
- Produces an ordered list of files to process.
|
||||
|
||||
## Configuration
|
||||
3. **Compression + Deletion (`core.rs` + `compress.rs`)**
|
||||
- Supports `zstd` and `gzip` codecs.
|
||||
- Uses parallel work stealing when enabled (`Injector + Worker::new_fifo + Stealer`).
|
||||
- Always deletes source files in a serial pass after compression to minimize file-lock race issues.
|
||||
|
||||
The cleaner is configured via `LogCleanerBuilder`. When initialized via `rustfs-obs::init_obs`, it reads from environment variables.
|
||||
4. **Archive Expiry (`core.rs`)**
|
||||
- Applies a separate retention window to already-compressed files.
|
||||
- Keeps archive expiration independent from plain-log keep-count logic.
|
||||
|
||||
| Parameter | Env Var | Description |
|
||||
|-----------|---------|-------------|
|
||||
| `log_dir` | `RUSTFS_OBS_LOG_DIRECTORY` | The directory to scan. |
|
||||
| `file_pattern` | `RUSTFS_OBS_LOG_FILENAME` | The base filename pattern (e.g., `rustfs.log`). |
|
||||
| `active_filename` | (Derived) | The exact name of the currently active log file, excluded from cleanup. |
|
||||
| `match_mode` | `RUSTFS_OBS_LOG_MATCH_MODE` | `prefix` or `suffix`. Determines how `file_pattern` is matched against filenames. |
|
||||
| `keep_files` | `RUSTFS_OBS_LOG_KEEP_FILES` | Minimum number of rolling log files to keep. |
|
||||
| `max_total_size_bytes` | `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | Maximum aggregate size of all log files. Oldest files are deleted to satisfy this. |
|
||||
| `compress_old_files` | `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | If `true`, files selected for removal are first gzipped. |
|
||||
| `compressed_file_retention_days` | `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Age in days after which `.gz` files are deleted. |
|
||||
## Compression Modes
|
||||
|
||||
## Timestamp Format & Rotation
|
||||
- **Primary codec**: `zstd` (default) for better ratio and faster decompression.
|
||||
- **Fallback codec**: `gzip` when zstd fallback is enabled.
|
||||
- **Dry-run**: reports planned compression/deletion operations without touching filesystem state.
|
||||
|
||||
The cleaner works in tandem with the `RollingAppender` in `telemetry/rolling.rs`.
|
||||
## Safety Model
|
||||
|
||||
* **Rotation**: Logs are rotated based on time (Daily/Hourly/Minutely) or Size.
|
||||
* **Naming**: Archived logs use a high-precision timestamp format: `YYYYMMDDHHMMSS.uuuuuu` (microseconds), plus a unique counter to prevent collisions.
|
||||
* **Suffix Mode**: `<timestamp>-<counter>.<filename>` (e.g., `20231027103001.123456-0.rustfs.log`)
|
||||
* **Prefix Mode**: `<filename>.<timestamp>-<counter>` (e.g., `rustfs.log.20231027103001.123456-0`)
|
||||
- **No recursive traversal**: the scanner only inspects the immediate log directory.
|
||||
- **No symlink following**: filesystem metadata is collected with `symlink_metadata`.
|
||||
- **Idempotent archives**: an existing `*.gz` or `*.zst` target means the file is treated as already compressed.
|
||||
- **Best-effort cleanup**: individual file failures are logged and do not abort the whole maintenance pass.
|
||||
|
||||
This high-precision naming ensures that files sort chronologically by name, and collisions are virtually impossible even under high load.
|
||||
## Work-Stealing Strategy
|
||||
|
||||
## Usage Example
|
||||
The parallel path in `core.rs` uses this fixed lookup sequence per worker:
|
||||
|
||||
1. `local_worker.pop()`
|
||||
2. `injector.steal_batch_and_pop(&local_worker)`
|
||||
3. randomized victim polling via `Steal::from_iter(...)`
|
||||
|
||||
This strategy keeps local cache affinity while still balancing stragglers.
|
||||
|
||||
## Metrics and Tracing
|
||||
|
||||
The cleaner emits tracing events and runtime metrics:
|
||||
|
||||
- `rustfs.log_cleaner.deleted_files_total` (counter)
|
||||
- `rustfs.log_cleaner.freed_bytes_total` (counter)
|
||||
- `rustfs.log_cleaner.compress_duration_seconds` (histogram)
|
||||
- `rustfs.log_cleaner.steal_success_rate` (gauge)
|
||||
- `rustfs.log_cleaner.rotation_total` (counter)
|
||||
- `rustfs.log_cleaner.rotation_failures_total` (counter)
|
||||
- `rustfs.log_cleaner.rotation_duration_seconds` (histogram)
|
||||
- `rustfs.log_cleaner.active_file_size_bytes` (gauge)
|
||||
|
||||
These values can be wired into dashboards and alert rules for cleanup health.
|
||||
|
||||
## Retention Decision Order
|
||||
|
||||
For regular logs, the cleaner evaluates candidates in this order:
|
||||
|
||||
1. keep at least `keep_files` newest matching generations;
|
||||
2. remove older files if total retained size still exceeds `max_total_size_bytes`;
|
||||
3. remove any file whose individual size exceeds `max_single_file_size_bytes`;
|
||||
4. if compression is enabled, archive before deletion;
|
||||
5. delete the original file only after successful compression.
|
||||
|
||||
## Key Environment Variables
|
||||
|
||||
| Env Var | Meaning |
|
||||
|---|---|
|
||||
| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` or `gzip` |
|
||||
| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | Enable work-stealing compression |
|
||||
| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | Worker count for parallel compressor |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | Zstd level (1-21) |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | Fallback switch on zstd failure |
|
||||
| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | zstdmt worker threads per compression task |
|
||||
| `RUSTFS_OBS_LOG_DRY_RUN` | Dry-run mode |
|
||||
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Retention window for `*.gz` / `*.zst` archives |
|
||||
| `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | Remove zero-byte regular log files during scanning |
|
||||
| `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | Minimum age for regular log eligibility |
|
||||
|
||||
## Builder Example
|
||||
|
||||
```rust
|
||||
use rustfs_obs::LogCleaner;
|
||||
use rustfs_obs::types::FileMatchMode;
|
||||
use rustfs_obs::types::{CompressionAlgorithm, FileMatchMode};
|
||||
use std::path::PathBuf;
|
||||
|
||||
let cleaner = LogCleaner::builder(
|
||||
PathBuf::from("/var/log/rustfs"),
|
||||
"rustfs.log.".to_string(),
|
||||
"rustfs.log".to_string(),
|
||||
"rustfs.log".to_string(),
|
||||
)
|
||||
.match_mode(FileMatchMode::Prefix)
|
||||
.keep_files(10)
|
||||
.max_total_size_bytes(1024 * 1024 * 100) // 100 MB
|
||||
.match_mode(FileMatchMode::Suffix)
|
||||
.keep_files(30)
|
||||
.max_total_size_bytes(2 * 1024 * 1024 * 1024)
|
||||
.compress_old_files(true)
|
||||
.compression_algorithm(CompressionAlgorithm::Zstd)
|
||||
.parallel_compress(true)
|
||||
.parallel_workers(6)
|
||||
.zstd_compression_level(8)
|
||||
.zstd_fallback_to_gzip(true)
|
||||
.zstd_workers(1)
|
||||
.dry_run(false)
|
||||
.build();
|
||||
|
||||
// Run cleanup (blocking operation, spawn in a background task)
|
||||
if let Ok((deleted, freed)) = cleaner.cleanup() {
|
||||
println!("Cleaned up {} files, freed {} bytes", deleted, freed);
|
||||
}
|
||||
let _ = cleaner.cleanup();
|
||||
```
|
||||
|
||||
## Operational Notes
|
||||
|
||||
- Prefer `FileMatchMode::Suffix` when rotations prepend timestamps to the filename.
|
||||
- Prefer `FileMatchMode::Prefix` when rotations append counters or timestamps after a stable base name.
|
||||
- Keep `parallel_workers` modest when `zstd_workers` is greater than `1`, because each compression task may already use internal codec threads.
|
||||
|
||||
|
||||
@@ -12,66 +12,205 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Gzip compression helper for old log files.
|
||||
//! Compression helpers for old log files.
|
||||
//!
|
||||
//! Files are compressed in place: `<name>` → `<name>.gz`. The original file
|
||||
//! is **not** deleted here — deletion is handled by the caller after
|
||||
//! compression succeeds.
|
||||
//! This module performs compression only. Source-file deletion is intentionally
|
||||
//! handled by the caller in a separate step to avoid platform-specific file
|
||||
//! locking issues (especially on Windows).
|
||||
//!
|
||||
//! The separation is important for operational safety: a failed archive write
|
||||
//! must never result in premature source deletion, and an already-existing
|
||||
//! archive should make repeated cleanup passes idempotent.
|
||||
|
||||
use super::types::CompressionAlgorithm;
|
||||
use flate2::Compression;
|
||||
use flate2::write::GzEncoder;
|
||||
use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION;
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter, Write};
|
||||
use std::path::Path;
|
||||
use tracing::{debug, info};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Compress `path` to `<path>.gz` using gzip.
|
||||
/// Compression options shared by serial and parallel cleaner paths.
|
||||
///
|
||||
/// If a `.gz` file for the given path already exists the function returns
|
||||
/// `Ok(())` immediately without overwriting the existing archive.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `path` - Path to the uncompressed log file.
|
||||
/// * `level` - Gzip compression level (`1`–`9`); clamped automatically.
|
||||
/// * `dry_run` - When `true`, log what would be done without writing anything.
|
||||
///
|
||||
/// # Errors
|
||||
/// Propagates any I/O error encountered while opening, reading, writing, or
|
||||
/// flushing files.
|
||||
pub(super) fn compress_file(path: &Path, level: u32, dry_run: bool) -> Result<(), std::io::Error> {
|
||||
let gz_path = path.with_extension(DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION);
|
||||
|
||||
if gz_path.exists() {
|
||||
debug!("Compressed file already exists, skipping: {:?}", gz_path);
|
||||
return Ok(());
|
||||
/// The core cleaner prepares this immutable bundle once per cleanup pass and
|
||||
/// then hands it to each worker so all files in that run use the same policy.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct CompressionOptions {
|
||||
/// Preferred compression codec for the current cleanup pass.
|
||||
pub algorithm: CompressionAlgorithm,
|
||||
/// Gzip level (1..=9).
|
||||
pub gzip_level: u32,
|
||||
/// Zstd level (1..=21).
|
||||
pub zstd_level: i32,
|
||||
/// Internal zstd worker threads used by zstdmt.
|
||||
pub zstd_workers: usize,
|
||||
/// If true, fallback to gzip when zstd encoding fails.
|
||||
pub zstd_fallback_to_gzip: bool,
|
||||
/// Dry-run mode reports planned actions without writing files.
|
||||
pub dry_run: bool,
|
||||
}
|
||||
|
||||
/// Compression output metadata used for metrics and deletion gating.
|
||||
///
|
||||
/// Callers inspect the output size and archive path before deciding whether it
|
||||
/// is safe to remove the source log file.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct CompressionOutput {
|
||||
/// Final path of the compressed archive file.
|
||||
pub archive_path: PathBuf,
|
||||
/// Codec actually used to produce the output.
|
||||
pub algorithm_used: CompressionAlgorithm,
|
||||
/// Input bytes before compression.
|
||||
pub input_bytes: u64,
|
||||
/// Compressed output bytes on disk.
|
||||
pub output_bytes: u64,
|
||||
}
|
||||
|
||||
/// Compress a single source file with the requested codec and fallback policy.
|
||||
///
|
||||
/// This function centralizes the fallback behavior so the orchestration layer
|
||||
/// does not need per-codec branching.
|
||||
pub(super) fn compress_file(path: &Path, options: &CompressionOptions) -> Result<CompressionOutput, std::io::Error> {
|
||||
match options.algorithm {
|
||||
CompressionAlgorithm::Gzip => compress_gzip(path, options.gzip_level, options.dry_run),
|
||||
CompressionAlgorithm::Zstd => match compress_zstd(path, options.zstd_level, options.zstd_workers, options.dry_run) {
|
||||
Ok(output) => Ok(output),
|
||||
Err(err) if options.zstd_fallback_to_gzip => {
|
||||
warn!(
|
||||
file = ?path,
|
||||
error = %err,
|
||||
"zstd compression failed, fallback to gzip"
|
||||
);
|
||||
compress_gzip(path, options.gzip_level, options.dry_run)
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Compress a file to `*.gz` using the configured gzip level.
|
||||
fn compress_gzip(path: &Path, level: u32, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
|
||||
let archive_path = archive_path(path, CompressionAlgorithm::Gzip);
|
||||
compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Gzip, |reader, writer| {
|
||||
let mut encoder = GzEncoder::new(writer, Compression::new(level.clamp(1, 9)));
|
||||
let written = std::io::copy(reader, &mut encoder)?;
|
||||
let mut out = encoder.finish()?;
|
||||
out.flush()?;
|
||||
Ok(written)
|
||||
})
|
||||
}
|
||||
|
||||
/// Compress a file to `*.zst` using multi-threaded zstd when available.
|
||||
fn compress_zstd(path: &Path, level: i32, workers: usize, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
|
||||
let archive_path = archive_path(path, CompressionAlgorithm::Zstd);
|
||||
compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Zstd, |reader, writer| {
|
||||
let mut encoder = zstd::stream::Encoder::new(writer, level.clamp(1, 21))?;
|
||||
encoder.multithread(workers.max(1) as u32)?;
|
||||
let written = std::io::copy(reader, &mut encoder)?;
|
||||
let mut out = encoder.finish()?;
|
||||
out.flush()?;
|
||||
Ok(written)
|
||||
})
|
||||
}
|
||||
|
||||
fn compress_with_writer<F>(
|
||||
path: &Path,
|
||||
archive_path: &Path,
|
||||
dry_run: bool,
|
||||
algorithm_used: CompressionAlgorithm,
|
||||
mut writer_fn: F,
|
||||
) -> Result<CompressionOutput, std::io::Error>
|
||||
where
|
||||
F: FnMut(&mut BufReader<File>, BufWriter<File>) -> Result<u64, std::io::Error>,
|
||||
{
|
||||
// Keep idempotent behavior: existing archive means this file has already
|
||||
// been handled in a previous cleanup pass.
|
||||
if archive_path.exists() {
|
||||
debug!(file = ?archive_path, "compressed archive already exists, skipping");
|
||||
let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
|
||||
let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0);
|
||||
return Ok(CompressionOutput {
|
||||
archive_path: archive_path.to_path_buf(),
|
||||
algorithm_used,
|
||||
input_bytes,
|
||||
output_bytes,
|
||||
});
|
||||
}
|
||||
|
||||
let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
|
||||
if dry_run {
|
||||
info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, gz_path);
|
||||
return Ok(());
|
||||
info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, archive_path);
|
||||
return Ok(CompressionOutput {
|
||||
archive_path: archive_path.to_path_buf(),
|
||||
algorithm_used,
|
||||
input_bytes,
|
||||
output_bytes: 0,
|
||||
});
|
||||
}
|
||||
|
||||
// Create the output archive only after the dry-run short-circuit so this
|
||||
// helper remains side-effect free when the caller is evaluating policy.
|
||||
let input = File::open(path)?;
|
||||
let output = File::create(&gz_path)?;
|
||||
|
||||
// Write to a temporary file first and then atomically rename it into place
|
||||
// so callers never observe a partially written archive at `archive_path`.
|
||||
let mut tmp_name = archive_path.as_os_str().to_owned();
|
||||
tmp_name.push(".tmp");
|
||||
let tmp_archive_path = std::path::PathBuf::from(tmp_name);
|
||||
|
||||
let output = File::create(&tmp_archive_path)?;
|
||||
let mut reader = BufReader::new(input);
|
||||
let mut writer = BufWriter::new(output);
|
||||
let writer = BufWriter::new(output);
|
||||
|
||||
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level.clamp(1, 9)));
|
||||
std::io::copy(&mut reader, &mut encoder)?;
|
||||
let compressed = encoder.finish()?;
|
||||
if let Err(e) = writer_fn(&mut reader, writer) {
|
||||
// Best-effort cleanup of the incomplete temporary archive.
|
||||
let _ = std::fs::remove_file(&tmp_archive_path);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
writer.write_all(&compressed)?;
|
||||
writer.flush()?;
|
||||
// Preserve Unix mode bits so rotated archives keep the same access policy.
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
if let Ok(src_meta) = std::fs::metadata(path) {
|
||||
let mode = src_meta.permissions().mode();
|
||||
let _ = std::fs::set_permissions(&tmp_archive_path, std::fs::Permissions::from_mode(mode));
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically move the fully written temp file into its final location.
|
||||
std::fs::rename(&tmp_archive_path, archive_path)?;
|
||||
|
||||
let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0);
|
||||
debug!(
|
||||
"Compressed {:?} -> {:?} ({} bytes -> {} bytes)",
|
||||
path,
|
||||
gz_path,
|
||||
std::fs::metadata(path).map(|m| m.len()).unwrap_or(0),
|
||||
compressed.len()
|
||||
file = ?path,
|
||||
archive = ?archive_path,
|
||||
input_bytes,
|
||||
output_bytes,
|
||||
algorithm = %algorithm_used,
|
||||
"compression finished"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
Ok(CompressionOutput {
|
||||
archive_path: archive_path.to_path_buf(),
|
||||
algorithm_used,
|
||||
input_bytes,
|
||||
output_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Build `<filename>.<ext>` without replacing an existing extension.
|
||||
///
|
||||
/// Rotated log filenames often already encode a generation number or suffix
|
||||
/// (for example `server.log.3`). Appending the archive extension preserves that
|
||||
/// information in the final artifact name (`server.log.3.gz`).
|
||||
fn archive_path(path: &Path, algorithm: CompressionAlgorithm) -> PathBuf {
|
||||
let ext = algorithm.extension();
|
||||
let mut file_name: OsString = path
|
||||
.file_name()
|
||||
.map_or_else(|| OsString::from("archive"), |n| n.to_os_string());
|
||||
file_name.push(format!(".{ext}"));
|
||||
path.with_file_name(file_name)
|
||||
}
|
||||
|
||||
@@ -14,67 +14,89 @@
|
||||
|
||||
//! Core log-file cleanup orchestration.
|
||||
//!
|
||||
//! [`LogCleaner`] is the public entry point for the cleanup subsystem.
|
||||
//! Construct it with [`LogCleaner::builder`] and call [`LogCleaner::cleanup`]
|
||||
//! periodically (e.g. from a `tokio::spawn`-ed loop).
|
||||
//!
|
||||
//! Internally the cleaner delegates to:
|
||||
//! - [`super::scanner`] — to discover which files exist and which are eligible,
|
||||
//! - [`super::compress`] — to gzip-compress files before they are deleted,
|
||||
//! - [`LogCleaner::select_files_to_delete`] — to apply count / size limits.
|
||||
//! This module connects scanning, retention selection, compression, and safe
|
||||
//! deletion into one reusable service object. The public surface is intentionally
|
||||
//! small: callers configure a [`LogCleaner`] once and then trigger discrete
|
||||
//! cleanup passes whenever log rotation or background maintenance requires it.
|
||||
|
||||
use super::compress::compress_file;
|
||||
use super::compress::{CompressionOptions, compress_file};
|
||||
use super::scanner::{LogScanResult, scan_log_directory};
|
||||
use super::types::{FileInfo, FileMatchMode};
|
||||
use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode, default_parallel_workers};
|
||||
use crate::global::{
|
||||
METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS, METRIC_LOG_CLEANER_DELETED_FILES_TOTAL, METRIC_LOG_CLEANER_FREED_BYTES_TOTAL,
|
||||
METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE,
|
||||
};
|
||||
use crossbeam_channel::bounded;
|
||||
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
|
||||
use crossbeam_utils::thread;
|
||||
use metrics::{counter, gauge, histogram};
|
||||
use rustfs_config::DEFAULT_LOG_KEEP_FILES;
|
||||
use std::path::PathBuf;
|
||||
use std::time::SystemTime;
|
||||
use tracing::{debug, error, info};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CompressionTaskResult {
|
||||
/// Original file metadata so successful workers can be deleted later.
|
||||
file: FileInfo,
|
||||
/// Whether compression completed successfully for this file.
|
||||
compressed: bool,
|
||||
}
|
||||
|
||||
/// Log-file lifecycle manager.
|
||||
///
|
||||
/// Holds all cleanup policy parameters and exposes a single [`cleanup`] method
|
||||
/// that performs one full cleanup pass.
|
||||
///
|
||||
/// # Thread-safety
|
||||
/// `LogCleaner` is `Send + Sync`. Multiple callers can share a reference
|
||||
/// (e.g. via `Arc`) and call `cleanup` concurrently without data races,
|
||||
/// because no mutable state is mutated after construction.
|
||||
/// A cleaner instance is immutable after construction and therefore safe to
|
||||
/// reuse across periodic background jobs. Each call to [`LogCleaner::cleanup`]
|
||||
/// performs a fresh directory scan and applies the configured retention rules.
|
||||
pub struct LogCleaner {
|
||||
/// Directory containing the managed log files.
|
||||
/// Directory containing the active and rotated log files.
|
||||
pub(super) log_dir: PathBuf,
|
||||
/// Pattern string to match files (used as prefix or suffix).
|
||||
/// Pattern used to recognize relevant log generations.
|
||||
pub(super) file_pattern: String,
|
||||
/// Exact name of the active log file (to exclude from cleanup).
|
||||
/// The currently active log file that must never be touched.
|
||||
pub(super) active_filename: String,
|
||||
/// Whether to match by prefix or suffix.
|
||||
/// Whether `file_pattern` is interpreted as a prefix or suffix.
|
||||
pub(super) match_mode: FileMatchMode,
|
||||
/// The cleaner will never delete files if doing so would leave fewer than
|
||||
/// this many files in the directory.
|
||||
/// Minimum number of regular log files to keep regardless of size.
|
||||
pub(super) keep_files: usize,
|
||||
/// Hard ceiling on the total bytes of all managed files; `0` = no limit.
|
||||
/// Optional cap for the cumulative size of regular logs.
|
||||
pub(super) max_total_size_bytes: u64,
|
||||
/// Hard ceiling on a single file's size; `0` = no per-file limit.
|
||||
/// Optional cap for an individual regular log file.
|
||||
pub(super) max_single_file_size_bytes: u64,
|
||||
/// Compress eligible files with gzip before removing them.
|
||||
/// Whether selected regular logs should be compressed before deletion.
|
||||
pub(super) compress_old_files: bool,
|
||||
/// Gzip compression level (`1`–`9`, clamped on construction).
|
||||
/// Gzip compression level used when gzip is selected or used as fallback.
|
||||
pub(super) gzip_compression_level: u32,
|
||||
/// Delete compressed archives older than this many days; `0` = keep forever.
|
||||
/// Retention window for already compressed archives, expressed in days.
|
||||
pub(super) compressed_file_retention_days: u64,
|
||||
/// Compiled glob patterns for files that must never be cleaned up.
|
||||
/// Glob patterns that are excluded before any cleanup decision is made.
|
||||
pub(super) exclude_patterns: Vec<glob::Pattern>,
|
||||
/// Delete zero-byte files even when they are younger than `min_file_age_seconds`.
|
||||
/// Whether zero-byte regular logs may be removed during scanning.
|
||||
pub(super) delete_empty_files: bool,
|
||||
/// Files younger than this threshold (in seconds) are never touched.
|
||||
/// Minimum age a regular log must reach before it becomes eligible.
|
||||
pub(super) min_file_age_seconds: u64,
|
||||
/// When `true`, log what would be done without performing any destructive
|
||||
/// filesystem operations.
|
||||
/// Dry-run mode reports intended actions without modifying files.
|
||||
pub(super) dry_run: bool,
|
||||
// Parallel compression controls while keeping backward compatibility with
|
||||
// the original serial cleaner behavior.
|
||||
/// Preferred archive codec for compression-enabled cleanup passes.
|
||||
pub(super) compression_algorithm: CompressionAlgorithm,
|
||||
/// Enables the work-stealing compression path when compression is active.
|
||||
pub(super) parallel_compress: bool,
|
||||
/// Number of worker threads used by the parallel compressor.
|
||||
pub(super) parallel_workers: usize,
|
||||
/// Zstd compression level when zstd is selected.
|
||||
pub(super) zstd_compression_level: i32,
|
||||
/// Whether a failed zstd attempt should retry with gzip.
|
||||
pub(super) zstd_fallback_to_gzip: bool,
|
||||
/// Number of internal threads requested from the zstd encoder.
|
||||
pub(super) zstd_workers: usize,
|
||||
}
|
||||
|
||||
impl LogCleaner {
|
||||
/// Create a builder to construct a `LogCleaner`.
|
||||
/// Create a builder with the required path and filename matching inputs.
|
||||
pub fn builder(
|
||||
log_dir: impl Into<PathBuf>,
|
||||
file_pattern: impl Into<String>,
|
||||
@@ -84,19 +106,6 @@ impl LogCleaner {
|
||||
}
|
||||
|
||||
/// Perform one full cleanup pass.
|
||||
///
|
||||
/// Steps:
|
||||
/// 1. Scan the log directory for managed files (excluding the active file).
|
||||
/// 2. Apply count/size policies to select files for deletion.
|
||||
/// 3. Optionally compress selected files, then delete them.
|
||||
/// 4. Collect and delete expired compressed archives.
|
||||
///
|
||||
/// # Returns
|
||||
/// A tuple `(deleted_count, freed_bytes)` covering all deletions in this
|
||||
/// pass (both regular files and expired compressed archives).
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an [`std::io::Error`] if the log directory cannot be read.
|
||||
pub fn cleanup(&self) -> Result<(usize, u64), std::io::Error> {
|
||||
if !self.log_dir.exists() {
|
||||
debug!("Log directory does not exist: {:?}", self.log_dir);
|
||||
@@ -106,8 +115,6 @@ impl LogCleaner {
|
||||
let mut total_deleted = 0usize;
|
||||
let mut total_freed = 0u64;
|
||||
|
||||
// ── 1. Discover active log files (Archives only) ──────────────────────
|
||||
// We explicitly pass `active_filename` to exclude it from the list.
|
||||
let LogScanResult {
|
||||
mut logs,
|
||||
mut compressed_archives,
|
||||
@@ -122,7 +129,6 @@ impl LogCleaner {
|
||||
self.dry_run,
|
||||
)?;
|
||||
|
||||
// ── 2. Select + compress + delete (Regular Logs) ──────────────────────
|
||||
if !logs.is_empty() {
|
||||
logs.sort_by_key(|f| f.modified);
|
||||
let total_size: u64 = logs.iter().map(|f| f.size).sum();
|
||||
@@ -134,16 +140,20 @@ impl LogCleaner {
|
||||
total_size as f64 / 1024.0 / 1024.0
|
||||
);
|
||||
|
||||
// Select the oldest files first, then additionally trim any files
|
||||
// that still violate configured size constraints.
|
||||
let to_delete = self.select_files_to_process(&logs, total_size);
|
||||
|
||||
if !to_delete.is_empty() {
|
||||
let (d, f) = self.compress_and_delete(&to_delete)?;
|
||||
total_deleted += d;
|
||||
total_freed += f;
|
||||
let (deleted, freed) = if self.parallel_compress && self.compress_old_files {
|
||||
self.parallel_stealing_compress(&to_delete)?
|
||||
} else {
|
||||
self.serial_compress_and_delete(&to_delete)?
|
||||
};
|
||||
total_deleted += deleted;
|
||||
total_freed += freed;
|
||||
}
|
||||
}
|
||||
|
||||
// ── 3. Remove expired compressed archives ─────────────────────────────
|
||||
if !compressed_archives.is_empty() && self.compressed_file_retention_days > 0 {
|
||||
let expired = self.select_expired_compressed(&mut compressed_archives);
|
||||
if !expired.is_empty() {
|
||||
@@ -154,6 +164,8 @@ impl LogCleaner {
|
||||
}
|
||||
|
||||
if total_deleted > 0 || total_freed > 0 {
|
||||
counter!(METRIC_LOG_CLEANER_DELETED_FILES_TOTAL).increment(total_deleted as u64);
|
||||
counter!(METRIC_LOG_CLEANER_FREED_BYTES_TOTAL).increment(total_freed);
|
||||
info!(
|
||||
"Cleanup completed: deleted {} files, freed {} bytes ({:.2} MB)",
|
||||
total_deleted,
|
||||
@@ -165,51 +177,31 @@ impl LogCleaner {
|
||||
Ok((total_deleted, total_freed))
|
||||
}
|
||||
|
||||
// ─── Selection ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Choose which files from `files` (sorted oldest-first) should be deleted.
|
||||
/// Choose regular log files that should be compressed and/or deleted.
|
||||
///
|
||||
/// The algorithm respects three constraints in order:
|
||||
/// 1. Always keep at least `keep_files` files (archives).
|
||||
/// 2. Delete old files while the total size exceeds `max_total_size_bytes`.
|
||||
/// 3. Delete any file whose individual size exceeds `max_single_file_size_bytes`.
|
||||
/// The `files` slice must already be sorted from oldest to newest. The
|
||||
/// method first preserves the newest `keep_files` generations, then applies
|
||||
/// total-size and per-file-size limits to the remaining tail.
|
||||
pub(super) fn select_files_to_process(&self, files: &[FileInfo], total_size: u64) -> Vec<FileInfo> {
|
||||
let mut to_delete = Vec::new();
|
||||
|
||||
if files.is_empty() {
|
||||
return to_delete;
|
||||
}
|
||||
|
||||
// Calculate how many files we *must* delete to satisfy keep_files.
|
||||
let must_delete_count = files.len().saturating_sub(self.keep_files);
|
||||
|
||||
let mut current_size = total_size;
|
||||
|
||||
for (idx, file) in files.iter().enumerate() {
|
||||
// Condition 1: Enforce keep_files.
|
||||
// If we are in the range of files that exceed the count limit, delete them.
|
||||
if idx < must_delete_count {
|
||||
current_size = current_size.saturating_sub(file.size);
|
||||
to_delete.push(file.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Condition 2: Enforce max_total_size_bytes.
|
||||
let over_total = self.max_total_size_bytes > 0 && current_size > self.max_total_size_bytes;
|
||||
|
||||
// Condition 3: Enforce max_single_file_size_bytes.
|
||||
// Note: Since active file is excluded, if an archive is > max_single, it means it
|
||||
// was rotated out being too large (likely) or we lowered the limit. It should be deleted.
|
||||
let over_single = self.max_single_file_size_bytes > 0 && file.size > self.max_single_file_size_bytes;
|
||||
|
||||
if over_total {
|
||||
current_size = current_size.saturating_sub(file.size);
|
||||
to_delete.push(file.clone());
|
||||
} else if over_single {
|
||||
debug!(
|
||||
"Archive exceeds single-file size limit: {:?} ({} > {} bytes). Deleting.",
|
||||
file.path, file.size, self.max_single_file_size_bytes
|
||||
);
|
||||
if over_total || over_single {
|
||||
current_size = current_size.saturating_sub(file.size);
|
||||
to_delete.push(file.clone());
|
||||
}
|
||||
@@ -218,9 +210,9 @@ impl LogCleaner {
|
||||
to_delete
|
||||
}
|
||||
|
||||
/// Select compressed files that have exceeded the retention period.
|
||||
/// Select compressed archives whose age exceeds the archive retention window.
|
||||
fn select_expired_compressed(&self, files: &mut [FileInfo]) -> Vec<FileInfo> {
|
||||
let retention = std::time::Duration::from_secs(self.compressed_file_retention_days * 24 * 3600);
|
||||
let retention = Duration::from_secs(self.compressed_file_retention_days * 24 * 3600);
|
||||
let now = SystemTime::now();
|
||||
let mut expired = Vec::new();
|
||||
|
||||
@@ -235,22 +227,259 @@ impl LogCleaner {
|
||||
expired
|
||||
}
|
||||
|
||||
// ─── Compression + deletion ───────────────────────────────────────────────
|
||||
|
||||
/// Securely delete a file, preventing symlink attacks (TOCTOU).
|
||||
/// Parallel compressor with work stealing.
|
||||
///
|
||||
/// This function verifies that the path is not a symlink before attempting deletion.
|
||||
/// While strictly speaking a race condition is still theoretically possible between
|
||||
/// `symlink_metadata` and `remove_file`, this check covers the vast majority of
|
||||
/// privilege escalation vectors where a user replaces a log file with a symlink
|
||||
/// to a system file.
|
||||
fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> {
|
||||
// 1. Lstat (symlink_metadata) - do not follow links
|
||||
let meta = std::fs::symlink_metadata(path)?;
|
||||
/// The flow is intentionally split into "parallel compression" followed by
|
||||
/// "serial deletion" to reduce cross-platform file-locking failures.
|
||||
/// Compression workers only decide whether an archive was created; the main
|
||||
/// thread remains responsible for actual source removal so deletion policy
|
||||
/// and error reporting stay deterministic.
|
||||
fn parallel_stealing_compress(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
|
||||
if files.len() <= 1 {
|
||||
return self.serial_compress_and_delete(files);
|
||||
}
|
||||
|
||||
// 2. Symlink Check
|
||||
// If it's a symlink, we NEVER delete it. It might point to /etc/passwd.
|
||||
// In a log directory, symlinks are unexpected and dangerous.
|
||||
let worker_count = self.parallel_workers.min(files.len()).max(1);
|
||||
if worker_count <= 1 {
|
||||
return self.serial_compress_and_delete(files);
|
||||
}
|
||||
|
||||
let compression_options = self.compression_options();
|
||||
let started_at = Instant::now();
|
||||
let injector = Arc::new(Injector::new());
|
||||
for file in files {
|
||||
injector.push(file.clone());
|
||||
}
|
||||
|
||||
let mut workers = Vec::with_capacity(worker_count);
|
||||
let mut stealers = Vec::with_capacity(worker_count);
|
||||
for _ in 0..worker_count {
|
||||
let worker = Worker::new_fifo();
|
||||
stealers.push(worker.stealer());
|
||||
workers.push(worker);
|
||||
}
|
||||
|
||||
let stealers = Arc::new(stealers);
|
||||
let steal_attempts = Arc::new(AtomicU64::new(0));
|
||||
let steal_successes = Arc::new(AtomicU64::new(0));
|
||||
let (tx, rx) = bounded::<CompressionTaskResult>(worker_count.saturating_mul(2).max(8));
|
||||
|
||||
// Spawn a fixed-size worker set in a scoped region so panics are
|
||||
// contained and can be downgraded to a serial fallback instead of
|
||||
// leaking detached threads.
|
||||
let scope_result = thread::scope(|scope| {
|
||||
for (worker_id, local_worker) in workers.into_iter().enumerate() {
|
||||
let tx = tx.clone();
|
||||
let injector = Arc::clone(&injector);
|
||||
let stealers = Arc::clone(&stealers);
|
||||
let options = compression_options.clone();
|
||||
let attempts = Arc::clone(&steal_attempts);
|
||||
let successes = Arc::clone(&steal_successes);
|
||||
|
||||
scope.spawn(move |_| {
|
||||
let mut seed = (worker_id as u64 + 1)
|
||||
.wrapping_mul(6364136223846793005)
|
||||
.wrapping_add(1442695040888963407);
|
||||
|
||||
loop {
|
||||
// Search order: local FIFO -> global injector batch ->
|
||||
// random victim stealers.
|
||||
let task = if let Some(file) = local_worker.pop() {
|
||||
Some(file)
|
||||
} else {
|
||||
match injector.steal_batch_and_pop(&local_worker) {
|
||||
Steal::Success(file) => {
|
||||
attempts.fetch_add(1, Ordering::Relaxed);
|
||||
successes.fetch_add(1, Ordering::Relaxed);
|
||||
Some(file)
|
||||
}
|
||||
Steal::Retry => continue,
|
||||
Steal::Empty => {
|
||||
let stolen = Self::steal_from_victims(
|
||||
worker_id,
|
||||
&local_worker,
|
||||
&stealers,
|
||||
&attempts,
|
||||
&successes,
|
||||
&mut seed,
|
||||
);
|
||||
// Exit only when all task sources are empty.
|
||||
if stolen.is_none()
|
||||
&& injector.is_empty()
|
||||
&& local_worker.is_empty()
|
||||
&& stealers.iter().all(Stealer::is_empty)
|
||||
{
|
||||
break;
|
||||
}
|
||||
stolen
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let Some(file) = task else {
|
||||
std::thread::yield_now();
|
||||
continue;
|
||||
};
|
||||
|
||||
let compressed = match compress_file(&file.path, &options) {
|
||||
Ok(output) => {
|
||||
debug!(
|
||||
file = ?file.path,
|
||||
archive = ?output.archive_path,
|
||||
algorithm = %output.algorithm_used,
|
||||
input_bytes = output.input_bytes,
|
||||
output_bytes = output.output_bytes,
|
||||
"parallel compression done"
|
||||
);
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(file = ?file.path, error = %err, "parallel compression failed");
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if tx.send(CompressionTaskResult { file, compressed }).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
drop(tx);
|
||||
|
||||
// Any worker panic triggers deterministic fallback behavior.
|
||||
if scope_result.is_err() {
|
||||
warn!("parallel compression worker panicked, falling back to serial path");
|
||||
return self.serial_compress_and_delete(files);
|
||||
}
|
||||
|
||||
let mut deletable = Vec::with_capacity(files.len());
|
||||
for result in rx {
|
||||
if result.compressed {
|
||||
deletable.push(result.file);
|
||||
}
|
||||
}
|
||||
|
||||
let (deleted, freed) = self.delete_files(&deletable)?;
|
||||
let elapsed = started_at.elapsed().as_secs_f64();
|
||||
let attempts = steal_attempts.load(Ordering::Relaxed);
|
||||
let successes = steal_successes.load(Ordering::Relaxed);
|
||||
let success_rate = if attempts == 0 {
|
||||
0.0
|
||||
} else {
|
||||
successes as f64 / attempts as f64
|
||||
};
|
||||
|
||||
// Emit post-run cleanup metrics for monitoring and alerting.
|
||||
histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(elapsed);
|
||||
gauge!(METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE).set(success_rate);
|
||||
|
||||
info!(
|
||||
workers = worker_count,
|
||||
algorithm = %self.compression_algorithm,
|
||||
deleted,
|
||||
freed,
|
||||
duration_seconds = elapsed,
|
||||
steal_attempts = attempts,
|
||||
steal_successes = successes,
|
||||
steal_success_rate = success_rate,
|
||||
"parallel cleanup finished"
|
||||
);
|
||||
|
||||
Ok((deleted, freed))
|
||||
}
|
||||
|
||||
/// Attempt to steal a task from peer workers using randomized victim order.
|
||||
fn steal_from_victims(
|
||||
worker_id: usize,
|
||||
local_worker: &Worker<FileInfo>,
|
||||
stealers: &[Stealer<FileInfo>],
|
||||
attempts: &AtomicU64,
|
||||
successes: &AtomicU64,
|
||||
seed: &mut u64,
|
||||
) -> Option<FileInfo> {
|
||||
if stealers.len() <= 1 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Xorshift step to randomize victim polling order and avoid convoying.
|
||||
*seed ^= *seed << 13;
|
||||
*seed ^= *seed >> 7;
|
||||
*seed ^= *seed << 17;
|
||||
let start = (*seed as usize) % stealers.len();
|
||||
|
||||
let steal_result = Steal::from_iter((0..stealers.len()).map(|offset| {
|
||||
let victim = (start + offset) % stealers.len();
|
||||
if victim == worker_id {
|
||||
return Steal::Empty;
|
||||
}
|
||||
attempts.fetch_add(1, Ordering::Relaxed);
|
||||
stealers[victim].steal_batch_and_pop(local_worker)
|
||||
}));
|
||||
|
||||
match steal_result {
|
||||
Steal::Success(file) => {
|
||||
successes.fetch_add(1, Ordering::Relaxed);
|
||||
Some(file)
|
||||
}
|
||||
Steal::Retry | Steal::Empty => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Serial fallback path and non-parallel baseline.
|
||||
///
|
||||
/// This path is also used whenever the task set is too small to benefit
|
||||
/// from worker orchestration.
|
||||
fn serial_compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
|
||||
let started_at = Instant::now();
|
||||
let mut deletable = Vec::with_capacity(files.len());
|
||||
|
||||
if self.compress_old_files {
|
||||
let options = self.compression_options();
|
||||
for file in files {
|
||||
match compress_file(&file.path, &options) {
|
||||
Ok(output) => {
|
||||
debug!(
|
||||
file = ?file.path,
|
||||
archive = ?output.archive_path,
|
||||
algorithm = %output.algorithm_used,
|
||||
input_bytes = output.input_bytes,
|
||||
output_bytes = output.output_bytes,
|
||||
"serial compression done"
|
||||
);
|
||||
deletable.push(file.clone());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(file = ?file.path, error = %err, "serial compression failed, source kept");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
deletable.extend(files.iter().cloned());
|
||||
}
|
||||
|
||||
let (deleted, freed) = self.delete_files(&deletable)?;
|
||||
histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(started_at.elapsed().as_secs_f64());
|
||||
|
||||
Ok((deleted, freed))
|
||||
}
|
||||
|
||||
/// Snapshot compression-related configuration for a single cleanup pass.
|
||||
fn compression_options(&self) -> CompressionOptions {
|
||||
CompressionOptions {
|
||||
algorithm: self.compression_algorithm,
|
||||
gzip_level: self.gzip_compression_level,
|
||||
zstd_level: self.zstd_compression_level,
|
||||
zstd_workers: self.zstd_workers,
|
||||
zstd_fallback_to_gzip: self.zstd_fallback_to_gzip,
|
||||
dry_run: self.dry_run,
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete a file while refusing symlinks and accommodating platform quirks.
|
||||
fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> {
|
||||
let meta = std::fs::symlink_metadata(path)?;
|
||||
if meta.file_type().is_symlink() {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
@@ -258,59 +487,36 @@ impl LogCleaner {
|
||||
));
|
||||
}
|
||||
|
||||
// 3. Perform Deletion
|
||||
#[cfg(windows)]
|
||||
{
|
||||
// Retry removes to mitigate transient handle races from external
|
||||
// scanners/AV software.
|
||||
let mut last_err: Option<std::io::Error> = None;
|
||||
for _ in 0..3 {
|
||||
match std::fs::remove_file(path) {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(err) => {
|
||||
last_err = Some(err);
|
||||
std::thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(err) = last_err {
|
||||
return Err(err);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
std::fs::remove_file(path)
|
||||
}
|
||||
}
|
||||
|
||||
/// Optionally compress and then delete the given files.
|
||||
/// Delete the supplied files and return `(deleted_count, freed_bytes)`.
|
||||
///
|
||||
/// This function is synchronous and blocking. It should be called within a
|
||||
/// `spawn_blocking` task if running in an async context.
|
||||
fn compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
|
||||
let mut total_deleted = 0;
|
||||
let mut total_freed = 0;
|
||||
|
||||
for f in files {
|
||||
let mut deleted_size = 0;
|
||||
if self.compress_old_files {
|
||||
match compress_file(&f.path, self.gzip_compression_level, self.dry_run) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to compress {:?}: {}", f.path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now delete
|
||||
if self.dry_run {
|
||||
info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size);
|
||||
deleted_size = f.size;
|
||||
} else {
|
||||
match self.secure_delete(&f.path) {
|
||||
Ok(()) => {
|
||||
debug!("Deleted: {:?}", f.path);
|
||||
deleted_size = f.size;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to delete {:?}: {}", f.path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if deleted_size > 0 {
|
||||
total_deleted += 1;
|
||||
total_freed += deleted_size;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((total_deleted, total_freed))
|
||||
}
|
||||
|
||||
/// Delete all files in `files`, logging each operation.
|
||||
///
|
||||
/// Errors on individual files are logged but do **not** abort the loop.
|
||||
///
|
||||
/// # Returns
|
||||
/// `(deleted_count, freed_bytes)`.
|
||||
/// In dry-run mode the returned counters still reflect the projected work
|
||||
/// so callers and metrics can report what would have happened.
|
||||
pub(super) fn delete_files(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
|
||||
let mut deleted = 0usize;
|
||||
let mut freed = 0u64;
|
||||
@@ -320,25 +526,29 @@ impl LogCleaner {
|
||||
info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size);
|
||||
deleted += 1;
|
||||
freed += f.size;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
match self.secure_delete(&f.path) {
|
||||
Ok(()) => {
|
||||
debug!("Deleted: {:?}", f.path);
|
||||
deleted += 1;
|
||||
freed += f.size;
|
||||
debug!("Deleted: {:?}", f.path);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to delete {:?}: {}", f.path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((deleted, freed))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for [`LogCleaner`].
|
||||
///
|
||||
/// The builder keeps startup code readable when an application only needs to
|
||||
/// override a subset of retention knobs.
|
||||
pub struct LogCleanerBuilder {
|
||||
log_dir: PathBuf,
|
||||
file_pattern: String,
|
||||
@@ -354,18 +564,22 @@ pub struct LogCleanerBuilder {
|
||||
delete_empty_files: bool,
|
||||
min_file_age_seconds: u64,
|
||||
dry_run: bool,
|
||||
compression_algorithm: CompressionAlgorithm,
|
||||
parallel_compress: bool,
|
||||
parallel_workers: usize,
|
||||
zstd_compression_level: i32,
|
||||
zstd_fallback_to_gzip: bool,
|
||||
zstd_workers: usize,
|
||||
}
|
||||
|
||||
impl LogCleanerBuilder {
|
||||
/// Create a builder with conservative defaults.
|
||||
pub fn new(log_dir: impl Into<PathBuf>, file_pattern: impl Into<String>, active_filename: impl Into<String>) -> Self {
|
||||
Self {
|
||||
log_dir: log_dir.into(),
|
||||
file_pattern: file_pattern.into(),
|
||||
active_filename: active_filename.into(),
|
||||
match_mode: FileMatchMode::Prefix,
|
||||
// Default to a safe non-zero value so that a builder created
|
||||
// without an explicit `keep_files()` call does not immediately
|
||||
// delete all matching log files.
|
||||
keep_files: DEFAULT_LOG_KEEP_FILES,
|
||||
max_total_size_bytes: 0,
|
||||
max_single_file_size_bytes: 0,
|
||||
@@ -376,64 +590,127 @@ impl LogCleanerBuilder {
|
||||
delete_empty_files: false,
|
||||
min_file_age_seconds: 0,
|
||||
dry_run: false,
|
||||
compression_algorithm: CompressionAlgorithm::default(),
|
||||
parallel_compress: true,
|
||||
parallel_workers: default_parallel_workers(),
|
||||
zstd_compression_level: 8,
|
||||
zstd_fallback_to_gzip: true,
|
||||
zstd_workers: 1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure whether `file_pattern` is matched as a prefix or suffix.
|
||||
pub fn match_mode(mut self, match_mode: FileMatchMode) -> Self {
|
||||
self.match_mode = match_mode;
|
||||
self
|
||||
}
|
||||
|
||||
/// Preserve at least this many newest regular log files.
|
||||
pub fn keep_files(mut self, keep_files: usize) -> Self {
|
||||
self.keep_files = keep_files;
|
||||
self
|
||||
}
|
||||
|
||||
/// Cap the aggregate size of retained regular log files.
|
||||
pub fn max_total_size_bytes(mut self, max_total_size_bytes: u64) -> Self {
|
||||
self.max_total_size_bytes = max_total_size_bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Cap the size of any individual regular log file.
|
||||
pub fn max_single_file_size_bytes(mut self, max_single_file_size_bytes: u64) -> Self {
|
||||
self.max_single_file_size_bytes = max_single_file_size_bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable archival compression before deleting selected source logs.
|
||||
pub fn compress_old_files(mut self, compress_old_files: bool) -> Self {
|
||||
self.compress_old_files = compress_old_files;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the gzip compression level used for gzip output or gzip fallback.
|
||||
pub fn gzip_compression_level(mut self, gzip_compression_level: u32) -> Self {
|
||||
self.gzip_compression_level = gzip_compression_level;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set how long compressed archives may remain on disk.
|
||||
pub fn compressed_file_retention_days(mut self, days: u64) -> Self {
|
||||
self.compressed_file_retention_days = days;
|
||||
self
|
||||
}
|
||||
|
||||
/// Exclude files matching these glob patterns from every cleanup pass.
|
||||
pub fn exclude_patterns(mut self, patterns: Vec<String>) -> Self {
|
||||
self.exclude_patterns = patterns;
|
||||
self
|
||||
}
|
||||
|
||||
/// Allow the scanner to remove matching zero-byte regular logs immediately.
|
||||
pub fn delete_empty_files(mut self, delete_empty_files: bool) -> Self {
|
||||
self.delete_empty_files = delete_empty_files;
|
||||
self
|
||||
}
|
||||
|
||||
/// Require regular log files to be at least this old before processing.
|
||||
pub fn min_file_age_seconds(mut self, seconds: u64) -> Self {
|
||||
self.min_file_age_seconds = seconds;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable dry-run mode for scans, compression decisions, and deletion.
|
||||
pub fn dry_run(mut self, dry_run: bool) -> Self {
|
||||
self.dry_run = dry_run;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the preferred compression algorithm explicitly.
|
||||
pub fn compression_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
|
||||
self.compression_algorithm = algorithm;
|
||||
self
|
||||
}
|
||||
|
||||
/// Parse and set the compression algorithm from configuration text.
|
||||
pub fn compression_algorithm_str(mut self, algorithm: impl AsRef<str>) -> Self {
|
||||
self.compression_algorithm = CompressionAlgorithm::from_config_str(algorithm.as_ref());
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable or disable the parallel work-stealing compression path.
|
||||
pub fn parallel_compress(mut self, enabled: bool) -> Self {
|
||||
self.parallel_compress = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the number of compression workers, clamped to at least one.
|
||||
pub fn parallel_workers(mut self, workers: usize) -> Self {
|
||||
self.parallel_workers = workers.max(1);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the zstd compression level.
|
||||
pub fn zstd_compression_level(mut self, level: i32) -> Self {
|
||||
self.zstd_compression_level = level;
|
||||
self
|
||||
}
|
||||
|
||||
/// Retry compression with gzip when zstd encoding fails.
|
||||
pub fn zstd_fallback_to_gzip(mut self, enabled: bool) -> Self {
|
||||
self.zstd_fallback_to_gzip = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the number of internal worker threads requested from zstd.
|
||||
pub fn zstd_workers(mut self, workers: usize) -> Self {
|
||||
self.zstd_workers = workers.max(1);
|
||||
self
|
||||
}
|
||||
|
||||
/// Finalize the builder into an immutable [`LogCleaner`].
|
||||
///
|
||||
/// Invalid glob patterns are ignored rather than failing construction, and
|
||||
/// codec-related numeric values are clamped into safe ranges.
|
||||
pub fn build(self) -> LogCleaner {
|
||||
let patterns = self
|
||||
.exclude_patterns
|
||||
@@ -456,6 +733,12 @@ impl LogCleanerBuilder {
|
||||
delete_empty_files: self.delete_empty_files,
|
||||
min_file_age_seconds: self.min_file_age_seconds,
|
||||
dry_run: self.dry_run,
|
||||
compression_algorithm: self.compression_algorithm,
|
||||
parallel_compress: self.parallel_compress,
|
||||
parallel_workers: self.parallel_workers.max(1),
|
||||
zstd_compression_level: self.zstd_compression_level.clamp(1, 21),
|
||||
zstd_fallback_to_gzip: self.zstd_fallback_to_gzip,
|
||||
zstd_workers: self.zstd_workers.max(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,17 +14,32 @@
|
||||
|
||||
//! Log-file cleanup subsystem.
|
||||
//!
|
||||
//! This module provides [`LogCleaner`], a configurable manager that
|
||||
//! periodically removes, compresses, or archives old rolling log files.
|
||||
//! This module exposes the high-level [`LogCleaner`] API used by the
|
||||
//! observability layer to manage rotated log files after they are no longer
|
||||
//! active. The implementation is intentionally split into small focused
|
||||
//! sub-modules so each concern can evolve independently without turning the
|
||||
//! cleaner into a monolithic state machine.
|
||||
//!
|
||||
//! At a high level, one cleanup pass follows this lifecycle:
|
||||
//!
|
||||
//! 1. scan the target directory and classify matching entries;
|
||||
//! 2. decide which regular log files exceed retention constraints;
|
||||
//! 3. optionally compress those files using the configured codec;
|
||||
//! 4. delete only files that are safe to remove;
|
||||
//! 5. separately expire already-compressed archives by age.
|
||||
//!
|
||||
//! The design favors operational safety over aggressive reclamation:
|
||||
//! compression and deletion are decoupled, symlinks are rejected on removal,
|
||||
//! and dry-run mode reports intent without mutating the filesystem.
|
||||
//!
|
||||
//! ## Sub-modules
|
||||
//!
|
||||
//! | Module | Responsibility |
|
||||
//! |-------------|----------------------------------------------------------|
|
||||
//! | `types` | Shared data types (`FileInfo`) |
|
||||
//! | `scanner` | Filesystem traversal — discovers eligible files |
|
||||
//! | `compress` | Gzip compression helper |
|
||||
//! | `core` | Core orchestration — selection, compression, deletion |
|
||||
//! | `types` | Shared enums and metadata (`FileInfo`, match/compression choices) |
|
||||
//! | `scanner` | Filesystem traversal, pattern matching, empty-file handling |
|
||||
//! | `compress` | Archive creation helpers for gzip and zstd |
|
||||
//! | `core` | Selection, parallel/serial processing, secure deletion |
|
||||
//!
|
||||
//! ## Usage
|
||||
//!
|
||||
@@ -60,6 +75,10 @@ mod core;
|
||||
mod scanner;
|
||||
pub mod types;
|
||||
|
||||
/// Primary entry point for the cleaner subsystem.
|
||||
///
|
||||
/// Re-exported from [`core`] so callers can construct a cleaner without
|
||||
/// knowing the internal module layout.
|
||||
pub use core::LogCleaner;
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -72,6 +91,7 @@ mod tests {
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
|
||||
/// Create a test log file with deterministic contents and size.
|
||||
fn create_log_file(dir: &Path, name: &str, size: usize) -> std::io::Result<()> {
|
||||
let path = dir.join(name);
|
||||
let mut f = File::create(path)?;
|
||||
|
||||
@@ -18,15 +18,23 @@
|
||||
//! The one exception is zero-byte file removal — when `delete_empty_files`
|
||||
//! is enabled, `scan_log_directory` removes empty regular files as part of
|
||||
//! the scan so that they are not counted in retention calculations.
|
||||
//!
|
||||
//! The scanner is also the first safety boundary of the cleaner pipeline. It
|
||||
//! performs a shallow directory walk, rejects symlinks by relying on
|
||||
//! `symlink_metadata`, and separates plain logs from pre-compressed archives so
|
||||
//! later stages can apply different retention rules without rescanning.
|
||||
|
||||
use super::types::{FileInfo, FileMatchMode};
|
||||
use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION;
|
||||
use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode};
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::time::SystemTime;
|
||||
use tracing::debug;
|
||||
|
||||
/// Result of a single pass directory scan.
|
||||
///
|
||||
/// Separating regular logs from compressed archives keeps the selection logic
|
||||
/// straightforward: active retention limits apply to the former, while archive
|
||||
/// expiry rules apply to the latter.
|
||||
pub(super) struct LogScanResult {
|
||||
/// Regular log files eligible for deletion/compression.
|
||||
pub logs: Vec<FileInfo>,
|
||||
@@ -49,6 +57,7 @@ pub(super) struct LogScanResult {
|
||||
/// * `delete_empty_files` - When `true`, zero-byte regular files that match
|
||||
/// the pattern are deleted immediately inside this function and excluded
|
||||
/// from the returned [`LogScanResult`].
|
||||
/// * `dry_run` - When `true`, destructive actions are logged but not executed.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn scan_log_directory(
|
||||
log_dir: &Path,
|
||||
@@ -121,13 +130,19 @@ pub(super) fn scan_log_directory(
|
||||
}
|
||||
|
||||
// 3. Classify file type and check pattern match.
|
||||
let is_compressed = filename.ends_with(DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION);
|
||||
let matched_suffix = CompressionAlgorithm::compressed_suffixes()
|
||||
.into_iter()
|
||||
.find(|suffix| filename.ends_with(suffix));
|
||||
let is_compressed = matched_suffix.is_some();
|
||||
|
||||
// For matching, we need the "base" name.
|
||||
// If compressed: "foo.log.gz" -> check "foo.log"
|
||||
// If regular: "foo.log" -> check "foo.log"
|
||||
let name_to_match = if is_compressed {
|
||||
&filename[..filename.len() - DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION.len()]
|
||||
// Perform matching on the logical log filename.
|
||||
// Examples:
|
||||
// - regular log: `foo.log.1` -> `foo.log.1`
|
||||
// - archive: `foo.log.1.gz` -> `foo.log.1`
|
||||
// This allows the same include/exclude pattern configuration to apply
|
||||
// to both raw and already-compressed generations.
|
||||
let name_to_match = if let Some(suffix) = matched_suffix {
|
||||
&filename[..filename.len() - suffix.len()]
|
||||
} else {
|
||||
filename
|
||||
};
|
||||
@@ -149,11 +164,13 @@ pub(super) fn scan_log_directory(
|
||||
Err(_) => continue, // Skip files where we can't read modification time
|
||||
};
|
||||
|
||||
// 5. Handle zero-byte files (Regular logs only).
|
||||
// We generally don't delete empty compressed files implicitly, but let's stick to regular files logic.
|
||||
// 5. Handle zero-byte files (regular logs only).
|
||||
// Empty compressed artifacts are left alone here because they belong
|
||||
// to the archive-retention path and should not disappear outside that
|
||||
// explicit policy.
|
||||
if !is_compressed && file_size == 0 && delete_empty_files {
|
||||
if !dry_run {
|
||||
if let Err(e) = std::fs::remove_file(&path) {
|
||||
if let Err(e) = fs::remove_file(&path) {
|
||||
tracing::warn!("Failed to delete empty file {:?}: {}", path, e);
|
||||
} else {
|
||||
debug!("Deleted empty file: {:?}", path);
|
||||
@@ -164,8 +181,9 @@ pub(super) fn scan_log_directory(
|
||||
continue;
|
||||
}
|
||||
|
||||
// 6. Age Check (Regular logs only).
|
||||
// Compressed files have their own retention check in the caller.
|
||||
// 6. Age gate regular logs only.
|
||||
// Compressed files deliberately bypass this check because archive
|
||||
// expiry is driven by a dedicated retention horizon in the caller.
|
||||
if !is_compressed
|
||||
&& let Ok(age) = now.duration_since(modified)
|
||||
&& age.as_secs() < min_file_age_seconds
|
||||
|
||||
@@ -13,7 +13,18 @@
|
||||
// limitations under the License.
|
||||
|
||||
//! Shared types used across the log-cleanup sub-modules.
|
||||
//!
|
||||
//! These types deliberately stay lightweight because they are passed between
|
||||
//! the scanner, selector, compressor, and deletion stages. Keeping them small
|
||||
//! and explicit makes the cleaner easier to reason about and cheaper to move
|
||||
//! across worker threads in the parallel pipeline.
|
||||
|
||||
use rustfs_config::observability::{
|
||||
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP,
|
||||
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION,
|
||||
DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MATCH_MODE_PREFIX,
|
||||
DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION,
|
||||
};
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::time::SystemTime;
|
||||
@@ -33,8 +44,21 @@ impl FileMatchMode {
|
||||
/// Returns the string representation of the match mode.
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
FileMatchMode::Prefix => "prefix",
|
||||
FileMatchMode::Suffix => "suffix",
|
||||
FileMatchMode::Prefix => DEFAULT_OBS_LOG_MATCH_MODE_PREFIX,
|
||||
FileMatchMode::Suffix => DEFAULT_OBS_LOG_MATCH_MODE,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a config value into a [`FileMatchMode`].
|
||||
///
|
||||
/// Any non-`prefix` value falls back to [`FileMatchMode::Suffix`] to keep
|
||||
/// configuration handling permissive and aligned with the historical
|
||||
/// cleaner default used by rolling log filenames.
|
||||
pub fn from_config_str(value: &str) -> Self {
|
||||
if value.trim().eq_ignore_ascii_case(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX) {
|
||||
Self::Prefix
|
||||
} else {
|
||||
Self::Suffix
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -45,17 +69,135 @@ impl fmt::Display for FileMatchMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Compression algorithm used by the cleaner.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum CompressionAlgorithm {
|
||||
/// Gzip keeps backward compatibility with existing `.gz` archives.
|
||||
Gzip,
|
||||
/// Zstd provides better ratio and higher decompression throughput.
|
||||
Zstd,
|
||||
}
|
||||
|
||||
impl CompressionAlgorithm {
|
||||
/// Parse a normalized lowercase configuration token or extension alias.
|
||||
fn parse_normalized(value: &str) -> Option<Self> {
|
||||
if value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP || value == DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION {
|
||||
Some(Self::Gzip)
|
||||
} else if value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD || value == DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION {
|
||||
Some(Self::Zstd)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse from a user-facing configuration string.
|
||||
///
|
||||
/// Supported values include both semantic names (`gzip`, `zstd`) and file
|
||||
/// extension aliases (`gz`, `zst`). Unknown values intentionally fall back
|
||||
/// to the crate default so observability startup remains resilient.
|
||||
pub fn from_config_str(value: &str) -> Self {
|
||||
let normalized = value.trim().to_ascii_lowercase();
|
||||
Self::parse_normalized(&normalized).unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Archive suffix (without dot) used for this algorithm.
|
||||
///
|
||||
/// The returned value is suitable for appending to an existing filename,
|
||||
/// rather than replacing the source extension.
|
||||
pub fn extension(self) -> &'static str {
|
||||
match self {
|
||||
Self::Gzip => DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION.trim_start_matches('.'),
|
||||
Self::Zstd => DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION.trim_start_matches('.'),
|
||||
}
|
||||
}
|
||||
|
||||
/// Supported compressed suffixes used by scanner retention logic.
|
||||
///
|
||||
/// The scanner uses this list to recognize already-archived files and to
|
||||
/// keep them on a separate retention path from plain log files.
|
||||
pub fn compressed_suffixes() -> [&'static str; 2] {
|
||||
[
|
||||
DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION,
|
||||
DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION,
|
||||
]
|
||||
}
|
||||
|
||||
/// Stable lowercase string form used in logs and configuration echoes.
|
||||
pub fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Gzip => DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP,
|
||||
Self::Zstd => DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for CompressionAlgorithm {
|
||||
type Err = &'static str;
|
||||
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
let normalized = value.trim().to_ascii_lowercase();
|
||||
Self::parse_normalized(&normalized).ok_or("invalid compression algorithm")
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CompressionAlgorithm {
|
||||
fn default() -> Self {
|
||||
Self::from_config_str(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for CompressionAlgorithm {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// Worker-thread default used by parallel compressor.
|
||||
///
|
||||
/// The worker count follows CPU capacity but stays within [4, 8] to keep
|
||||
/// throughput stable and avoid oversubscription. The lower bound helps the
|
||||
/// work-stealing path on small machines still exercise concurrency, while the
|
||||
/// upper bound avoids swamping the host when each task may also use internal
|
||||
/// codec threads.
|
||||
pub fn default_parallel_workers() -> usize {
|
||||
num_cpus::get().clamp(4, 8)
|
||||
}
|
||||
|
||||
/// Metadata for a single log file discovered by the scanner.
|
||||
///
|
||||
/// Carries enough information to make cleanup decisions (sort by age, compare
|
||||
/// size against limits, etc.) without re-reading filesystem metadata on every
|
||||
/// operation.
|
||||
/// This snapshot is intentionally immutable after discovery. The cleaner uses
|
||||
/// it to sort candidates by age, evaluate retention constraints, and report
|
||||
/// deletion metrics without re-reading metadata during every later stage.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct FileInfo {
|
||||
/// Absolute path to the file.
|
||||
/// Absolute or scanner-produced path to the file on disk.
|
||||
pub path: PathBuf,
|
||||
/// File size in bytes at the time of discovery.
|
||||
///
|
||||
/// This value is used for retention accounting and freed-byte metrics.
|
||||
pub size: u64,
|
||||
/// Last-modification timestamp from the filesystem.
|
||||
///
|
||||
/// The selection phase sorts on this timestamp so the oldest files are
|
||||
/// processed first.
|
||||
pub modified: SystemTime,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::CompressionAlgorithm;
|
||||
|
||||
#[test]
|
||||
fn compression_algorithm_accepts_full_names_and_aliases() {
|
||||
assert_eq!(CompressionAlgorithm::from_config_str("gzip"), CompressionAlgorithm::Gzip);
|
||||
assert_eq!(CompressionAlgorithm::from_config_str("GZ"), CompressionAlgorithm::Gzip);
|
||||
assert_eq!(CompressionAlgorithm::from_config_str("zstd"), CompressionAlgorithm::Zstd);
|
||||
assert_eq!(CompressionAlgorithm::from_config_str(" zst "), CompressionAlgorithm::Zstd);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compression_algorithm_defaults_or_errors_for_invalid_values() {
|
||||
assert_eq!(CompressionAlgorithm::from_config_str("brotli"), CompressionAlgorithm::default());
|
||||
assert!("brotli".parse::<CompressionAlgorithm>().is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,17 +24,20 @@
|
||||
|
||||
use rustfs_config::observability::{
|
||||
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES,
|
||||
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN,
|
||||
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES,
|
||||
DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT,
|
||||
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES,
|
||||
DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE,
|
||||
DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS,
|
||||
DEFAULT_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL,
|
||||
DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, DEFAULT_OBS_LOG_ZSTD_WORKERS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT,
|
||||
ENV_OBS_LOG_CLEANUP_INTERVAL_SECONDS, ENV_OBS_LOG_COMPRESS_OLD_FILES, ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
|
||||
ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN, ENV_OBS_LOG_ENDPOINT,
|
||||
ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, ENV_OBS_LOG_KEEP_FILES,
|
||||
ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES,
|
||||
ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL,
|
||||
ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_METRICS_EXPORT_ENABLED,
|
||||
ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME,
|
||||
ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT,
|
||||
ENV_OBS_LOG_COMPRESSION_ALGORITHM, ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN,
|
||||
ENV_OBS_LOG_ENDPOINT, ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL,
|
||||
ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES,
|
||||
ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_PARALLEL_COMPRESS, ENV_OBS_LOG_PARALLEL_WORKERS, ENV_OBS_LOG_ROTATION_TIME,
|
||||
ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, ENV_OBS_LOG_ZSTD_WORKERS,
|
||||
ENV_OBS_LOGGER_LEVEL, ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT,
|
||||
ENV_OBS_METRICS_EXPORT_ENABLED, ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO,
|
||||
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT,
|
||||
};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_FILENAME,
|
||||
@@ -80,6 +83,11 @@ use std::env;
|
||||
/// let config = OtelConfig::extract_otel_config_from_env(
|
||||
/// Some("http://otel-collector:4318".to_string())
|
||||
/// );
|
||||
///
|
||||
/// // Compression related env mapping examples:
|
||||
/// // RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd
|
||||
/// // RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true
|
||||
/// // RUSTFS_OBS_LOG_PARALLEL_WORKERS=6
|
||||
/// ```
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct OtelConfig {
|
||||
@@ -146,6 +154,18 @@ pub struct OtelConfig {
|
||||
pub log_compress_old_files: Option<bool>,
|
||||
/// Gzip compression level `1`–`9` (default: `6`).
|
||||
pub log_gzip_compression_level: Option<u32>,
|
||||
/// Compression algorithm: `gzip` or `zstd` (default: `zstd`).
|
||||
pub log_compression_algorithm: Option<String>,
|
||||
/// Enable crossbeam work-stealing parallel compression (default: `true`).
|
||||
pub log_parallel_compress: Option<bool>,
|
||||
/// Worker count for parallel compression (default: `6`).
|
||||
pub log_parallel_workers: Option<usize>,
|
||||
/// Zstd compression level `1`–`21` (default: `8`).
|
||||
pub log_zstd_compression_level: Option<i32>,
|
||||
/// Fallback to gzip when zstd compression fails (default: `true`).
|
||||
pub log_zstd_fallback_to_gzip: Option<bool>,
|
||||
/// Internal zstdmt workers per job (default: `1`).
|
||||
pub log_zstd_workers: Option<usize>,
|
||||
/// Delete compressed archives older than this many days; `0` = keep forever
|
||||
/// (default: `30`).
|
||||
pub log_compressed_file_retention_days: Option<u64>,
|
||||
@@ -254,6 +274,21 @@ impl OtelConfig {
|
||||
ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL,
|
||||
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL as u64,
|
||||
) as u32),
|
||||
log_compression_algorithm: Some(get_env_str(
|
||||
ENV_OBS_LOG_COMPRESSION_ALGORITHM,
|
||||
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM,
|
||||
)),
|
||||
log_parallel_compress: Some(get_env_bool(ENV_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS)),
|
||||
log_parallel_workers: Some(get_env_usize(ENV_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_PARALLEL_WORKERS)),
|
||||
log_zstd_compression_level: Some(get_env_u64(
|
||||
ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL,
|
||||
DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL as u64,
|
||||
) as i32),
|
||||
log_zstd_fallback_to_gzip: Some(get_env_bool(
|
||||
ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
|
||||
DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
|
||||
)),
|
||||
log_zstd_workers: Some(get_env_usize(ENV_OBS_LOG_ZSTD_WORKERS, DEFAULT_OBS_LOG_ZSTD_WORKERS)),
|
||||
log_compressed_file_retention_days: Some(get_env_u64(
|
||||
ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
|
||||
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
|
||||
|
||||
@@ -12,10 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry};
|
||||
use crate::{AppConfig, GlobalError, OtelConfig, OtelGuard, SystemObserver, telemetry::init_telemetry};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
/// Global guard for OpenTelemetry tracing
|
||||
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
|
||||
@@ -23,11 +23,40 @@ static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
|
||||
/// Flag indicating if observability metric is enabled
|
||||
pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell<bool> = OnceCell::const_new();
|
||||
|
||||
/// Namespaced metrics for cleaner and rolling logging.
|
||||
pub(crate) const METRIC_LOG_CLEANER_DELETED_FILES_TOTAL: &str = "rustfs.log_cleaner.deleted_files_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_FREED_BYTES_TOTAL: &str = "rustfs.log_cleaner.freed_bytes_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS: &str = "rustfs.log_cleaner.compress_duration_seconds";
|
||||
pub(crate) const METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE: &str = "rustfs.log_cleaner.steal_success_rate";
|
||||
pub(crate) const METRIC_LOG_CLEANER_RUNS_TOTAL: &str = "rustfs.log_cleaner.runs_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL: &str = "rustfs.log_cleaner.run_failures_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_ROTATION_TOTAL: &str = "rustfs.log_cleaner.rotation_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL: &str = "rustfs.log_cleaner.rotation_failures_total";
|
||||
pub(crate) const METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS: &str = "rustfs.log_cleaner.rotation_duration_seconds";
|
||||
pub(crate) const METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES: &str = "rustfs.log_cleaner.active_file_size_bytes";
|
||||
|
||||
/// Check whether Observability metric is enabled
|
||||
pub fn observability_metric_enabled() -> bool {
|
||||
OBSERVABILITY_METRIC_ENABLED.get().copied().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Set the global observability metrics flag once.
|
||||
///
|
||||
/// When this function is called multiple times, only the first value is kept
|
||||
/// and later values are ignored to preserve OnceCell semantics.
|
||||
pub(crate) fn set_observability_metric_enabled(enabled: bool) {
|
||||
if OBSERVABILITY_METRIC_ENABLED.set(enabled).is_err()
|
||||
&& let Some(current) = OBSERVABILITY_METRIC_ENABLED.get()
|
||||
&& *current != enabled
|
||||
{
|
||||
warn!(
|
||||
current = *current,
|
||||
requested = enabled,
|
||||
"OBSERVABILITY_METRIC_ENABLED was already initialized; keeping original value"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the observability module
|
||||
///
|
||||
/// # Parameters
|
||||
@@ -51,11 +80,43 @@ pub fn observability_metric_enabled() -> bool {
|
||||
pub async fn init_obs(endpoint: Option<String>) -> Result<OtelGuard, GlobalError> {
|
||||
// Load the configuration file
|
||||
let config = AppConfig::new_with_endpoint(endpoint);
|
||||
init_obs_with_config(&config.observability).await
|
||||
}
|
||||
|
||||
let otel_guard = init_telemetry(&config.observability)?;
|
||||
// Server will be created per connection - this ensures isolation
|
||||
/// Initialize the observability module with an explicit [`OtelConfig`].
|
||||
///
|
||||
/// This is the lower-level counterpart to [`init_obs`]: it accepts a fully
|
||||
/// constructed [`OtelConfig`] rather than building one from an endpoint URL.
|
||||
/// Useful when the config is already available (e.g., embedded in a larger
|
||||
/// application config struct) or when unit-testing with custom settings.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `config`: A pre-built [`OtelConfig`] to drive all telemetry backends.
|
||||
///
|
||||
/// # Returns
|
||||
/// An [`OtelGuard`] that must be kept alive for the lifetime of the
|
||||
/// application. Dropping it flushes and shuts down all providers.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns [`GlobalError`] when the underlying telemetry backend fails to
|
||||
/// initialise (e.g., cannot create the log directory, or an OTLP exporter
|
||||
/// cannot connect).
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::{AppConfig, init_obs_with_config};
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// let config = AppConfig::new_with_endpoint(Some("http://localhost:4318".to_string()));
|
||||
/// let _guard = init_obs_with_config(&config.observability)
|
||||
/// .await
|
||||
/// .expect("observability init failed");
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn init_obs_with_config(config: &OtelConfig) -> Result<OtelGuard, GlobalError> {
|
||||
let otel_guard = init_telemetry(config)?;
|
||||
tokio::spawn(async move {
|
||||
// Record the PID-related metrics of the current process
|
||||
let obs_result = SystemObserver::init_process_observer().await;
|
||||
match obs_result {
|
||||
Ok(_) => {
|
||||
@@ -66,7 +127,6 @@ pub async fn init_obs(endpoint: Option<String>) -> Result<OtelGuard, GlobalError
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(otel_guard)
|
||||
}
|
||||
|
||||
@@ -121,9 +181,79 @@ pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GlobalError> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const README: &str = include_str!("../README.md");
|
||||
const GRAFANA_DASHBOARD: &str = include_str!("../../../.docker/observability/grafana/dashboards/rustfs.json");
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_uninitialized_guard() {
|
||||
let result = get_global_guard();
|
||||
assert!(matches!(result, Err(GlobalError::NotInitialized)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_cleaner_metric_constants_use_rustfs_prefix() {
|
||||
let metrics = [
|
||||
METRIC_LOG_CLEANER_DELETED_FILES_TOTAL,
|
||||
METRIC_LOG_CLEANER_FREED_BYTES_TOTAL,
|
||||
METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS,
|
||||
METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE,
|
||||
METRIC_LOG_CLEANER_RUNS_TOTAL,
|
||||
METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL,
|
||||
METRIC_LOG_CLEANER_ROTATION_TOTAL,
|
||||
METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL,
|
||||
METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS,
|
||||
METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES,
|
||||
];
|
||||
|
||||
for metric in metrics {
|
||||
assert!(
|
||||
metric.starts_with("rustfs.log_cleaner."),
|
||||
"metric '{metric}' should use rustfs.log_cleaner.* namespace"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_readme_contains_grafana_dashboard_draft_section() {
|
||||
assert!(README.contains("## Cleaner & Rotation Metrics"));
|
||||
assert!(README.contains("### Grafana Dashboard JSON Draft (Ready to Import)"));
|
||||
assert!(README.contains("\"uid\": \"rustfs-log-cleaner\""));
|
||||
assert!(README.contains("rustfs_log_cleaner_runs_total"));
|
||||
assert!(README.contains("rustfs_log_cleaner_rotation_failures_total"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_readme_contains_promql_templates_section() {
|
||||
assert!(README.contains("### PromQL Templates"));
|
||||
assert!(README.contains("Cleanup failure ratio"));
|
||||
assert!(README.contains("histogram_quantile(0.95"));
|
||||
assert!(README.contains("max(rustfs_log_cleaner_active_file_size_bytes)"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_grafana_dashboard_contains_log_cleaner_row() {
|
||||
assert!(GRAFANA_DASHBOARD.contains("\"title\": \"Log Cleaner\""));
|
||||
assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_runs_total"));
|
||||
assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_rotation_failures_total"));
|
||||
assert!(GRAFANA_DASHBOARD.contains("rustfs_log_cleaner_active_file_size_bytes"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_readme_mentions_deployed_dashboard_path() {
|
||||
assert!(README.contains(".docker/observability/grafana/dashboards/rustfs.json"));
|
||||
assert!(README.contains("row title: `Log Cleaner`"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_init_obs_with_config_signature_is_correct() {
|
||||
// Compile-time check: verifies that `init_obs_with_config` is public and
|
||||
// accepts `&OtelConfig`, matching the README documentation.
|
||||
// The future is created but intentionally not polled — this avoids
|
||||
// initialising the global tracing subscriber in a unit-test context.
|
||||
fn _type_check() {
|
||||
let config = OtelConfig::default();
|
||||
let _future = init_obs_with_config(&config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,20 +25,27 @@
|
||||
//!
|
||||
//! The function [`init_local_logging`] is the single entry point for both
|
||||
//! cases; callers do **not** need to distinguish between stdout and file modes.
|
||||
//!
|
||||
//! The file-backed mode delegates retention and compression to
|
||||
//! [`crate::cleaner`], which keeps the logging setup code focused on subscriber
|
||||
//! construction while still allowing periodic housekeeping in the background.
|
||||
|
||||
use super::guard::OtelGuard;
|
||||
use crate::TelemetryError;
|
||||
use crate::cleaner::LogCleaner;
|
||||
use crate::cleaner::types::FileMatchMode;
|
||||
use crate::cleaner::types::{CompressionAlgorithm, FileMatchMode};
|
||||
use crate::config::OtelConfig;
|
||||
use crate::global::OBSERVABILITY_METRIC_ENABLED;
|
||||
use crate::global::{METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL, METRIC_LOG_CLEANER_RUNS_TOTAL, set_observability_metric_enabled};
|
||||
use crate::telemetry::filter::build_env_filter;
|
||||
use crate::telemetry::rolling::{RollingAppender, Rotation};
|
||||
use metrics::counter;
|
||||
use rustfs_config::observability::{
|
||||
DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES, DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
|
||||
DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL,
|
||||
DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS,
|
||||
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN,
|
||||
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES,
|
||||
DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS,
|
||||
DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
|
||||
DEFAULT_OBS_LOG_ZSTD_WORKERS,
|
||||
};
|
||||
use rustfs_config::{APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED};
|
||||
use std::sync::Arc;
|
||||
@@ -111,6 +118,8 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
let env_filter = build_env_filter(logger_level, None);
|
||||
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
|
||||
|
||||
// Keep stdout formatting JSON-shaped even in local-only mode so operators
|
||||
// can ship the same log schema to external collectors if needed.
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_timer(LocalTime::rfc_3339())
|
||||
.with_target(true)
|
||||
@@ -131,7 +140,7 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
.with(fmt_layer)
|
||||
.init();
|
||||
|
||||
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
|
||||
set_observability_metric_enabled(false);
|
||||
counter!("rustfs.start.total").increment(1);
|
||||
info!("Init stdout logging (level: {})", logger_level);
|
||||
|
||||
@@ -154,6 +163,10 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
|
||||
/// Called by [`init_local_logging`] when a log directory is present.
|
||||
/// Handles directory creation, permission enforcement (Unix), file appender
|
||||
/// setup, optional stdout mirror, and log-cleanup task spawning.
|
||||
///
|
||||
/// The function intentionally performs all fallible filesystem preparation
|
||||
/// before registering the subscriber so startup failures are reported early and
|
||||
/// do not leave partially initialized tracing state behind.
|
||||
fn init_file_logging_internal(
|
||||
config: &OtelConfig,
|
||||
log_directory: &str,
|
||||
@@ -181,11 +194,9 @@ fn init_file_logging_internal(
|
||||
.unwrap_or(DEFAULT_LOG_ROTATION_TIME)
|
||||
.to_lowercase();
|
||||
|
||||
// Determine match mode from config, defaulting to Suffix
|
||||
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
|
||||
Some("prefix") => FileMatchMode::Prefix,
|
||||
_ => FileMatchMode::Suffix,
|
||||
};
|
||||
// Match mode controls how the rolling filename is recognized later by the
|
||||
// cleaner. Suffix mode fits timestamp-prefixed filenames especially well.
|
||||
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
|
||||
|
||||
let rotation = match rotation_str.as_str() {
|
||||
"minutely" => Rotation::Minutely,
|
||||
@@ -207,7 +218,8 @@ fn init_file_logging_internal(
|
||||
let env_filter = build_env_filter(logger_level, None);
|
||||
let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
|
||||
|
||||
// File layer writes JSON without ANSI codes.
|
||||
// File output stays machine-readable and free of ANSI sequences so the
|
||||
// resulting files are safe to parse or ship to log processors.
|
||||
let file_layer = tracing_subscriber::fmt::layer()
|
||||
.with_timer(LocalTime::rfc_3339())
|
||||
.with_target(true)
|
||||
@@ -223,7 +235,8 @@ fn init_file_logging_internal(
|
||||
.with_span_events(span_events.clone());
|
||||
|
||||
// Optional stdout mirror: enabled explicitly via `log_stdout_enabled`, or
|
||||
// unconditionally in non-production environments.
|
||||
// unconditionally in non-production environments so developers still see
|
||||
// immediate terminal output while file rotation remains enabled.
|
||||
let (stdout_layer, stdout_guard) = if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production {
|
||||
let (stdout_nb, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
|
||||
let enable_color = std::io::stdout().is_terminal();
|
||||
@@ -255,7 +268,7 @@ fn init_file_logging_internal(
|
||||
.with(stdout_layer)
|
||||
.init();
|
||||
|
||||
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
|
||||
set_observability_metric_enabled(false);
|
||||
|
||||
// ── 5. Start background cleanup task ─────────────────────────────────────
|
||||
let cleanup_handle = spawn_cleanup_task(config, log_directory, log_filename, keep_files);
|
||||
@@ -284,6 +297,8 @@ fn init_file_logging_internal(
|
||||
/// Tightens permissions to `0755` if the directory is more permissive.
|
||||
/// This prevents world-writable log directories from being a security hazard.
|
||||
/// No-ops if permissions are already `0755` or stricter.
|
||||
///
|
||||
/// The function never broadens permissions; it is strictly a hardening step.
|
||||
#[cfg(unix)]
|
||||
pub fn ensure_dir_permissions(log_directory: &str) -> Result<(), TelemetryError> {
|
||||
use std::fs::Permissions;
|
||||
@@ -325,6 +340,10 @@ pub fn ensure_dir_permissions(log_directory: &str) -> Result<(), TelemetryError>
|
||||
/// Tokio runtime and should be aborted (via the returned `JoinHandle`) when
|
||||
/// the application shuts down.
|
||||
///
|
||||
/// The asynchronous loop itself remains lightweight: each cleanup pass is
|
||||
/// delegated to `spawn_blocking` because directory traversal, compression, and
|
||||
/// deletion are inherently blocking filesystem operations.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `config` - Observability config containing cleanup parameters.
|
||||
/// * `log_directory` - Directory path of the rolling log files.
|
||||
@@ -341,16 +360,14 @@ pub fn spawn_cleanup_task(
|
||||
keep_files: usize,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
let log_dir = std::path::PathBuf::from(log_directory);
|
||||
// Use suffix matching for log files like "2026-03-01-06-21.rustfs.log"
|
||||
// where "rustfs.log" is the suffix.
|
||||
// Use suffix matching for log files like `2026-03-01-06-21.rustfs.log`
|
||||
// where `rustfs.log` is the stable suffix generated by the rolling appender.
|
||||
let file_pattern = config.log_filename.as_deref().unwrap_or(log_filename).to_string();
|
||||
let active_filename = file_pattern.clone();
|
||||
|
||||
// Determine match mode from config, defaulting to Suffix
|
||||
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
|
||||
Some("prefix") => FileMatchMode::Prefix,
|
||||
_ => FileMatchMode::Suffix,
|
||||
};
|
||||
// Determine match mode from config, defaulting to the repository-wide
|
||||
// observability setting when the caller leaves it unset.
|
||||
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
|
||||
|
||||
let max_total_size = config
|
||||
.log_max_total_size_bytes
|
||||
@@ -362,6 +379,21 @@ pub fn spawn_cleanup_task(
|
||||
let gzip_level = config
|
||||
.log_gzip_compression_level
|
||||
.unwrap_or(DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL);
|
||||
let compression_algorithm = CompressionAlgorithm::from_config_str(
|
||||
config
|
||||
.log_compression_algorithm
|
||||
.as_deref()
|
||||
.unwrap_or(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM),
|
||||
);
|
||||
let parallel_compress = config.log_parallel_compress.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_COMPRESS);
|
||||
let parallel_workers = config.log_parallel_workers.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_WORKERS);
|
||||
let zstd_level = config
|
||||
.log_zstd_compression_level
|
||||
.unwrap_or(DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL);
|
||||
let zstd_fallback_to_gzip = config
|
||||
.log_zstd_fallback_to_gzip
|
||||
.unwrap_or(DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP);
|
||||
let zstd_workers = config.log_zstd_workers.unwrap_or(DEFAULT_OBS_LOG_ZSTD_WORKERS);
|
||||
let retention_days = config
|
||||
.log_compressed_file_retention_days
|
||||
.unwrap_or(DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS);
|
||||
@@ -387,6 +419,14 @@ pub fn spawn_cleanup_task(
|
||||
.max_single_file_size_bytes(max_single_file_size)
|
||||
.compress_old_files(compress)
|
||||
.gzip_compression_level(gzip_level)
|
||||
// Compression behavior stays fully config-driven, but the builder
|
||||
// clamps unsafe numeric values and preserves sensible defaults.
|
||||
.compression_algorithm(compression_algorithm)
|
||||
.parallel_compress(parallel_compress)
|
||||
.parallel_workers(parallel_workers)
|
||||
.zstd_compression_level(zstd_level)
|
||||
.zstd_fallback_to_gzip(zstd_fallback_to_gzip)
|
||||
.zstd_workers(zstd_workers)
|
||||
.compressed_file_retention_days(retention_days)
|
||||
.exclude_patterns(exclude_patterns)
|
||||
.delete_empty_files(delete_empty)
|
||||
@@ -395,17 +435,37 @@ pub fn spawn_cleanup_task(
|
||||
.build(),
|
||||
);
|
||||
|
||||
info!(
|
||||
compression_algorithm = %compression_algorithm,
|
||||
parallel_compress,
|
||||
parallel_workers,
|
||||
zstd_level,
|
||||
zstd_fallback_to_gzip,
|
||||
zstd_workers,
|
||||
"log cleaner compression profile configured"
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(cleanup_interval));
|
||||
loop {
|
||||
// Wait for the next scheduled tick before dispatching another pass.
|
||||
// The blocking filesystem work runs on a dedicated blocking thread.
|
||||
interval.tick().await;
|
||||
let cleaner_clone = cleaner.clone();
|
||||
let result = tokio::task::spawn_blocking(move || cleaner_clone.cleanup()).await;
|
||||
|
||||
match result {
|
||||
Ok(Ok(_)) => {} // Success
|
||||
Ok(Err(e)) => tracing::warn!("Log cleanup failed: {}", e),
|
||||
Err(e) => tracing::warn!("Log cleanup task panicked: {}", e),
|
||||
Ok(Ok(_)) => {
|
||||
counter!(METRIC_LOG_CLEANER_RUNS_TOTAL).increment(1);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1);
|
||||
tracing::warn!("Log cleanup failed: {}", e);
|
||||
}
|
||||
Err(e) => {
|
||||
counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1);
|
||||
tracing::warn!("Log cleanup task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -418,6 +478,7 @@ mod tests {
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
/// Invalid file names should be reported as errors instead of panicking.
|
||||
fn test_init_file_logging_invalid_filename_does_not_panic() {
|
||||
let temp_dir = tempdir().expect("create temp dir");
|
||||
let temp_path = temp_dir.path().to_str().expect("temp dir path is utf-8");
|
||||
|
||||
@@ -31,10 +31,14 @@
|
||||
//!
|
||||
//! All exporters use **HTTP binary** (Protobuf) encoding with **gzip**
|
||||
//! compression for efficiency over the wire.
|
||||
//!
|
||||
//! If log export is not configured, this module deliberately falls back to the
|
||||
//! same rolling-file logging path used by the local backend so applications can
|
||||
//! combine OTLP traces/metrics with on-disk logs.
|
||||
|
||||
use crate::cleaner::types::FileMatchMode;
|
||||
use crate::config::OtelConfig;
|
||||
use crate::global::OBSERVABILITY_METRIC_ENABLED;
|
||||
use crate::global::set_observability_metric_enabled;
|
||||
use crate::telemetry::filter::build_env_filter;
|
||||
use crate::telemetry::guard::OtelGuard;
|
||||
// Import helper functions from local.rs (sibling module)
|
||||
@@ -53,7 +57,7 @@ use opentelemetry_sdk::{
|
||||
metrics::{PeriodicReader, SdkMeterProvider},
|
||||
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
|
||||
};
|
||||
use rustfs_config::observability::DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES;
|
||||
use rustfs_config::observability::{DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOGS_EXPORT_ENABLED,
|
||||
DEFAULT_OBS_METRICS_EXPORT_ENABLED, DEFAULT_OBS_TRACES_EXPORT_ENABLED, METER_INTERVAL, SAMPLE_RATIO,
|
||||
@@ -97,6 +101,8 @@ pub(super) fn init_observability_http(
|
||||
is_production: bool,
|
||||
) -> Result<OtelGuard, TelemetryError> {
|
||||
// ── Resource & sampling ──────────────────────────────────────────────────
|
||||
// Build the common resource once so all enabled signals report the same
|
||||
// service identity and deployment metadata.
|
||||
let res = build_resource(config);
|
||||
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned();
|
||||
let use_stdout = config.use_stdout.unwrap_or(!is_production);
|
||||
@@ -134,8 +140,9 @@ pub(super) fn init_observability_http(
|
||||
}
|
||||
});
|
||||
|
||||
// If log_endpoint is not explicitly set, fallback to root_ep/v1/logs ONLY if root_ep is present.
|
||||
// If both are empty, log_ep is empty, which triggers the fallback to file logging logic.
|
||||
// If `log_endpoint` is not explicitly set, fall back to `root_ep/v1/logs`
|
||||
// only when a root endpoint exists. An empty result intentionally triggers
|
||||
// the file-logging path below instead of silently disabling application logs.
|
||||
let log_ep: String = config
|
||||
.log_endpoint
|
||||
.as_deref()
|
||||
@@ -159,6 +166,8 @@ pub(super) fn init_observability_http(
|
||||
let profiling_agent = init_profiler(config);
|
||||
|
||||
// ── Logger Logic ──────────────────────────────────────────────────────────
|
||||
// Logging is the only signal that may intentionally route to either OTLP
|
||||
// or local files depending on configuration completeness.
|
||||
let mut logger_provider: Option<SdkLoggerProvider> = None;
|
||||
let mut otel_bridge = None;
|
||||
let mut file_layer_opt = None; // File layer (File mode)
|
||||
@@ -179,11 +188,14 @@ pub(super) fn init_observability_http(
|
||||
.as_ref()
|
||||
.map(|p| OpenTelemetryTracingBridge::new(p).with_filter(build_env_filter(logger_level, None)));
|
||||
|
||||
// Note: We do NOT create a separate `fmt_layer_opt` here; stdout behavior is driven by the provider.
|
||||
// No separate formatting layer is added here; when OTLP logging is
|
||||
// active, the OpenTelemetry bridge is the authoritative sink for
|
||||
// `tracing` events unless local file logging is needed as a fallback.
|
||||
}
|
||||
let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
|
||||
// ── Case 2: File Logging
|
||||
// Supplement: If log_directory is set and no OTLP log endpoint is configured, we enable file logging logic.
|
||||
// If a log directory is configured and OTLP log export is unavailable, use
|
||||
// the same rolling-file behavior as the local-only telemetry backend.
|
||||
if let Some(log_directory) = config.log_directory.as_deref().filter(|s| !s.is_empty())
|
||||
&& logger_provider.is_none()
|
||||
{
|
||||
@@ -204,10 +216,7 @@ pub(super) fn init_observability_http(
|
||||
.as_deref()
|
||||
.unwrap_or(DEFAULT_LOG_ROTATION_TIME)
|
||||
.to_lowercase();
|
||||
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
|
||||
Some("prefix") => FileMatchMode::Prefix,
|
||||
_ => FileMatchMode::Suffix,
|
||||
};
|
||||
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
|
||||
let rotation = match rotation_str.as_str() {
|
||||
"minutely" => Rotation::Minutely,
|
||||
"hourly" => Rotation::Hourly,
|
||||
@@ -241,7 +250,8 @@ pub(super) fn init_observability_http(
|
||||
.with_filter(build_env_filter(logger_level, None)),
|
||||
);
|
||||
|
||||
// Cleanup task
|
||||
// The cleanup task keeps rotated files bounded while the OTLP trace and
|
||||
// metric exporters continue to operate independently.
|
||||
cleanup_handle = Some(spawn_cleanup_task(config, log_directory, log_filename, keep_files));
|
||||
|
||||
info!(
|
||||
@@ -256,8 +266,9 @@ pub(super) fn init_observability_http(
|
||||
.map(|p| OpenTelemetryLayer::new(p.tracer(service_name.to_string())));
|
||||
let metrics_layer = meter_provider.as_ref().map(|p| MetricsLayer::new(p.clone()));
|
||||
|
||||
// Optional stdout mirror (matching init_file_logging_internal logic)
|
||||
// This is separate from OTLP stdout logic. If file logging is enabled, we honor its stdout rules.
|
||||
// Optional stdout mirror (matching `init_file_logging_internal` logic).
|
||||
// This is separate from OTLP stdout exporting; it only affects local human
|
||||
// readable output for the tracing subscriber.
|
||||
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production {
|
||||
let (stdout_nb, stdout_g) = tracing_appender::non_blocking(std::io::stdout());
|
||||
stdout_guard = Some(stdout_g);
|
||||
@@ -309,6 +320,8 @@ pub(super) fn init_observability_http(
|
||||
/// Build an optional [`SdkTracerProvider`] for the given trace endpoint.
|
||||
///
|
||||
/// Returns `None` when the endpoint is empty or trace export is disabled.
|
||||
/// When enabled, the provider is also registered as the global tracer provider
|
||||
/// and installs the W3C trace-context propagator.
|
||||
fn build_tracer_provider(
|
||||
trace_ep: &str,
|
||||
config: &OtelConfig,
|
||||
@@ -344,6 +357,10 @@ fn build_tracer_provider(
|
||||
Ok(Some(provider))
|
||||
}
|
||||
|
||||
/// Convert a configured sample ratio into the SDK sampler strategy.
|
||||
///
|
||||
/// Invalid or non-finite ratios fall back to `AlwaysOn` so telemetry does not
|
||||
/// disappear due to configuration mistakes.
|
||||
fn build_tracer_sampler(sample_ratio: f64) -> Sampler {
|
||||
if sample_ratio.is_finite() && (0.0..=1.0).contains(&sample_ratio) {
|
||||
Sampler::TraceIdRatioBased(sample_ratio)
|
||||
@@ -355,6 +372,8 @@ fn build_tracer_sampler(sample_ratio: f64) -> Sampler {
|
||||
/// Build an optional [`SdkMeterProvider`] for the given metrics endpoint.
|
||||
///
|
||||
/// Returns `None` when the endpoint is empty or metric export is disabled.
|
||||
/// The provider is paired with the crate's metrics recorder so `metrics` crate
|
||||
/// instruments flow into OpenTelemetry readers.
|
||||
fn build_meter_provider(
|
||||
metric_ep: &str,
|
||||
config: &OtelConfig,
|
||||
@@ -394,13 +413,14 @@ fn build_meter_provider(
|
||||
|
||||
global::set_meter_provider(provider.clone());
|
||||
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
|
||||
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
|
||||
set_observability_metric_enabled(true);
|
||||
Ok(Some(provider))
|
||||
}
|
||||
|
||||
/// Build an optional [`SdkLoggerProvider`] for the given log endpoint.
|
||||
///
|
||||
/// Returns `None` when the endpoint is empty or log export is disabled.
|
||||
/// The caller wraps the resulting provider in an OpenTelemetry tracing bridge.
|
||||
fn build_logger_provider(
|
||||
log_ep: &str,
|
||||
config: &OtelConfig,
|
||||
@@ -427,8 +447,10 @@ fn build_logger_provider(
|
||||
Ok(Some(builder.build()))
|
||||
}
|
||||
|
||||
/// Starts the Pyroscope continuous profiling agent if `ENV_OBS_PROFILING_ENDPOINT` is set.
|
||||
/// No-op and returns None on non-unix platforms.
|
||||
/// Start the Pyroscope continuous profiling agent when profiling is enabled.
|
||||
///
|
||||
/// Returns `None` on non-Unix platforms, when the feature is disabled, or when
|
||||
/// no usable profiling endpoint is configured.
|
||||
#[cfg(unix)]
|
||||
fn init_profiler(config: &OtelConfig) -> Option<pyroscope::PyroscopeAgent<pyroscope::pyroscope::PyroscopeAgentRunning>> {
|
||||
use pyroscope::backend::{BackendConfig, PprofConfig, pprof_backend};
|
||||
@@ -468,6 +490,9 @@ fn init_profiler(config: &OtelConfig) -> Option<pyroscope::PyroscopeAgent<pyrosc
|
||||
}
|
||||
|
||||
/// Create a stdout periodic metrics reader for the given interval.
|
||||
///
|
||||
/// This helper is primarily used for local development and diagnostics when
|
||||
/// operators want to see exported metric points without an OTLP collector.
|
||||
fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
|
||||
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
|
||||
.with_interval(Duration::from_secs(interval))
|
||||
@@ -479,6 +504,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
/// Valid ratios should produce trace-id-ratio sampling.
|
||||
fn test_build_tracer_sampler_uses_trace_ratio_for_valid_values() {
|
||||
let sampler = build_tracer_sampler(0.0);
|
||||
assert!(format!("{sampler:?}").contains("TraceIdRatioBased"));
|
||||
@@ -491,6 +517,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Invalid ratios should degrade to the safest non-dropping sampler.
|
||||
fn test_build_tracer_sampler_rejects_invalid_ratio_with_always_on() {
|
||||
let sampler = build_tracer_sampler(-0.1);
|
||||
assert!(format!("{sampler:?}").contains("AlwaysOn"));
|
||||
|
||||
@@ -19,7 +19,12 @@
|
||||
//! log files do not grow indefinitely by rotating them when they exceed a configured size.
|
||||
|
||||
use crate::cleaner::types::FileMatchMode;
|
||||
use crate::global::{
|
||||
METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES, METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS,
|
||||
METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL, METRIC_LOG_CLEANER_ROTATION_TOTAL,
|
||||
};
|
||||
use jiff::Zoned;
|
||||
use metrics::{counter, gauge, histogram};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{self, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -186,6 +191,7 @@ impl RollingAppender {
|
||||
}
|
||||
|
||||
fn roll(&mut self) -> io::Result<()> {
|
||||
let rotate_started = std::time::Instant::now();
|
||||
// 1. Close current file first to ensure all buffers are flushed to OS (if any)
|
||||
// and handle released.
|
||||
if let Some(mut file) = self.file.take()
|
||||
@@ -250,6 +256,9 @@ impl RollingAppender {
|
||||
// This overrides whatever open_file() derived from mtime, ensuring
|
||||
// we stick to the logical rotation time.
|
||||
self.last_roll_ts = now.timestamp().as_second();
|
||||
counter!(METRIC_LOG_CLEANER_ROTATION_TOTAL).increment(1);
|
||||
histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64());
|
||||
gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -281,6 +290,8 @@ impl RollingAppender {
|
||||
"RollingAppender: Failed to rotate log file after {} retries. Error: {:?}",
|
||||
MAX_RETRIES, last_error
|
||||
);
|
||||
counter!(METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL).increment(1);
|
||||
histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64());
|
||||
|
||||
// Attempt to re-open existing active file to allow continued writing
|
||||
self.open_file()?;
|
||||
@@ -314,6 +325,7 @@ impl Write for RollingAppender {
|
||||
if let Some(file) = &mut self.file {
|
||||
let n = file.write(buf)?;
|
||||
self.size += n as u64;
|
||||
gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64);
|
||||
Ok(n)
|
||||
} else {
|
||||
Err(io::Error::other("Failed to open log file"))
|
||||
|
||||
77
scripts/inspect_dashboard.sh
Normal file
77
scripts/inspect_dashboard.sh
Normal file
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Copyright 2024 RustFS Team
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
DASHBOARD_PATH=".docker/observability/grafana/dashboards/rustfs.json"
|
||||
|
||||
if ! command -v jq >/dev/null 2>&1; then
|
||||
echo "ERROR: jq is required by scripts/inspect_dashboard.sh" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [[ ! -f "$DASHBOARD_PATH" ]]; then
|
||||
echo "ERROR: dashboard file not found: $DASHBOARD_PATH" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
jq -r '
|
||||
def pystr:
|
||||
if . == null then "None" else tostring end;
|
||||
def pyrepr:
|
||||
if . == null then "None"
|
||||
elif type == "string" then
|
||||
"\u0027"
|
||||
+ (gsub("\\\\"; "\\\\\\\\")
|
||||
| gsub("\u0027"; "\\\\\u0027")
|
||||
| gsub("\n"; "\\\\n")
|
||||
| gsub("\r"; "\\\\r")
|
||||
| gsub("\t"; "\\\\t"))
|
||||
+ "\u0027"
|
||||
else tostring
|
||||
end;
|
||||
|
||||
"Total panels: \(.panels | length)",
|
||||
"Dashboard version: \(.version)",
|
||||
"",
|
||||
(
|
||||
.panels[]
|
||||
| [
|
||||
(.id // 0),
|
||||
(.type | pystr),
|
||||
(.gridPos.y | pystr),
|
||||
(.gridPos.h | pystr),
|
||||
(.gridPos.w | pystr),
|
||||
(.title | pyrepr)
|
||||
]
|
||||
| @tsv
|
||||
)
|
||||
' "$DASHBOARD_PATH" | {
|
||||
IFS= read -r total_line
|
||||
IFS= read -r version_line
|
||||
IFS= read -r blank_line
|
||||
|
||||
printf '%s\n' "$total_line"
|
||||
printf '%s\n' "$version_line"
|
||||
printf '%s\n' "$blank_line"
|
||||
|
||||
while IFS=$'\t' read -r pid ptype y h w title; do
|
||||
printf " id=%4d type=%-12s y=%-4s h=%-4s w=%-4s title=%s\n" \
|
||||
"$pid" "$ptype" "$y" "$h" "$w" "$title"
|
||||
done
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user