Merge remote-tracking branch 'origin/main' into feat/xlmeta-compat

# Conflicts:
#	crates/obs/src/telemetry/otel.rs
This commit is contained in:
weisd
2026-03-17 20:13:36 +08:00
21 changed files with 2918 additions and 434 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -23,6 +23,7 @@
#
# Manual Parameters:
# - build_docker: Build and push Docker images (default: true)
# - platforms: Comma-separated platform IDs or 'all' (default: all)
name: Build and Release
@@ -53,6 +54,11 @@ on:
required: false
default: true
type: boolean
platforms:
description: "Comma-separated targets or 'all' (e.g. linux-x86_64-musl,macos-aarch64)"
required: false
default: "all"
type: string
permissions:
contents: read
@@ -138,63 +144,70 @@ jobs:
echo " - Is prerelease: $is_prerelease"
# Build RustFS binaries
prepare-platform-matrix:
name: Prepare Platform Matrix
runs-on: ubicloud-standard-2
outputs:
matrix: ${{ steps.select.outputs.matrix }}
selected: ${{ steps.select.outputs.selected }}
steps:
- name: Select target platforms
id: select
shell: bash
run: |
set -euo pipefail
selected="${{ github.event_name == 'workflow_dispatch' && github.event.inputs.platforms || 'all' }}"
selected="$(echo "${selected}" | tr -d '[:space:]')"
if [[ -z "${selected}" ]]; then
selected="all"
fi
all='{"include":[
{"target_id":"linux-x86_64-musl","os":"ubicloud-standard-2","target":"x86_64-unknown-linux-musl","cross":false,"platform":"linux","rustflags":""},
{"target_id":"linux-aarch64-musl","os":"ubicloud-standard-2","target":"aarch64-unknown-linux-musl","cross":true,"platform":"linux","rustflags":""},
{"target_id":"linux-x86_64-gnu","os":"ubicloud-standard-2","target":"x86_64-unknown-linux-gnu","cross":false,"platform":"linux","rustflags":""},
{"target_id":"linux-aarch64-gnu","os":"ubicloud-standard-2","target":"aarch64-unknown-linux-gnu","cross":true,"platform":"linux","rustflags":""},
{"target_id":"macos-aarch64","os":"macos-latest","target":"aarch64-apple-darwin","cross":false,"platform":"macos","rustflags":""},
{"target_id":"macos-x86_64","os":"macos-latest","target":"x86_64-apple-darwin","cross":false,"platform":"macos","rustflags":""},
{"target_id":"windows-x86_64","os":"windows-latest","target":"x86_64-pc-windows-msvc","cross":false,"platform":"windows","rustflags":""}
]}'
if [[ "${selected}" == "all" ]]; then
matrix="$(jq -c . <<<"${all}")"
else
unknown="$(jq -rn --arg selected "${selected}" --argjson all "${all}" '
($selected | split(",") | map(select(length > 0))) as $req
| ($all.include | map(.target_id)) as $known
| [$req[] | select(( $known | index(.) ) == null)]
')"
if [[ "$(jq 'length' <<<"${unknown}")" -gt 0 ]]; then
echo "Unknown platforms: $(jq -r 'join(\",\")' <<<"${unknown}")" >&2
echo "Allowed: $(jq -r '.include[].target_id' <<<"${all}" | paste -sd ',' -)" >&2
exit 1
fi
matrix="$(jq -c --arg selected "${selected}" '
($selected | split(",") | map(select(length > 0))) as $req
| .include |= map(select(.target_id as $id | ($req | index($id))))
' <<<"${all}")"
fi
echo "selected=${selected}" >> "$GITHUB_OUTPUT"
echo "matrix=${matrix}" >> "$GITHUB_OUTPUT"
echo "Selected platforms: ${selected}"
build-rustfs:
name: Build RustFS
needs: [ build-check ]
if: needs.build-check.outputs.should_build == 'true'
needs: [ build-check, prepare-platform-matrix ]
if: needs.build-check.outputs.should_build == 'true' && needs.prepare-platform-matrix.result == 'success'
runs-on: ${{ matrix.os }}
timeout-minutes: 60
env:
RUSTFLAGS: ${{ matrix.rustflags }}
strategy:
fail-fast: false
matrix:
include:
# Linux builds
# Use x86-64-v2 (SSE4.2 baseline) instead of native to ensure distributed
# binaries run on older x86_64 CPUs (e.g. Intel Celeron/Atom, Synology NAS).
# See: https://github.com/rustfs/rustfs/issues/1838
- os: ubicloud-standard-2
target: x86_64-unknown-linux-musl
cross: false
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: aarch64-unknown-linux-musl
cross: true
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: x86_64-unknown-linux-gnu
cross: false
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: aarch64-unknown-linux-gnu
cross: true
platform: linux
rustflags: ''
# macOS builds
- os: macos-latest
target: aarch64-apple-darwin
cross: false
platform: macos
rustflags: ''
- os: macos-latest
target: x86_64-apple-darwin
cross: false
platform: macos
rustflags: ''
# Windows builds (temporarily disabled)
- os: windows-latest
target: x86_64-pc-windows-msvc
cross: false
platform: windows
rustflags: ''
#- os: windows-latest
# target: aarch64-pc-windows-msvc
# cross: true
# platform: windows
matrix: ${{ fromJson(needs.prepare-platform-matrix.outputs.matrix) }}
steps:
- name: Checkout repository
uses: actions/checkout@v6

10
Cargo.lock generated
View File

@@ -5108,9 +5108,9 @@ dependencies = [
[[package]]
name = "lz4_flex"
version = "0.12.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e"
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
dependencies = [
"twox-hash",
]
@@ -7853,10 +7853,14 @@ dependencies = [
name = "rustfs-obs"
version = "0.0.5"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"flate2",
"glob",
"jiff",
"metrics",
"num_cpus",
"nvml-wrapper",
"opentelemetry",
"opentelemetry-appender-tracing",
@@ -7868,6 +7872,7 @@ dependencies = [
"rustfs-config",
"rustfs-utils",
"serde",
"serde_json",
"sysinfo",
"temp-env",
"tempfile",
@@ -7878,6 +7883,7 @@ dependencies = [
"tracing-error",
"tracing-opentelemetry",
"tracing-subscriber",
"zstd",
]
[[package]]

View File

@@ -199,6 +199,9 @@ const-str = { version = "1.1.0", features = ["std", "proc"] }
convert_case = "0.11.0"
criterion = { version = "0.8", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
crossbeam-channel = "0.5.15"
crossbeam-deque = "0.8.6"
crossbeam-utils = "0.8.21"
datafusion = "52.3.0"
derive_builder = "0.20.2"
enumset = "1.1.10"

View File

@@ -48,6 +48,12 @@ pub const ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_TOTAL_SIZ
pub const ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES";
pub const ENV_OBS_LOG_COMPRESS_OLD_FILES: &str = "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES";
pub const ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL";
pub const ENV_OBS_LOG_COMPRESSION_ALGORITHM: &str = "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM";
pub const ENV_OBS_LOG_PARALLEL_COMPRESS: &str = "RUSTFS_OBS_LOG_PARALLEL_COMPRESS";
pub const ENV_OBS_LOG_PARALLEL_WORKERS: &str = "RUSTFS_OBS_LOG_PARALLEL_WORKERS";
pub const ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL";
pub const ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: &str = "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP";
pub const ENV_OBS_LOG_ZSTD_WORKERS: &str = "RUSTFS_OBS_LOG_ZSTD_WORKERS";
pub const ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: &str = "RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS";
pub const ENV_OBS_LOG_EXCLUDE_PATTERNS: &str = "RUSTFS_OBS_LOG_EXCLUDE_PATTERNS";
pub const ENV_OBS_LOG_DELETE_EMPTY_FILES: &str = "RUSTFS_OBS_LOG_DELETE_EMPTY_FILES";
@@ -61,13 +67,24 @@ pub const DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES: u64 = 2 * 1024 * 1024 * 1024; //
pub const DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: u64 = 0; // No single file limit
pub const DEFAULT_OBS_LOG_COMPRESS_OLD_FILES: bool = true;
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL: u32 = 6;
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP: &str = "gzip";
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD: &str = "zstd";
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM: &str = DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD;
pub const DEFAULT_OBS_LOG_PARALLEL_COMPRESS: bool = true;
pub const DEFAULT_OBS_LOG_PARALLEL_WORKERS: usize = 6;
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL: i32 = 8;
pub const DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: bool = true;
pub const DEFAULT_OBS_LOG_ZSTD_WORKERS: usize = 1;
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION: &str = "gz";
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION);
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION: &str = "zst";
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION);
pub const DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: u64 = 30; // Retain compressed files for 30 days
pub const DEFAULT_OBS_LOG_DELETE_EMPTY_FILES: bool = true;
pub const DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS: u64 = 3600; // 1 hour
pub const DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS: u64 = 1800; // 0.5 hours
pub const DEFAULT_OBS_LOG_DRY_RUN: bool = false;
pub const DEFAULT_OBS_LOG_MATCH_MODE_PREFIX: &str = "prefix";
pub const DEFAULT_OBS_LOG_MATCH_MODE: &str = "suffix";
/// Default values for observability configuration
@@ -113,6 +130,12 @@ mod tests {
assert_eq!(ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES");
assert_eq!(ENV_OBS_LOG_COMPRESS_OLD_FILES, "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES");
assert_eq!(ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL");
assert_eq!(ENV_OBS_LOG_COMPRESSION_ALGORITHM, "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM");
assert_eq!(ENV_OBS_LOG_PARALLEL_COMPRESS, "RUSTFS_OBS_LOG_PARALLEL_COMPRESS");
assert_eq!(ENV_OBS_LOG_PARALLEL_WORKERS, "RUSTFS_OBS_LOG_PARALLEL_WORKERS");
assert_eq!(ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL");
assert_eq!(ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP");
assert_eq!(ENV_OBS_LOG_ZSTD_WORKERS, "RUSTFS_OBS_LOG_ZSTD_WORKERS");
assert_eq!(
ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
"RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS"
@@ -131,6 +154,10 @@ mod tests {
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, "gzip");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, "zstd");
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, "prefix");
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE, "suffix");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, "zstd");
}
}

View File

@@ -20,8 +20,8 @@
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, TransitionStorageClass,
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, LifecycleRuleAndOperator,
NoncurrentVersionTransition, ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, TransitionStorageClass,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -158,6 +158,25 @@ impl RuleValidate for LifecycleRule {
}
}
fn lifecycle_rule_prefix(rule: &LifecycleRule) -> Option<&str> {
// Prefer a non-empty legacy prefix; treat an empty legacy prefix as if it were not set
if let Some(p) = rule.prefix.as_deref() {
if !p.is_empty() {
return Some(p);
}
}
let Some(filter) = rule.filter.as_ref() else {
return None;
};
if let Some(p) = filter.prefix.as_deref() {
return Some(p);
}
filter.and.as_ref().and_then(|and| and.prefix.as_deref())
}
#[async_trait::async_trait]
pub trait Lifecycle {
async fn has_transition(&self) -> bool;
@@ -201,8 +220,11 @@ impl Lifecycle for BucketLifecycleConfiguration {
continue;
}
let rule_prefix = &rule.prefix.clone().unwrap_or_default();
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
let rule_prefix = lifecycle_rule_prefix(rule).unwrap_or("");
if !prefix.is_empty()
&& !rule_prefix.is_empty()
&& !prefix.starts_with(rule_prefix)
&& !rule_prefix.starts_with(prefix)
{
continue;
}
@@ -321,8 +343,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
if rule.status.as_str() == ExpirationStatus::DISABLED {
continue;
}
if let Some(prefix) = rule.prefix.clone() {
if !obj.name.starts_with(prefix.as_str()) {
if let Some(rule_prefix) = lifecycle_rule_prefix(rule) {
if !obj.name.starts_with(rule_prefix) {
continue;
}
}
@@ -438,55 +460,22 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if obj.is_latest && obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(mod_time, days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
if expiration.expired_object_delete_marker.is_some_and(|v| v) {
if let Some(due) = expiration.next_due(obj) {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
// Stop after scheduling an expired delete-marker event.
break;
}
continue;
}
}
}
@@ -739,8 +728,16 @@ impl LifecycleCalculate for LifecycleExpiration {
if !obj.is_latest || !obj.delete_marker {
return None;
}
// Check date first (date-based expiration takes priority over days).
// A zero unix timestamp means "not set" (default value) and is skipped.
if let Some(ref date) = self.date {
let expiry_date = OffsetDateTime::from(date.clone());
if expiry_date.unix_timestamp() != 0 {
return Some(expiry_date);
}
}
match self.days {
Some(days) => Some(expected_expiry_time(obj.mod_time.unwrap(), days)),
Some(days) => obj.mod_time.map(|mod_time| expected_expiry_time(mod_time, days)),
None => None,
}
}
@@ -905,6 +902,7 @@ impl Default for TransitionOptions {
#[cfg(test)]
mod tests {
use super::*;
use s3s::dto::LifecycleRuleFilter;
#[tokio::test]
async fn validate_rejects_non_positive_expiration_days() {
@@ -1135,4 +1133,208 @@ mod tests {
assert_eq!(err.to_string(), ERR_LIFECYCLE_INVALID_RULE_STATUS);
}
#[tokio::test]
async fn filter_rules_respects_filter_prefix() {
let mut filter = LifecycleRuleFilter::default();
filter.prefix = Some("prefix".to_string());
let lc = BucketLifecycleConfiguration {
rules: vec![LifecycleRule {
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
expiration: Some(LifecycleExpiration {
days: Some(30),
..Default::default()
}),
abort_incomplete_multipart_upload: None,
filter: Some(filter),
id: Some("rule".to_string()),
noncurrent_version_expiration: None,
noncurrent_version_transitions: None,
prefix: None,
transitions: None,
}],
};
let match_obj = ObjectOpts {
name: "prefix/file".to_string(),
mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
is_latest: true,
..Default::default()
};
let matched = lc.filter_rules(&match_obj).await.unwrap();
assert_eq!(matched.len(), 1);
let non_match_obj = ObjectOpts {
name: "other/file".to_string(),
mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
is_latest: true,
..Default::default()
};
let not_matched = lc.filter_rules(&non_match_obj).await.unwrap();
assert_eq!(not_matched.len(), 0);
}
#[tokio::test]
async fn filter_rules_respects_filter_and_prefix() {
let mut filter = LifecycleRuleFilter::default();
let mut and = LifecycleRuleAndOperator::default();
and.prefix = Some("prefix".to_string());
filter.and = Some(and);
let lc = BucketLifecycleConfiguration {
rules: vec![LifecycleRule {
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
expiration: Some(LifecycleExpiration {
days: Some(30),
..Default::default()
}),
abort_incomplete_multipart_upload: None,
filter: Some(filter),
id: Some("rule-and-prefix".to_string()),
noncurrent_version_expiration: None,
noncurrent_version_transitions: None,
prefix: None,
transitions: None,
}],
};
let match_obj = ObjectOpts {
name: "prefix/file".to_string(),
mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
is_latest: true,
..Default::default()
};
let matched = lc.filter_rules(&match_obj).await.unwrap();
assert_eq!(matched.len(), 1);
let non_match_obj = ObjectOpts {
name: "other/file".to_string(),
mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
is_latest: true,
..Default::default()
};
let not_matched = lc.filter_rules(&non_match_obj).await.unwrap();
assert_eq!(not_matched.len(), 0);
}
#[tokio::test]
async fn expired_object_delete_marker_requires_single_version() {
let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
let lc = BucketLifecycleConfiguration {
rules: vec![LifecycleRule {
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
expiration: Some(LifecycleExpiration {
days: Some(1),
expired_object_delete_marker: Some(true),
..Default::default()
}),
abort_incomplete_multipart_upload: None,
filter: None,
id: Some("rule-expired-del-marker".to_string()),
noncurrent_version_expiration: None,
noncurrent_version_transitions: None,
prefix: None,
transitions: None,
}],
};
let opts = ObjectOpts {
name: "obj".to_string(),
mod_time: Some(base_time),
is_latest: true,
delete_marker: true,
num_versions: 2,
version_id: Some(Uuid::new_v4()),
..Default::default()
};
let now = base_time + Duration::days(2);
let event = lc.eval_inner(&opts, now, 0).await;
assert_eq!(event.action, IlmAction::NoneAction);
}
#[tokio::test]
async fn expired_object_delete_marker_deletes_only_delete_marker_after_due() {
let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
let lc = BucketLifecycleConfiguration {
rules: vec![LifecycleRule {
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
expiration: Some(LifecycleExpiration {
days: Some(1),
expired_object_delete_marker: Some(true),
..Default::default()
}),
abort_incomplete_multipart_upload: None,
filter: None,
id: Some("rule-expired-del-marker".to_string()),
noncurrent_version_expiration: None,
noncurrent_version_transitions: None,
prefix: None,
transitions: None,
}],
};
let opts = ObjectOpts {
name: "obj".to_string(),
mod_time: Some(base_time),
is_latest: true,
delete_marker: true,
num_versions: 1,
version_id: Some(Uuid::new_v4()),
..Default::default()
};
let now = base_time + Duration::days(2);
let event = lc.eval_inner(&opts, now, 0).await;
assert_eq!(event.action, IlmAction::DeleteVersionAction);
assert_eq!(event.due, Some(expected_expiry_time(base_time, 1)));
}
#[tokio::test]
async fn expired_object_delete_marker_date_based_not_yet_due() {
// A date-based rule that has not yet reached its expiry date must not
// trigger immediate deletion (unwrap_or(now) must not override the date).
let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
let future_date = base_time + Duration::days(10);
let lc = BucketLifecycleConfiguration {
rules: vec![LifecycleRule {
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
expiration: Some(LifecycleExpiration {
date: Some(future_date.into()),
expired_object_delete_marker: Some(true),
..Default::default()
}),
abort_incomplete_multipart_upload: None,
filter: None,
id: Some("rule-date-del-marker".to_string()),
noncurrent_version_expiration: None,
noncurrent_version_transitions: None,
prefix: None,
transitions: None,
}],
};
let opts = ObjectOpts {
name: "obj".to_string(),
mod_time: Some(base_time),
is_latest: true,
delete_marker: true,
num_versions: 1,
version_id: Some(Uuid::new_v4()),
..Default::default()
};
// now is before the configured date — must not schedule deletion
let now_before = base_time + Duration::days(5);
let event_before = lc.eval_inner(&opts, now_before, 0).await;
assert_eq!(event_before.action, IlmAction::NoneAction);
// now is after the configured date — must schedule deletion
let now_after = base_time + Duration::days(11);
let event_after = lc.eval_inner(&opts, now_after, 0).await;
assert_eq!(event_after.action, IlmAction::DeleteVersionAction);
assert_eq!(event_after.due, Some(future_date));
}
}

View File

@@ -40,6 +40,10 @@ flate2 = { workspace = true }
glob = { workspace = true }
jiff = { workspace = true }
metrics = { workspace = true }
crossbeam-channel = { workspace = true }
crossbeam-deque = { workspace = true }
crossbeam-utils = { workspace = true }
num_cpus = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
@@ -56,6 +60,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
sysinfo = { workspace = true }
thiserror = { workspace = true }
zstd = { workspace = true, features = ["zstdmt"] }
[target.'cfg(unix)'.dependencies]
pyroscope = { workspace = true, features = ["backend-pprof-rs"] }
@@ -65,3 +70,4 @@ pyroscope = { workspace = true, features = ["backend-pprof-rs"] }
tokio = { workspace = true, features = ["full"] }
tempfile = { workspace = true }
temp-env = { workspace = true }
serde_json = { workspace = true }

View File

@@ -14,7 +14,7 @@ logging, distributed tracing, metrics via OpenTelemetry, and continuous profilin
| **Distributed tracing** | OTLP/HTTP export to Jaeger, Tempo, or any OTel collector |
| **Metrics** | OTLP/HTTP export, bridged from the `metrics` crate facade |
| **Continuous Profiling** | CPU/Memory profiling export to Pyroscope |
| **Log cleanup** | Background task: size limits, gzip compression, retention policies |
| **Log cleanup** | Background task: size limits, zstd/gzip compression, retention policies |
| **GPU metrics** *(optional)* | Enable with the `gpu` feature flag |
---
@@ -148,9 +148,15 @@ All configuration is read from environment variables at startup.
|-------------------------------------------------|--------------|-------------------------------------------------------------|
| `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | `2147483648` | Hard cap on total log directory size (2 GiB) |
| `RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES` | `0` | Per-file size cap; `0` = unlimited |
| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Gzip-compress files before deleting |
| `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | `true` | Compress files before deleting |
| `RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL` | `6` | Gzip level `1` (fastest) `9` (best) |
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` archives older than N days; `0` = keep forever |
| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` | Compression codec: `zstd` or `gzip` |
| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | `true` | Enable work-stealing parallel compression |
| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | `6` | Number of cleaner worker threads |
| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | `8` | Zstd level `1` (fastest) `21` (best ratio) |
| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | `true` | Fallback to gzip when zstd compression fails |
| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | `1` | zstdmt worker threads per compression task |
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | `30` | Delete `.gz` / `.zst` archives older than N days; `0` = keep forever |
| `RUSTFS_OBS_LOG_EXCLUDE_PATTERNS` | _(empty)_ | Comma-separated glob patterns to never clean up |
| `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | `true` | Remove zero-byte files |
| `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | `3600` | Minimum file age (seconds) before cleanup |
@@ -159,6 +165,159 @@ All configuration is read from environment variables at startup.
---
## Cleaner & Rotation Metrics
The log rotation and cleanup pipeline emits these metrics (via the `metrics` facade):
| Metric | Type | Description |
|---|---|---|
| `rustfs.log_cleaner.deleted_files_total` | counter | Number of files deleted per cleanup pass |
| `rustfs.log_cleaner.freed_bytes_total` | counter | Bytes reclaimed by deletion |
| `rustfs.log_cleaner.compress_duration_seconds` | histogram | Compression stage duration |
| `rustfs.log_cleaner.steal_success_rate` | gauge | Work-stealing success ratio in parallel mode |
| `rustfs.log_cleaner.runs_total` | counter | Successful cleanup loop runs |
| `rustfs.log_cleaner.run_failures_total` | counter | Failed or panicked cleanup loop runs |
| `rustfs.log_cleaner.rotation_total` | counter | Successful file rotations |
| `rustfs.log_cleaner.rotation_failures_total` | counter | Failed file rotations |
| `rustfs.log_cleaner.rotation_duration_seconds` | histogram | Rotation latency |
| `rustfs.log_cleaner.active_file_size_bytes` | gauge | Current active log file size |
These metrics cover compression, cleanup, and file rotation end-to-end.
### Metric Semantics
- `deleted_files_total` and `freed_bytes_total` are emitted after each cleanup pass and include both normal log cleanup and expired compressed archive cleanup.
- `compress_duration_seconds` measures compression stage wall-clock time for both serial and parallel modes.
- `steal_success_rate` is updated by the parallel work-stealing path and remains at the last computed value.
- `rotation_*` metrics are emitted by `RollingAppender` and include retries; a failed final rotation increments `rotation_failures_total`.
- `active_file_size_bytes` is sampled on writes and after successful roll, so dashboards can track current active file growth.
### Grafana Dashboard JSON Draft (Ready to Import)
> Save this as `rustfs-log-cleaner-dashboard.json`, then import from Grafana UI.
> For Prometheus datasources, metric names are usually normalized to underscores,
> so `rustfs.log_cleaner.deleted_files_total` becomes `rustfs_log_cleaner_deleted_files_total`.
>
> The same panels are now checked in at:
> `.docker/observability/grafana/dashboards/rustfs.json`
> (row title: `Log Cleaner`).
```json
{
"uid": "rustfs-log-cleaner",
"title": "RustFS Log Cleaner",
"timezone": "browser",
"schemaVersion": 39,
"version": 1,
"refresh": "10s",
"tags": ["rustfs", "observability", "log-cleaner"],
"time": {
"from": "now-6h",
"to": "now"
},
"panels": [
{
"id": 1,
"title": "Cleanup Runs / Failures",
"type": "timeseries",
"targets": [
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_runs_total[5m]))", "legendFormat": "runs/s" },
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_run_failures_total[5m]))", "legendFormat": "failures/s" }
],
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }
},
{
"id": 2,
"title": "Freed Bytes / Deleted Files",
"type": "timeseries",
"targets": [
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_freed_bytes_total[15m]))", "legendFormat": "bytes/s" },
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_deleted_files_total[15m]))", "legendFormat": "files/s" }
],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }
},
{
"id": 3,
"title": "Compression P95 Latency",
"type": "timeseries",
"targets": [
{
"refId": "A",
"expr": "histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "p95"
}
],
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }
},
{
"id": 4,
"title": "Rotation Success / Failure",
"type": "timeseries",
"targets": [
{ "refId": "A", "expr": "sum(rate(rustfs_log_cleaner_rotation_total[5m]))", "legendFormat": "rotation/s" },
{ "refId": "B", "expr": "sum(rate(rustfs_log_cleaner_rotation_failures_total[5m]))", "legendFormat": "rotation_failures/s" }
],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }
},
{
"id": 5,
"title": "Steal Success Rate",
"type": "timeseries",
"targets": [
{ "refId": "A", "expr": "max(rustfs_log_cleaner_steal_success_rate)", "legendFormat": "ratio" }
],
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 }
},
{
"id": 6,
"title": "Active File Size",
"type": "timeseries",
"targets": [
{ "refId": "A", "expr": "max(rustfs_log_cleaner_active_file_size_bytes)", "legendFormat": "bytes" }
],
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 16 }
}
]
}
```
### PromQL Templates
Use these templates directly in Grafana panels/alerts.
- **Cleanup run rate**
- `sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval]))`
- **Cleanup failure rate**
- `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval]))`
- **Cleanup failure ratio**
- `sum(rate(rustfs_log_cleaner_run_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_runs_total[$__rate_interval])), 1e-9)`
- **Freed bytes throughput**
- `sum(rate(rustfs_log_cleaner_freed_bytes_total[$__rate_interval]))`
- **Deleted files throughput**
- `sum(rate(rustfs_log_cleaner_deleted_files_total[$__rate_interval]))`
- **Compression p95 latency**
- `histogram_quantile(0.95, sum(rate(rustfs_log_cleaner_compress_duration_seconds_bucket[$__rate_interval])) by (le))`
- **Rotation failure ratio**
- `sum(rate(rustfs_log_cleaner_rotation_failures_total[$__rate_interval])) / clamp_min(sum(rate(rustfs_log_cleaner_rotation_total[$__rate_interval])), 1e-9)`
- **Work-stealing efficiency (latest)**
- `max(rustfs_log_cleaner_steal_success_rate)`
- **Active file size (latest)**
- `max(rustfs_log_cleaner_active_file_size_bytes)`
### Suggested Alerts
- **CleanupFailureRatioHigh**: failure ratio > 0.05 for 10m.
- **CompressionLatencyP95High**: p95 above your baseline SLO for 15m.
- **RotationFailuresDetected**: rotation failure rate > 0 for 3 consecutive windows.
- **NoCleanupActivity**: runs rate == 0 for expected active environments.
### Metrics Compatibility
The project is currently in active development. Metric names and labels are updated directly when architecture evolves, and no backward-compatibility shim is maintained for old names.
Use the metric names documented in this README as the current source of truth.
---
## Examples
### Stdout-only (development default)
@@ -207,6 +366,19 @@ export RUSTFS_OBS_LOG_DRY_RUN=true
# Observe log output — no files will actually be deleted.
```
### Parallel zstd cleanup (recommended production profile)
```bash
export RUSTFS_OBS_LOG_DIRECTORY=/var/log/rustfs
export RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd
export RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true
export RUSTFS_OBS_LOG_PARALLEL_WORKERS=6
export RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL=8
export RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP=true
export RUSTFS_OBS_LOG_ZSTD_WORKERS=1
./rustfs
```
---
## Module Structure
@@ -229,9 +401,9 @@ rustfs-obs/src/
├── cleaner/ # Background log-file cleanup subsystem
│ ├── mod.rs # LogCleaner public API + tests
│ ├── types.rs # FileInfo shared type
│ ├── types.rs # Shared cleaner types (match mode, compression codec, FileInfo)
│ ├── scanner.rs # Filesystem discovery
│ ├── compress.rs # Gzip compression helper
│ ├── compress.rs # Gzip/Zstd compression helper
│ └── core.rs # Selection, compression, deletion logic
└── system/ # Host metrics (CPU, memory, disk, GPU)

View File

@@ -1,71 +1,130 @@
# Log Cleaner Subsystem
The `cleaner` module provides a robust, background log-file lifecycle manager for RustFS. It is designed to run periodically to enforce retention policies, compress old logs, and prevent disk exhaustion.
The `cleaner` module is a production-focused background lifecycle manager for RustFS log archives.
It periodically discovers rolled files, applies retention constraints, compresses candidates, and then deletes sources safely.
## Architecture
The subsystem is designed to be conservative by default:
The cleaner operates as a pipeline:
- it never touches the currently active log file;
- it refuses symlink deletion during the destructive phase;
- it keeps compression and source deletion as separate steps;
- it supports a full dry-run mode for policy verification.
1. **Discovery (`scanner.rs`)**: Scans the configured log directory for eligible files.
* **Non-recursive**: Only scans the top-level directory for safety.
* **Filtering**: Ignores the currently active log file, files matching exclude patterns, and files that do not match the configured prefix/suffix pattern.
* **Performance**: Uses `std::fs::read_dir` directly to minimize overhead and syscalls.
## Execution Pipeline
2. **Selection (`core.rs`)**: Applies retention policies to select files for deletion.
* **Keep Count**: Ensures at least `N` recent files are kept.
* **Total Size**: Deletes oldest files if the total size exceeds the limit.
* **Single File Size**: Deletes individual files that exceed a size limit (e.g., runaway logs).
1. **Discovery (`scanner.rs`)**
- Performs a shallow `read_dir` scan (no recursion) for predictable latency.
- Excludes the active log file, exclusion-pattern matches, and files younger than the age threshold.
- Classifies regular logs and compressed archives (`.gz` / `.zst`) in one pass.
3. **Action (`core.rs` / `compress.rs`)**:
* **Compression**: Optionally compresses selected files using Gzip (level 1-9) before deletion.
* **Deletion**: Removes the original file (and, later, the compressed archive once it exceeds the configured retention window in days).
2. **Selection (`core.rs`)**
- Enforces keep-count first.
- Applies total-size and single-file-size constraints to oldest files.
- Produces an ordered list of files to process.
## Configuration
3. **Compression + Deletion (`core.rs` + `compress.rs`)**
- Supports `zstd` and `gzip` codecs.
- Uses parallel work stealing when enabled (`Injector + Worker::new_fifo + Stealer`).
- Always deletes source files in a serial pass after compression to minimize file-lock race issues.
The cleaner is configured via `LogCleanerBuilder`. When initialized via `rustfs-obs::init_obs`, it reads from environment variables.
4. **Archive Expiry (`core.rs`)**
- Applies a separate retention window to already-compressed files.
- Keeps archive expiration independent from plain-log keep-count logic.
| Parameter | Env Var | Description |
|-----------|---------|-------------|
| `log_dir` | `RUSTFS_OBS_LOG_DIRECTORY` | The directory to scan. |
| `file_pattern` | `RUSTFS_OBS_LOG_FILENAME` | The base filename pattern (e.g., `rustfs.log`). |
| `active_filename` | (Derived) | The exact name of the currently active log file, excluded from cleanup. |
| `match_mode` | `RUSTFS_OBS_LOG_MATCH_MODE` | `prefix` or `suffix`. Determines how `file_pattern` is matched against filenames. |
| `keep_files` | `RUSTFS_OBS_LOG_KEEP_FILES` | Minimum number of rolling log files to keep. |
| `max_total_size_bytes` | `RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES` | Maximum aggregate size of all log files. Oldest files are deleted to satisfy this. |
| `compress_old_files` | `RUSTFS_OBS_LOG_COMPRESS_OLD_FILES` | If `true`, files selected for removal are first gzipped. |
| `compressed_file_retention_days` | `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Age in days after which `.gz` files are deleted. |
## Compression Modes
## Timestamp Format & Rotation
- **Primary codec**: `zstd` (default) for better ratio and faster decompression.
- **Fallback codec**: `gzip` when zstd fallback is enabled.
- **Dry-run**: reports planned compression/deletion operations without touching filesystem state.
The cleaner works in tandem with the `RollingAppender` in `telemetry/rolling.rs`.
## Safety Model
* **Rotation**: Logs are rotated based on time (Daily/Hourly/Minutely) or Size.
* **Naming**: Archived logs use a high-precision timestamp format: `YYYYMMDDHHMMSS.uuuuuu` (microseconds), plus a unique counter to prevent collisions.
* **Suffix Mode**: `<timestamp>-<counter>.<filename>` (e.g., `20231027103001.123456-0.rustfs.log`)
* **Prefix Mode**: `<filename>.<timestamp>-<counter>` (e.g., `rustfs.log.20231027103001.123456-0`)
- **No recursive traversal**: the scanner only inspects the immediate log directory.
- **No symlink following**: filesystem metadata is collected with `symlink_metadata`.
- **Idempotent archives**: an existing `*.gz` or `*.zst` target means the file is treated as already compressed.
- **Best-effort cleanup**: individual file failures are logged and do not abort the whole maintenance pass.
This high-precision naming ensures that files sort chronologically by name, and collisions are virtually impossible even under high load.
## Work-Stealing Strategy
## Usage Example
The parallel path in `core.rs` uses this fixed lookup sequence per worker:
1. `local_worker.pop()`
2. `injector.steal_batch_and_pop(&local_worker)`
3. randomized victim polling via `Steal::from_iter(...)`
This strategy keeps local cache affinity while still balancing stragglers.
## Metrics and Tracing
The cleaner emits tracing events and runtime metrics:
- `rustfs.log_cleaner.deleted_files_total` (counter)
- `rustfs.log_cleaner.freed_bytes_total` (counter)
- `rustfs.log_cleaner.compress_duration_seconds` (histogram)
- `rustfs.log_cleaner.steal_success_rate` (gauge)
- `rustfs.log_cleaner.rotation_total` (counter)
- `rustfs.log_cleaner.rotation_failures_total` (counter)
- `rustfs.log_cleaner.rotation_duration_seconds` (histogram)
- `rustfs.log_cleaner.active_file_size_bytes` (gauge)
These values can be wired into dashboards and alert rules for cleanup health.
## Retention Decision Order
For regular logs, the cleaner evaluates candidates in this order:
1. keep at least `keep_files` newest matching generations;
2. remove older files if total retained size still exceeds `max_total_size_bytes`;
3. remove any file whose individual size exceeds `max_single_file_size_bytes`;
4. if compression is enabled, archive before deletion;
5. delete the original file only after successful compression.
## Key Environment Variables
| Env Var | Meaning |
|---|---|
| `RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM` | `zstd` or `gzip` |
| `RUSTFS_OBS_LOG_PARALLEL_COMPRESS` | Enable work-stealing compression |
| `RUSTFS_OBS_LOG_PARALLEL_WORKERS` | Worker count for parallel compressor |
| `RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL` | Zstd level (1-21) |
| `RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP` | Fallback switch on zstd failure |
| `RUSTFS_OBS_LOG_ZSTD_WORKERS` | zstdmt worker threads per compression task |
| `RUSTFS_OBS_LOG_DRY_RUN` | Dry-run mode |
| `RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS` | Retention window for `*.gz` / `*.zst` archives |
| `RUSTFS_OBS_LOG_DELETE_EMPTY_FILES` | Remove zero-byte regular log files during scanning |
| `RUSTFS_OBS_LOG_MIN_FILE_AGE_SECONDS` | Minimum age for regular log eligibility |
## Builder Example
```rust
use rustfs_obs::LogCleaner;
use rustfs_obs::types::FileMatchMode;
use rustfs_obs::types::{CompressionAlgorithm, FileMatchMode};
use std::path::PathBuf;
let cleaner = LogCleaner::builder(
PathBuf::from("/var/log/rustfs"),
"rustfs.log.".to_string(),
"rustfs.log".to_string(),
"rustfs.log".to_string(),
)
.match_mode(FileMatchMode::Prefix)
.keep_files(10)
.max_total_size_bytes(1024 * 1024 * 100) // 100 MB
.match_mode(FileMatchMode::Suffix)
.keep_files(30)
.max_total_size_bytes(2 * 1024 * 1024 * 1024)
.compress_old_files(true)
.compression_algorithm(CompressionAlgorithm::Zstd)
.parallel_compress(true)
.parallel_workers(6)
.zstd_compression_level(8)
.zstd_fallback_to_gzip(true)
.zstd_workers(1)
.dry_run(false)
.build();
// Run cleanup (blocking operation, spawn in a background task)
if let Ok((deleted, freed)) = cleaner.cleanup() {
println!("Cleaned up {} files, freed {} bytes", deleted, freed);
}
let _ = cleaner.cleanup();
```
## Operational Notes
- Prefer `FileMatchMode::Suffix` when rotations prepend timestamps to the filename.
- Prefer `FileMatchMode::Prefix` when rotations append counters or timestamps after a stable base name.
- Keep `parallel_workers` modest when `zstd_workers` is greater than `1`, because each compression task may already use internal codec threads.

View File

@@ -12,66 +12,205 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Gzip compression helper for old log files.
//! Compression helpers for old log files.
//!
//! Files are compressed in place: `<name>` → `<name>.gz`. The original file
//! is **not** deleted here — deletion is handled by the caller after
//! compression succeeds.
//! This module performs compression only. Source-file deletion is intentionally
//! handled by the caller in a separate step to avoid platform-specific file
//! locking issues (especially on Windows).
//!
//! The separation is important for operational safety: a failed archive write
//! must never result in premature source deletion, and an already-existing
//! archive should make repeated cleanup passes idempotent.
use super::types::CompressionAlgorithm;
use flate2::Compression;
use flate2::write::GzEncoder;
use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION;
use std::ffi::OsString;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
use tracing::{debug, info};
use std::path::{Path, PathBuf};
use tracing::{debug, info, warn};
/// Compress `path` to `<path>.gz` using gzip.
/// Compression options shared by serial and parallel cleaner paths.
///
/// If a `.gz` file for the given path already exists the function returns
/// `Ok(())` immediately without overwriting the existing archive.
///
/// # Arguments
/// * `path` - Path to the uncompressed log file.
/// * `level` - Gzip compression level (`1``9`); clamped automatically.
/// * `dry_run` - When `true`, log what would be done without writing anything.
///
/// # Errors
/// Propagates any I/O error encountered while opening, reading, writing, or
/// flushing files.
pub(super) fn compress_file(path: &Path, level: u32, dry_run: bool) -> Result<(), std::io::Error> {
let gz_path = path.with_extension(DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION);
/// The core cleaner prepares this immutable bundle once per cleanup pass and
/// then hands it to each worker so all files in that run use the same policy.
#[derive(Debug, Clone)]
pub(super) struct CompressionOptions {
/// Preferred compression codec for the current cleanup pass.
pub algorithm: CompressionAlgorithm,
/// Gzip level (1..=9).
pub gzip_level: u32,
/// Zstd level (1..=21).
pub zstd_level: i32,
/// Internal zstd worker threads used by zstdmt.
pub zstd_workers: usize,
/// If true, fallback to gzip when zstd encoding fails.
pub zstd_fallback_to_gzip: bool,
/// Dry-run mode reports planned actions without writing files.
pub dry_run: bool,
}
if gz_path.exists() {
debug!("Compressed file already exists, skipping: {:?}", gz_path);
return Ok(());
/// Compression output metadata used for metrics and deletion gating.
///
/// Callers inspect the output size and archive path before deciding whether it
/// is safe to remove the source log file.
#[derive(Debug, Clone)]
pub(super) struct CompressionOutput {
/// Final path of the compressed archive file.
pub archive_path: PathBuf,
/// Codec actually used to produce the output.
pub algorithm_used: CompressionAlgorithm,
/// Input bytes before compression.
pub input_bytes: u64,
/// Compressed output bytes on disk.
pub output_bytes: u64,
}
/// Compress a single source file with the requested codec and fallback policy.
///
/// This function centralizes the fallback behavior so the orchestration layer
/// does not need per-codec branching.
pub(super) fn compress_file(path: &Path, options: &CompressionOptions) -> Result<CompressionOutput, std::io::Error> {
match options.algorithm {
CompressionAlgorithm::Gzip => compress_gzip(path, options.gzip_level, options.dry_run),
CompressionAlgorithm::Zstd => match compress_zstd(path, options.zstd_level, options.zstd_workers, options.dry_run) {
Ok(output) => Ok(output),
Err(err) if options.zstd_fallback_to_gzip => {
warn!(
file = ?path,
error = %err,
"zstd compression failed, fallback to gzip"
);
compress_gzip(path, options.gzip_level, options.dry_run)
}
Err(err) => Err(err),
},
}
}
/// Compress a file to `*.gz` using the configured gzip level.
fn compress_gzip(path: &Path, level: u32, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
let archive_path = archive_path(path, CompressionAlgorithm::Gzip);
compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Gzip, |reader, writer| {
let mut encoder = GzEncoder::new(writer, Compression::new(level.clamp(1, 9)));
let written = std::io::copy(reader, &mut encoder)?;
let mut out = encoder.finish()?;
out.flush()?;
Ok(written)
})
}
/// Compress a file to `*.zst` using multi-threaded zstd when available.
fn compress_zstd(path: &Path, level: i32, workers: usize, dry_run: bool) -> Result<CompressionOutput, std::io::Error> {
let archive_path = archive_path(path, CompressionAlgorithm::Zstd);
compress_with_writer(path, &archive_path, dry_run, CompressionAlgorithm::Zstd, |reader, writer| {
let mut encoder = zstd::stream::Encoder::new(writer, level.clamp(1, 21))?;
encoder.multithread(workers.max(1) as u32)?;
let written = std::io::copy(reader, &mut encoder)?;
let mut out = encoder.finish()?;
out.flush()?;
Ok(written)
})
}
fn compress_with_writer<F>(
path: &Path,
archive_path: &Path,
dry_run: bool,
algorithm_used: CompressionAlgorithm,
mut writer_fn: F,
) -> Result<CompressionOutput, std::io::Error>
where
F: FnMut(&mut BufReader<File>, BufWriter<File>) -> Result<u64, std::io::Error>,
{
// Keep idempotent behavior: existing archive means this file has already
// been handled in a previous cleanup pass.
if archive_path.exists() {
debug!(file = ?archive_path, "compressed archive already exists, skipping");
let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0);
return Ok(CompressionOutput {
archive_path: archive_path.to_path_buf(),
algorithm_used,
input_bytes,
output_bytes,
});
}
let input_bytes = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
if dry_run {
info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, gz_path);
return Ok(());
info!("[DRY RUN] Would compress file: {:?} -> {:?}", path, archive_path);
return Ok(CompressionOutput {
archive_path: archive_path.to_path_buf(),
algorithm_used,
input_bytes,
output_bytes: 0,
});
}
// Create the output archive only after the dry-run short-circuit so this
// helper remains side-effect free when the caller is evaluating policy.
let input = File::open(path)?;
let output = File::create(&gz_path)?;
// Write to a temporary file first and then atomically rename it into place
// so callers never observe a partially written archive at `archive_path`.
let mut tmp_name = archive_path.as_os_str().to_owned();
tmp_name.push(".tmp");
let tmp_archive_path = std::path::PathBuf::from(tmp_name);
let output = File::create(&tmp_archive_path)?;
let mut reader = BufReader::new(input);
let mut writer = BufWriter::new(output);
let writer = BufWriter::new(output);
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level.clamp(1, 9)));
std::io::copy(&mut reader, &mut encoder)?;
let compressed = encoder.finish()?;
if let Err(e) = writer_fn(&mut reader, writer) {
// Best-effort cleanup of the incomplete temporary archive.
let _ = std::fs::remove_file(&tmp_archive_path);
return Err(e);
}
writer.write_all(&compressed)?;
writer.flush()?;
// Preserve Unix mode bits so rotated archives keep the same access policy.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Ok(src_meta) = std::fs::metadata(path) {
let mode = src_meta.permissions().mode();
let _ = std::fs::set_permissions(&tmp_archive_path, std::fs::Permissions::from_mode(mode));
}
}
// Atomically move the fully written temp file into its final location.
std::fs::rename(&tmp_archive_path, archive_path)?;
let output_bytes = std::fs::metadata(archive_path).map(|m| m.len()).unwrap_or(0);
debug!(
"Compressed {:?} -> {:?} ({} bytes -> {} bytes)",
path,
gz_path,
std::fs::metadata(path).map(|m| m.len()).unwrap_or(0),
compressed.len()
file = ?path,
archive = ?archive_path,
input_bytes,
output_bytes,
algorithm = %algorithm_used,
"compression finished"
);
Ok(())
Ok(CompressionOutput {
archive_path: archive_path.to_path_buf(),
algorithm_used,
input_bytes,
output_bytes,
})
}
/// Build `<filename>.<ext>` without replacing an existing extension.
///
/// Rotated log filenames often already encode a generation number or suffix
/// (for example `server.log.3`). Appending the archive extension preserves that
/// information in the final artifact name (`server.log.3.gz`).
fn archive_path(path: &Path, algorithm: CompressionAlgorithm) -> PathBuf {
let ext = algorithm.extension();
let mut file_name: OsString = path
.file_name()
.map_or_else(|| OsString::from("archive"), |n| n.to_os_string());
file_name.push(format!(".{ext}"));
path.with_file_name(file_name)
}

View File

@@ -14,67 +14,89 @@
//! Core log-file cleanup orchestration.
//!
//! [`LogCleaner`] is the public entry point for the cleanup subsystem.
//! Construct it with [`LogCleaner::builder`] and call [`LogCleaner::cleanup`]
//! periodically (e.g. from a `tokio::spawn`-ed loop).
//!
//! Internally the cleaner delegates to:
//! - [`super::scanner`] — to discover which files exist and which are eligible,
//! - [`super::compress`] — to gzip-compress files before they are deleted,
//! - [`LogCleaner::select_files_to_delete`] — to apply count / size limits.
//! This module connects scanning, retention selection, compression, and safe
//! deletion into one reusable service object. The public surface is intentionally
//! small: callers configure a [`LogCleaner`] once and then trigger discrete
//! cleanup passes whenever log rotation or background maintenance requires it.
use super::compress::compress_file;
use super::compress::{CompressionOptions, compress_file};
use super::scanner::{LogScanResult, scan_log_directory};
use super::types::{FileInfo, FileMatchMode};
use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode, default_parallel_workers};
use crate::global::{
METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS, METRIC_LOG_CLEANER_DELETED_FILES_TOTAL, METRIC_LOG_CLEANER_FREED_BYTES_TOTAL,
METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE,
};
use crossbeam_channel::bounded;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_utils::thread;
use metrics::{counter, gauge, histogram};
use rustfs_config::DEFAULT_LOG_KEEP_FILES;
use std::path::PathBuf;
use std::time::SystemTime;
use tracing::{debug, error, info};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime};
use tracing::{debug, error, info, warn};
#[derive(Debug)]
struct CompressionTaskResult {
/// Original file metadata so successful workers can be deleted later.
file: FileInfo,
/// Whether compression completed successfully for this file.
compressed: bool,
}
/// Log-file lifecycle manager.
///
/// Holds all cleanup policy parameters and exposes a single [`cleanup`] method
/// that performs one full cleanup pass.
///
/// # Thread-safety
/// `LogCleaner` is `Send + Sync`. Multiple callers can share a reference
/// (e.g. via `Arc`) and call `cleanup` concurrently without data races,
/// because no mutable state is mutated after construction.
/// A cleaner instance is immutable after construction and therefore safe to
/// reuse across periodic background jobs. Each call to [`LogCleaner::cleanup`]
/// performs a fresh directory scan and applies the configured retention rules.
pub struct LogCleaner {
/// Directory containing the managed log files.
/// Directory containing the active and rotated log files.
pub(super) log_dir: PathBuf,
/// Pattern string to match files (used as prefix or suffix).
/// Pattern used to recognize relevant log generations.
pub(super) file_pattern: String,
/// Exact name of the active log file (to exclude from cleanup).
/// The currently active log file that must never be touched.
pub(super) active_filename: String,
/// Whether to match by prefix or suffix.
/// Whether `file_pattern` is interpreted as a prefix or suffix.
pub(super) match_mode: FileMatchMode,
/// The cleaner will never delete files if doing so would leave fewer than
/// this many files in the directory.
/// Minimum number of regular log files to keep regardless of size.
pub(super) keep_files: usize,
/// Hard ceiling on the total bytes of all managed files; `0` = no limit.
/// Optional cap for the cumulative size of regular logs.
pub(super) max_total_size_bytes: u64,
/// Hard ceiling on a single file's size; `0` = no per-file limit.
/// Optional cap for an individual regular log file.
pub(super) max_single_file_size_bytes: u64,
/// Compress eligible files with gzip before removing them.
/// Whether selected regular logs should be compressed before deletion.
pub(super) compress_old_files: bool,
/// Gzip compression level (`1``9`, clamped on construction).
/// Gzip compression level used when gzip is selected or used as fallback.
pub(super) gzip_compression_level: u32,
/// Delete compressed archives older than this many days; `0` = keep forever.
/// Retention window for already compressed archives, expressed in days.
pub(super) compressed_file_retention_days: u64,
/// Compiled glob patterns for files that must never be cleaned up.
/// Glob patterns that are excluded before any cleanup decision is made.
pub(super) exclude_patterns: Vec<glob::Pattern>,
/// Delete zero-byte files even when they are younger than `min_file_age_seconds`.
/// Whether zero-byte regular logs may be removed during scanning.
pub(super) delete_empty_files: bool,
/// Files younger than this threshold (in seconds) are never touched.
/// Minimum age a regular log must reach before it becomes eligible.
pub(super) min_file_age_seconds: u64,
/// When `true`, log what would be done without performing any destructive
/// filesystem operations.
/// Dry-run mode reports intended actions without modifying files.
pub(super) dry_run: bool,
// Parallel compression controls while keeping backward compatibility with
// the original serial cleaner behavior.
/// Preferred archive codec for compression-enabled cleanup passes.
pub(super) compression_algorithm: CompressionAlgorithm,
/// Enables the work-stealing compression path when compression is active.
pub(super) parallel_compress: bool,
/// Number of worker threads used by the parallel compressor.
pub(super) parallel_workers: usize,
/// Zstd compression level when zstd is selected.
pub(super) zstd_compression_level: i32,
/// Whether a failed zstd attempt should retry with gzip.
pub(super) zstd_fallback_to_gzip: bool,
/// Number of internal threads requested from the zstd encoder.
pub(super) zstd_workers: usize,
}
impl LogCleaner {
/// Create a builder to construct a `LogCleaner`.
/// Create a builder with the required path and filename matching inputs.
pub fn builder(
log_dir: impl Into<PathBuf>,
file_pattern: impl Into<String>,
@@ -84,19 +106,6 @@ impl LogCleaner {
}
/// Perform one full cleanup pass.
///
/// Steps:
/// 1. Scan the log directory for managed files (excluding the active file).
/// 2. Apply count/size policies to select files for deletion.
/// 3. Optionally compress selected files, then delete them.
/// 4. Collect and delete expired compressed archives.
///
/// # Returns
/// A tuple `(deleted_count, freed_bytes)` covering all deletions in this
/// pass (both regular files and expired compressed archives).
///
/// # Errors
/// Returns an [`std::io::Error`] if the log directory cannot be read.
pub fn cleanup(&self) -> Result<(usize, u64), std::io::Error> {
if !self.log_dir.exists() {
debug!("Log directory does not exist: {:?}", self.log_dir);
@@ -106,8 +115,6 @@ impl LogCleaner {
let mut total_deleted = 0usize;
let mut total_freed = 0u64;
// ── 1. Discover active log files (Archives only) ──────────────────────
// We explicitly pass `active_filename` to exclude it from the list.
let LogScanResult {
mut logs,
mut compressed_archives,
@@ -122,7 +129,6 @@ impl LogCleaner {
self.dry_run,
)?;
// ── 2. Select + compress + delete (Regular Logs) ──────────────────────
if !logs.is_empty() {
logs.sort_by_key(|f| f.modified);
let total_size: u64 = logs.iter().map(|f| f.size).sum();
@@ -134,16 +140,20 @@ impl LogCleaner {
total_size as f64 / 1024.0 / 1024.0
);
// Select the oldest files first, then additionally trim any files
// that still violate configured size constraints.
let to_delete = self.select_files_to_process(&logs, total_size);
if !to_delete.is_empty() {
let (d, f) = self.compress_and_delete(&to_delete)?;
total_deleted += d;
total_freed += f;
let (deleted, freed) = if self.parallel_compress && self.compress_old_files {
self.parallel_stealing_compress(&to_delete)?
} else {
self.serial_compress_and_delete(&to_delete)?
};
total_deleted += deleted;
total_freed += freed;
}
}
// ── 3. Remove expired compressed archives ─────────────────────────────
if !compressed_archives.is_empty() && self.compressed_file_retention_days > 0 {
let expired = self.select_expired_compressed(&mut compressed_archives);
if !expired.is_empty() {
@@ -154,6 +164,8 @@ impl LogCleaner {
}
if total_deleted > 0 || total_freed > 0 {
counter!(METRIC_LOG_CLEANER_DELETED_FILES_TOTAL).increment(total_deleted as u64);
counter!(METRIC_LOG_CLEANER_FREED_BYTES_TOTAL).increment(total_freed);
info!(
"Cleanup completed: deleted {} files, freed {} bytes ({:.2} MB)",
total_deleted,
@@ -165,51 +177,31 @@ impl LogCleaner {
Ok((total_deleted, total_freed))
}
// ─── Selection ────────────────────────────────────────────────────────────
/// Choose which files from `files` (sorted oldest-first) should be deleted.
/// Choose regular log files that should be compressed and/or deleted.
///
/// The algorithm respects three constraints in order:
/// 1. Always keep at least `keep_files` files (archives).
/// 2. Delete old files while the total size exceeds `max_total_size_bytes`.
/// 3. Delete any file whose individual size exceeds `max_single_file_size_bytes`.
/// The `files` slice must already be sorted from oldest to newest. The
/// method first preserves the newest `keep_files` generations, then applies
/// total-size and per-file-size limits to the remaining tail.
pub(super) fn select_files_to_process(&self, files: &[FileInfo], total_size: u64) -> Vec<FileInfo> {
let mut to_delete = Vec::new();
if files.is_empty() {
return to_delete;
}
// Calculate how many files we *must* delete to satisfy keep_files.
let must_delete_count = files.len().saturating_sub(self.keep_files);
let mut current_size = total_size;
for (idx, file) in files.iter().enumerate() {
// Condition 1: Enforce keep_files.
// If we are in the range of files that exceed the count limit, delete them.
if idx < must_delete_count {
current_size = current_size.saturating_sub(file.size);
to_delete.push(file.clone());
continue;
}
// Condition 2: Enforce max_total_size_bytes.
let over_total = self.max_total_size_bytes > 0 && current_size > self.max_total_size_bytes;
// Condition 3: Enforce max_single_file_size_bytes.
// Note: Since active file is excluded, if an archive is > max_single, it means it
// was rotated out being too large (likely) or we lowered the limit. It should be deleted.
let over_single = self.max_single_file_size_bytes > 0 && file.size > self.max_single_file_size_bytes;
if over_total {
current_size = current_size.saturating_sub(file.size);
to_delete.push(file.clone());
} else if over_single {
debug!(
"Archive exceeds single-file size limit: {:?} ({} > {} bytes). Deleting.",
file.path, file.size, self.max_single_file_size_bytes
);
if over_total || over_single {
current_size = current_size.saturating_sub(file.size);
to_delete.push(file.clone());
}
@@ -218,9 +210,9 @@ impl LogCleaner {
to_delete
}
/// Select compressed files that have exceeded the retention period.
/// Select compressed archives whose age exceeds the archive retention window.
fn select_expired_compressed(&self, files: &mut [FileInfo]) -> Vec<FileInfo> {
let retention = std::time::Duration::from_secs(self.compressed_file_retention_days * 24 * 3600);
let retention = Duration::from_secs(self.compressed_file_retention_days * 24 * 3600);
let now = SystemTime::now();
let mut expired = Vec::new();
@@ -235,22 +227,259 @@ impl LogCleaner {
expired
}
// ─── Compression + deletion ───────────────────────────────────────────────
/// Securely delete a file, preventing symlink attacks (TOCTOU).
/// Parallel compressor with work stealing.
///
/// This function verifies that the path is not a symlink before attempting deletion.
/// While strictly speaking a race condition is still theoretically possible between
/// `symlink_metadata` and `remove_file`, this check covers the vast majority of
/// privilege escalation vectors where a user replaces a log file with a symlink
/// to a system file.
fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> {
// 1. Lstat (symlink_metadata) - do not follow links
let meta = std::fs::symlink_metadata(path)?;
/// The flow is intentionally split into "parallel compression" followed by
/// "serial deletion" to reduce cross-platform file-locking failures.
/// Compression workers only decide whether an archive was created; the main
/// thread remains responsible for actual source removal so deletion policy
/// and error reporting stay deterministic.
fn parallel_stealing_compress(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
if files.len() <= 1 {
return self.serial_compress_and_delete(files);
}
// 2. Symlink Check
// If it's a symlink, we NEVER delete it. It might point to /etc/passwd.
// In a log directory, symlinks are unexpected and dangerous.
let worker_count = self.parallel_workers.min(files.len()).max(1);
if worker_count <= 1 {
return self.serial_compress_and_delete(files);
}
let compression_options = self.compression_options();
let started_at = Instant::now();
let injector = Arc::new(Injector::new());
for file in files {
injector.push(file.clone());
}
let mut workers = Vec::with_capacity(worker_count);
let mut stealers = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
workers.push(worker);
}
let stealers = Arc::new(stealers);
let steal_attempts = Arc::new(AtomicU64::new(0));
let steal_successes = Arc::new(AtomicU64::new(0));
let (tx, rx) = bounded::<CompressionTaskResult>(worker_count.saturating_mul(2).max(8));
// Spawn a fixed-size worker set in a scoped region so panics are
// contained and can be downgraded to a serial fallback instead of
// leaking detached threads.
let scope_result = thread::scope(|scope| {
for (worker_id, local_worker) in workers.into_iter().enumerate() {
let tx = tx.clone();
let injector = Arc::clone(&injector);
let stealers = Arc::clone(&stealers);
let options = compression_options.clone();
let attempts = Arc::clone(&steal_attempts);
let successes = Arc::clone(&steal_successes);
scope.spawn(move |_| {
let mut seed = (worker_id as u64 + 1)
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
loop {
// Search order: local FIFO -> global injector batch ->
// random victim stealers.
let task = if let Some(file) = local_worker.pop() {
Some(file)
} else {
match injector.steal_batch_and_pop(&local_worker) {
Steal::Success(file) => {
attempts.fetch_add(1, Ordering::Relaxed);
successes.fetch_add(1, Ordering::Relaxed);
Some(file)
}
Steal::Retry => continue,
Steal::Empty => {
let stolen = Self::steal_from_victims(
worker_id,
&local_worker,
&stealers,
&attempts,
&successes,
&mut seed,
);
// Exit only when all task sources are empty.
if stolen.is_none()
&& injector.is_empty()
&& local_worker.is_empty()
&& stealers.iter().all(Stealer::is_empty)
{
break;
}
stolen
}
}
};
let Some(file) = task else {
std::thread::yield_now();
continue;
};
let compressed = match compress_file(&file.path, &options) {
Ok(output) => {
debug!(
file = ?file.path,
archive = ?output.archive_path,
algorithm = %output.algorithm_used,
input_bytes = output.input_bytes,
output_bytes = output.output_bytes,
"parallel compression done"
);
true
}
Err(err) => {
warn!(file = ?file.path, error = %err, "parallel compression failed");
false
}
};
if tx.send(CompressionTaskResult { file, compressed }).is_err() {
break;
}
}
});
}
});
drop(tx);
// Any worker panic triggers deterministic fallback behavior.
if scope_result.is_err() {
warn!("parallel compression worker panicked, falling back to serial path");
return self.serial_compress_and_delete(files);
}
let mut deletable = Vec::with_capacity(files.len());
for result in rx {
if result.compressed {
deletable.push(result.file);
}
}
let (deleted, freed) = self.delete_files(&deletable)?;
let elapsed = started_at.elapsed().as_secs_f64();
let attempts = steal_attempts.load(Ordering::Relaxed);
let successes = steal_successes.load(Ordering::Relaxed);
let success_rate = if attempts == 0 {
0.0
} else {
successes as f64 / attempts as f64
};
// Emit post-run cleanup metrics for monitoring and alerting.
histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(elapsed);
gauge!(METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE).set(success_rate);
info!(
workers = worker_count,
algorithm = %self.compression_algorithm,
deleted,
freed,
duration_seconds = elapsed,
steal_attempts = attempts,
steal_successes = successes,
steal_success_rate = success_rate,
"parallel cleanup finished"
);
Ok((deleted, freed))
}
/// Attempt to steal a task from peer workers using randomized victim order.
///
/// # Parameters
/// - `worker_id`: index of the calling worker, used to skip its own queue.
/// - `local_worker`: the caller's deque; batch-steals land here.
/// - `stealers`: one stealer handle per worker, indexed like `worker_id`.
/// - `attempts` / `successes`: shared counters feeding the steal-rate metric.
/// - `seed`: per-worker xorshift state, advanced on every call.
///
/// # Returns
/// The stolen task, or `None` when no probed victim produced work.
fn steal_from_victims(
    worker_id: usize,
    local_worker: &Worker<FileInfo>,
    stealers: &[Stealer<FileInfo>],
    attempts: &AtomicU64,
    successes: &AtomicU64,
    seed: &mut u64,
) -> Option<FileInfo> {
    // With a single worker there is no peer to steal from.
    if stealers.len() <= 1 {
        return None;
    }
    // Xorshift step to randomize victim polling order and avoid convoying.
    *seed ^= *seed << 13;
    *seed ^= *seed >> 7;
    *seed ^= *seed << 17;
    let start = (*seed as usize) % stealers.len();
    // `Steal: FromIterator` folds per-victim outcomes: the first `Success`
    // short-circuits; otherwise `Retry` dominates `Empty`. Because the map is
    // lazy, `attempts` counts only victims actually probed before stopping.
    let steal_result = Steal::from_iter((0..stealers.len()).map(|offset| {
        let victim = (start + offset) % stealers.len();
        if victim == worker_id {
            // Never probe our own stealer; report it as empty.
            return Steal::Empty;
        }
        attempts.fetch_add(1, Ordering::Relaxed);
        stealers[victim].steal_batch_and_pop(local_worker)
    }));
    match steal_result {
        Steal::Success(file) => {
            successes.fetch_add(1, Ordering::Relaxed);
            Some(file)
        }
        // A miss (or contended Retry) is reported as `None`; the caller
        // re-polls all task sources before deciding to exit.
        Steal::Retry | Steal::Empty => None,
    }
}
/// Serial fallback path and non-parallel baseline.
///
/// This path is also used whenever the task set is too small to benefit
/// from worker orchestration.
fn serial_compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
let started_at = Instant::now();
let mut deletable = Vec::with_capacity(files.len());
if self.compress_old_files {
let options = self.compression_options();
for file in files {
match compress_file(&file.path, &options) {
Ok(output) => {
debug!(
file = ?file.path,
archive = ?output.archive_path,
algorithm = %output.algorithm_used,
input_bytes = output.input_bytes,
output_bytes = output.output_bytes,
"serial compression done"
);
deletable.push(file.clone());
}
Err(err) => {
warn!(file = ?file.path, error = %err, "serial compression failed, source kept");
}
}
}
} else {
deletable.extend(files.iter().cloned());
}
let (deleted, freed) = self.delete_files(&deletable)?;
histogram!(METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS).record(started_at.elapsed().as_secs_f64());
Ok((deleted, freed))
}
/// Snapshot compression-related configuration for a single cleanup pass.
///
/// Building one value object up front means every file processed in the same
/// pass sees identical settings, and worker threads can own a copy without
/// borrowing `self`.
fn compression_options(&self) -> CompressionOptions {
    CompressionOptions {
        // Codec selection (gzip or zstd) for this pass.
        algorithm: self.compression_algorithm,
        gzip_level: self.gzip_compression_level,
        zstd_level: self.zstd_compression_level,
        // Internal worker threads requested from the zstd encoder per job.
        zstd_workers: self.zstd_workers,
        // Whether a failed zstd encode is retried with gzip.
        zstd_fallback_to_gzip: self.zstd_fallback_to_gzip,
        // Forwarded so the codec can report intent without writing archives.
        dry_run: self.dry_run,
    }
}
/// Delete a file while refusing symlinks and accommodating platform quirks.
fn secure_delete(&self, path: &PathBuf) -> std::io::Result<()> {
let meta = std::fs::symlink_metadata(path)?;
if meta.file_type().is_symlink() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
@@ -258,59 +487,36 @@ impl LogCleaner {
));
}
// 3. Perform Deletion
std::fs::remove_file(path)
}
/// Optionally compress and then delete the given files.
///
/// This function is synchronous and blocking. It should be called within a
/// `spawn_blocking` task if running in an async context.
fn compress_and_delete(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
let mut total_deleted = 0;
let mut total_freed = 0;
for f in files {
let mut deleted_size = 0;
if self.compress_old_files {
match compress_file(&f.path, self.gzip_compression_level, self.dry_run) {
Ok(_) => {}
Err(e) => {
tracing::warn!("Failed to compress {:?}: {}", f.path, e);
#[cfg(windows)]
{
// Retry removes to mitigate transient handle races from external
// scanners/AV software.
let mut last_err: Option<std::io::Error> = None;
for _ in 0..3 {
match std::fs::remove_file(path) {
Ok(()) => return Ok(()),
Err(err) => {
last_err = Some(err);
std::thread::sleep(Duration::from_millis(20));
}
}
}
// Now delete
if self.dry_run {
info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size);
deleted_size = f.size;
} else {
match self.secure_delete(&f.path) {
Ok(()) => {
debug!("Deleted: {:?}", f.path);
deleted_size = f.size;
}
Err(e) => {
error!("Failed to delete {:?}: {}", f.path, e);
}
}
}
if deleted_size > 0 {
total_deleted += 1;
total_freed += deleted_size;
if let Some(err) = last_err {
return Err(err);
}
return Ok(());
}
Ok((total_deleted, total_freed))
#[cfg(not(windows))]
{
std::fs::remove_file(path)
}
}
/// Delete all files in `files`, logging each operation.
/// Delete the supplied files and return `(deleted_count, freed_bytes)`.
///
/// Errors on individual files are logged but do **not** abort the loop.
///
/// # Returns
/// `(deleted_count, freed_bytes)`.
/// In dry-run mode the returned counters still reflect the projected work
/// so callers and metrics can report what would have happened.
pub(super) fn delete_files(&self, files: &[FileInfo]) -> Result<(usize, u64), std::io::Error> {
let mut deleted = 0usize;
let mut freed = 0u64;
@@ -320,16 +526,17 @@ impl LogCleaner {
info!("[DRY RUN] Would delete: {:?} ({} bytes)", f.path, f.size);
deleted += 1;
freed += f.size;
} else {
match self.secure_delete(&f.path) {
Ok(()) => {
debug!("Deleted: {:?}", f.path);
deleted += 1;
freed += f.size;
}
Err(e) => {
error!("Failed to delete {:?}: {}", f.path, e);
}
continue;
}
match self.secure_delete(&f.path) {
Ok(()) => {
deleted += 1;
freed += f.size;
debug!("Deleted: {:?}", f.path);
}
Err(e) => {
error!("Failed to delete {:?}: {}", f.path, e);
}
}
}
@@ -339,6 +546,9 @@ impl LogCleaner {
}
/// Builder for [`LogCleaner`].
///
/// The builder keeps startup code readable when an application only needs to
/// override a subset of retention knobs.
pub struct LogCleanerBuilder {
log_dir: PathBuf,
file_pattern: String,
@@ -354,18 +564,22 @@ pub struct LogCleanerBuilder {
delete_empty_files: bool,
min_file_age_seconds: u64,
dry_run: bool,
compression_algorithm: CompressionAlgorithm,
parallel_compress: bool,
parallel_workers: usize,
zstd_compression_level: i32,
zstd_fallback_to_gzip: bool,
zstd_workers: usize,
}
impl LogCleanerBuilder {
/// Create a builder with conservative defaults.
pub fn new(log_dir: impl Into<PathBuf>, file_pattern: impl Into<String>, active_filename: impl Into<String>) -> Self {
Self {
log_dir: log_dir.into(),
file_pattern: file_pattern.into(),
active_filename: active_filename.into(),
match_mode: FileMatchMode::Prefix,
// Default to a safe non-zero value so that a builder created
// without an explicit `keep_files()` call does not immediately
// delete all matching log files.
keep_files: DEFAULT_LOG_KEEP_FILES,
max_total_size_bytes: 0,
max_single_file_size_bytes: 0,
@@ -376,64 +590,127 @@ impl LogCleanerBuilder {
delete_empty_files: false,
min_file_age_seconds: 0,
dry_run: false,
compression_algorithm: CompressionAlgorithm::default(),
parallel_compress: true,
parallel_workers: default_parallel_workers(),
zstd_compression_level: 8,
zstd_fallback_to_gzip: true,
zstd_workers: 1,
}
}
/// Configure whether `file_pattern` is matched as a prefix or suffix.
pub fn match_mode(mut self, match_mode: FileMatchMode) -> Self {
self.match_mode = match_mode;
self
}
/// Preserve at least this many newest regular log files.
pub fn keep_files(mut self, keep_files: usize) -> Self {
self.keep_files = keep_files;
self
}
/// Cap the aggregate size of retained regular log files.
pub fn max_total_size_bytes(mut self, max_total_size_bytes: u64) -> Self {
self.max_total_size_bytes = max_total_size_bytes;
self
}
/// Cap the size of any individual regular log file.
pub fn max_single_file_size_bytes(mut self, max_single_file_size_bytes: u64) -> Self {
self.max_single_file_size_bytes = max_single_file_size_bytes;
self
}
/// Enable archival compression before deleting selected source logs.
pub fn compress_old_files(mut self, compress_old_files: bool) -> Self {
self.compress_old_files = compress_old_files;
self
}
/// Set the gzip compression level used for gzip output or gzip fallback.
///
/// The value is clamped into the documented `1..=9` range so an out-of-range
/// configuration cannot reach the codec; this mirrors the defensive clamp
/// that `build()` already applies to the zstd level.
pub fn gzip_compression_level(mut self, gzip_compression_level: u32) -> Self {
    // Clamp eagerly for consistency with `zstd_compression_level.clamp(1, 21)`
    // in `build()`.
    self.gzip_compression_level = gzip_compression_level.clamp(1, 9);
    self
}
/// Set how long compressed archives may remain on disk.
pub fn compressed_file_retention_days(mut self, days: u64) -> Self {
self.compressed_file_retention_days = days;
self
}
/// Exclude files matching these glob patterns from every cleanup pass.
pub fn exclude_patterns(mut self, patterns: Vec<String>) -> Self {
self.exclude_patterns = patterns;
self
}
/// Allow the scanner to remove matching zero-byte regular logs immediately.
pub fn delete_empty_files(mut self, delete_empty_files: bool) -> Self {
self.delete_empty_files = delete_empty_files;
self
}
/// Require regular log files to be at least this old before processing.
pub fn min_file_age_seconds(mut self, seconds: u64) -> Self {
self.min_file_age_seconds = seconds;
self
}
/// Enable dry-run mode for scans, compression decisions, and deletion.
pub fn dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}
/// Set the preferred compression algorithm explicitly.
pub fn compression_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
self.compression_algorithm = algorithm;
self
}
/// Parse and set the compression algorithm from configuration text.
pub fn compression_algorithm_str(mut self, algorithm: impl AsRef<str>) -> Self {
self.compression_algorithm = CompressionAlgorithm::from_config_str(algorithm.as_ref());
self
}
/// Enable or disable the parallel work-stealing compression path.
pub fn parallel_compress(mut self, enabled: bool) -> Self {
self.parallel_compress = enabled;
self
}
/// Set the number of compression workers, clamped to at least one.
pub fn parallel_workers(mut self, workers: usize) -> Self {
self.parallel_workers = workers.max(1);
self
}
/// Set the zstd compression level.
pub fn zstd_compression_level(mut self, level: i32) -> Self {
self.zstd_compression_level = level;
self
}
/// Retry compression with gzip when zstd encoding fails.
pub fn zstd_fallback_to_gzip(mut self, enabled: bool) -> Self {
self.zstd_fallback_to_gzip = enabled;
self
}
/// Set the number of internal worker threads requested from zstd.
pub fn zstd_workers(mut self, workers: usize) -> Self {
self.zstd_workers = workers.max(1);
self
}
/// Finalize the builder into an immutable [`LogCleaner`].
///
/// Invalid glob patterns are ignored rather than failing construction, and
/// codec-related numeric values are clamped into safe ranges.
pub fn build(self) -> LogCleaner {
let patterns = self
.exclude_patterns
@@ -456,6 +733,12 @@ impl LogCleanerBuilder {
delete_empty_files: self.delete_empty_files,
min_file_age_seconds: self.min_file_age_seconds,
dry_run: self.dry_run,
compression_algorithm: self.compression_algorithm,
parallel_compress: self.parallel_compress,
parallel_workers: self.parallel_workers.max(1),
zstd_compression_level: self.zstd_compression_level.clamp(1, 21),
zstd_fallback_to_gzip: self.zstd_fallback_to_gzip,
zstd_workers: self.zstd_workers.max(1),
}
}
}

View File

@@ -14,17 +14,32 @@
//! Log-file cleanup subsystem.
//!
//! This module provides [`LogCleaner`], a configurable manager that
//! periodically removes, compresses, or archives old rolling log files.
//! This module exposes the high-level [`LogCleaner`] API used by the
//! observability layer to manage rotated log files after they are no longer
//! active. The implementation is intentionally split into small focused
//! sub-modules so each concern can evolve independently without turning the
//! cleaner into a monolithic state machine.
//!
//! At a high level, one cleanup pass follows this lifecycle:
//!
//! 1. scan the target directory and classify matching entries;
//! 2. decide which regular log files exceed retention constraints;
//! 3. optionally compress those files using the configured codec;
//! 4. delete only files that are safe to remove;
//! 5. separately expire already-compressed archives by age.
//!
//! The design favors operational safety over aggressive reclamation:
//! compression and deletion are decoupled, symlinks are rejected on removal,
//! and dry-run mode reports intent without mutating the filesystem.
//!
//! ## Sub-modules
//!
//! | Module | Responsibility |
//! |-------------|----------------------------------------------------------|
//! | `types` | Shared data types (`FileInfo`) |
//! | `scanner` | Filesystem traversal — discovers eligible files |
//! | `compress` | Gzip compression helper |
//! | `core` | Core orchestration — selection, compression, deletion |
//! | `types` | Shared enums and metadata (`FileInfo`, match/compression choices) |
//! | `scanner` | Filesystem traversal, pattern matching, empty-file handling |
//! | `compress` | Archive creation helpers for gzip and zstd |
//! | `core` | Selection, parallel/serial processing, secure deletion |
//!
//! ## Usage
//!
@@ -60,6 +75,10 @@ mod core;
mod scanner;
pub mod types;
/// Primary entry point for the cleaner subsystem.
///
/// Re-exported from [`core`] so callers can construct a cleaner without
/// knowing the internal module layout.
pub use core::LogCleaner;
#[cfg(test)]
@@ -72,6 +91,7 @@ mod tests {
use std::path::Path;
use tempfile::TempDir;
/// Create a test log file with deterministic contents and size.
fn create_log_file(dir: &Path, name: &str, size: usize) -> std::io::Result<()> {
let path = dir.join(name);
let mut f = File::create(path)?;

View File

@@ -18,15 +18,23 @@
//! The one exception is zero-byte file removal — when `delete_empty_files`
//! is enabled, `scan_log_directory` removes empty regular files as part of
//! the scan so that they are not counted in retention calculations.
//!
//! The scanner is also the first safety boundary of the cleaner pipeline. It
//! performs a shallow directory walk, rejects symlinks by relying on
//! `symlink_metadata`, and separates plain logs from pre-compressed archives so
//! later stages can apply different retention rules without rescanning.
use super::types::{FileInfo, FileMatchMode};
use rustfs_config::observability::DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION;
use super::types::{CompressionAlgorithm, FileInfo, FileMatchMode};
use std::fs;
use std::path::Path;
use std::time::SystemTime;
use tracing::debug;
/// Result of a single pass directory scan.
///
/// Separating regular logs from compressed archives keeps the selection logic
/// straightforward: active retention limits apply to the former, while archive
/// expiry rules apply to the latter.
pub(super) struct LogScanResult {
/// Regular log files eligible for deletion/compression.
pub logs: Vec<FileInfo>,
@@ -49,6 +57,7 @@ pub(super) struct LogScanResult {
/// * `delete_empty_files` - When `true`, zero-byte regular files that match
/// the pattern are deleted immediately inside this function and excluded
/// from the returned [`LogScanResult`].
/// * `dry_run` - When `true`, destructive actions are logged but not executed.
#[allow(clippy::too_many_arguments)]
pub(super) fn scan_log_directory(
log_dir: &Path,
@@ -121,13 +130,19 @@ pub(super) fn scan_log_directory(
}
// 3. Classify file type and check pattern match.
let is_compressed = filename.ends_with(DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION);
let matched_suffix = CompressionAlgorithm::compressed_suffixes()
.into_iter()
.find(|suffix| filename.ends_with(suffix));
let is_compressed = matched_suffix.is_some();
// For matching, we need the "base" name.
// If compressed: "foo.log.gz" -> check "foo.log"
// If regular: "foo.log" -> check "foo.log"
let name_to_match = if is_compressed {
&filename[..filename.len() - DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION.len()]
// Perform matching on the logical log filename.
// Examples:
// - regular log: `foo.log.1` -> `foo.log.1`
// - archive: `foo.log.1.gz` -> `foo.log.1`
// This allows the same include/exclude pattern configuration to apply
// to both raw and already-compressed generations.
let name_to_match = if let Some(suffix) = matched_suffix {
&filename[..filename.len() - suffix.len()]
} else {
filename
};
@@ -149,11 +164,13 @@ pub(super) fn scan_log_directory(
Err(_) => continue, // Skip files where we can't read modification time
};
// 5. Handle zero-byte files (Regular logs only).
// We generally don't delete empty compressed files implicitly, but let's stick to regular files logic.
// 5. Handle zero-byte files (regular logs only).
// Empty compressed artifacts are left alone here because they belong
// to the archive-retention path and should not disappear outside that
// explicit policy.
if !is_compressed && file_size == 0 && delete_empty_files {
if !dry_run {
if let Err(e) = std::fs::remove_file(&path) {
if let Err(e) = fs::remove_file(&path) {
tracing::warn!("Failed to delete empty file {:?}: {}", path, e);
} else {
debug!("Deleted empty file: {:?}", path);
@@ -164,8 +181,9 @@ pub(super) fn scan_log_directory(
continue;
}
// 6. Age Check (Regular logs only).
// Compressed files have their own retention check in the caller.
// 6. Age gate regular logs only.
// Compressed files deliberately bypass this check because archive
// expiry is driven by a dedicated retention horizon in the caller.
if !is_compressed
&& let Ok(age) = now.duration_since(modified)
&& age.as_secs() < min_file_age_seconds

View File

@@ -13,7 +13,18 @@
// limitations under the License.
//! Shared types used across the log-cleanup sub-modules.
//!
//! These types deliberately stay lightweight because they are passed between
//! the scanner, selector, compressor, and deletion stages. Keeping them small
//! and explicit makes the cleaner easier to reason about and cheaper to move
//! across worker threads in the parallel pipeline.
use rustfs_config::observability::{
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP,
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION,
DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MATCH_MODE_PREFIX,
DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION,
};
use std::fmt;
use std::path::PathBuf;
use std::time::SystemTime;
@@ -33,8 +44,21 @@ impl FileMatchMode {
/// Returns the string representation of the match mode.
pub fn as_str(&self) -> &'static str {
match self {
FileMatchMode::Prefix => "prefix",
FileMatchMode::Suffix => "suffix",
FileMatchMode::Prefix => DEFAULT_OBS_LOG_MATCH_MODE_PREFIX,
FileMatchMode::Suffix => DEFAULT_OBS_LOG_MATCH_MODE,
}
}
/// Parse a config value into a [`FileMatchMode`].
///
/// Any non-`prefix` value falls back to [`FileMatchMode::Suffix`] to keep
/// configuration handling permissive and aligned with the historical
/// cleaner default used by rolling log filenames.
pub fn from_config_str(value: &str) -> Self {
if value.trim().eq_ignore_ascii_case(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX) {
Self::Prefix
} else {
Self::Suffix
}
}
}
@@ -45,17 +69,135 @@ impl fmt::Display for FileMatchMode {
}
}
/// Compression algorithm used by the cleaner when archiving rotated logs.
///
/// `Copy` is derived so the selection can be passed cheaply between the
/// configuration, scanner, and compression stages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// Gzip keeps backward compatibility with existing `.gz` archives.
    Gzip,
    /// Zstd provides better ratio and higher decompression throughput.
    Zstd,
}
impl CompressionAlgorithm {
    /// Map an already-normalized (trimmed, lowercase) token to an algorithm.
    ///
    /// Both semantic names and extension aliases are accepted; anything else
    /// yields `None` so each caller decides between defaulting and erroring.
    fn parse_normalized(value: &str) -> Option<Self> {
        let is_gzip = value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP
            || value == DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION;
        if is_gzip {
            return Some(Self::Gzip);
        }
        let is_zstd = value == DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD
            || value == DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION;
        if is_zstd { Some(Self::Zstd) } else { None }
    }

    /// Parse from a user-facing configuration string.
    ///
    /// Supported values include both semantic names (`gzip`, `zstd`) and file
    /// extension aliases (`gz`, `zst`). Unknown values intentionally fall back
    /// to the crate default so observability startup remains resilient.
    pub fn from_config_str(value: &str) -> Self {
        let token = value.trim().to_ascii_lowercase();
        Self::parse_normalized(token.as_str()).unwrap_or_default()
    }

    /// Archive suffix (without the leading dot) used for this algorithm.
    ///
    /// The returned value is suitable for appending to an existing filename,
    /// rather than replacing the source extension.
    pub fn extension(self) -> &'static str {
        let dotted = match self {
            Self::Gzip => DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION,
            Self::Zstd => DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION,
        };
        dotted.trim_start_matches('.')
    }

    /// Supported compressed suffixes used by scanner retention logic.
    ///
    /// The scanner checks filenames against this list to recognize
    /// already-archived files and keep them on a retention path separate from
    /// plain log files.
    pub fn compressed_suffixes() -> [&'static str; 2] {
        [
            DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION,
            DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION,
        ]
    }

    /// Stable lowercase string form used in logs and configuration echoes.
    pub fn as_str(self) -> &'static str {
        if self == Self::Gzip {
            DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP
        } else {
            DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD
        }
    }
}
impl std::str::FromStr for CompressionAlgorithm {
    type Err = &'static str;

    /// Strict parse: unknown tokens are an error rather than the default.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let token = value.trim().to_ascii_lowercase();
        match Self::parse_normalized(token.as_str()) {
            Some(algorithm) => Ok(algorithm),
            None => Err("invalid compression algorithm"),
        }
    }
}
impl Default for CompressionAlgorithm {
    /// Resolve the crate-level default algorithm from configuration constants.
    ///
    /// Routing through `from_config_str` keeps the default aligned with the
    /// same parsing rules applied to user-provided values.
    fn default() -> Self {
        Self::from_config_str(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM)
    }
}
impl fmt::Display for CompressionAlgorithm {
    /// Delegates to `as_str` so display output, logs, and config echoes agree.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
/// Worker-thread default used by the parallel compressor.
///
/// Follows CPU capacity but stays within `[4, 8]` to keep throughput stable
/// and avoid oversubscription: the lower bound lets the work-stealing path
/// exercise concurrency on small machines, while the upper bound avoids
/// swamping the host when each task may also use internal codec threads.
pub fn default_parallel_workers() -> usize {
    num_cpus::get().max(4).min(8)
}
/// Metadata for a single log file discovered by the scanner.
///
/// This snapshot is intentionally immutable after discovery. The cleaner uses
/// it to sort candidates by age, evaluate retention constraints, and report
/// deletion metrics without re-reading filesystem metadata during every later
/// stage.
#[derive(Debug, Clone)]
pub(super) struct FileInfo {
    /// Absolute or scanner-produced path to the file on disk.
    pub path: PathBuf,
    /// File size in bytes at the time of discovery; used for retention
    /// accounting and freed-byte metrics.
    pub size: u64,
    /// Last-modification timestamp from the filesystem. The selection phase
    /// sorts on this timestamp so the oldest files are processed first.
    pub modified: SystemTime,
}
#[cfg(test)]
mod tests {
    use super::CompressionAlgorithm;

    /// Full names and extension aliases must both parse, case-insensitively
    /// and ignoring surrounding whitespace.
    #[test]
    fn compression_algorithm_accepts_full_names_and_aliases() {
        assert_eq!(CompressionAlgorithm::from_config_str("gzip"), CompressionAlgorithm::Gzip);
        assert_eq!(CompressionAlgorithm::from_config_str("GZ"), CompressionAlgorithm::Gzip);
        assert_eq!(CompressionAlgorithm::from_config_str("zstd"), CompressionAlgorithm::Zstd);
        assert_eq!(CompressionAlgorithm::from_config_str(" zst "), CompressionAlgorithm::Zstd);
    }

    /// The permissive `from_config_str` falls back to the default for unknown
    /// values, while the strict `FromStr` implementation reports an error.
    #[test]
    fn compression_algorithm_defaults_or_errors_for_invalid_values() {
        assert_eq!(CompressionAlgorithm::from_config_str("brotli"), CompressionAlgorithm::default());
        assert!("brotli".parse::<CompressionAlgorithm>().is_err());
    }
}

View File

@@ -24,17 +24,20 @@
use rustfs_config::observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES,
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN,
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES,
DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT,
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS, DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES,
DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE,
DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS,
DEFAULT_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL,
DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, DEFAULT_OBS_LOG_ZSTD_WORKERS, ENV_OBS_ENDPOINT, ENV_OBS_ENVIRONMENT,
ENV_OBS_LOG_CLEANUP_INTERVAL_SECONDS, ENV_OBS_LOG_COMPRESS_OLD_FILES, ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN, ENV_OBS_LOG_ENDPOINT,
ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES,
ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOGGER_LEVEL,
ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT, ENV_OBS_METRICS_EXPORT_ENABLED,
ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO, ENV_OBS_SERVICE_NAME,
ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT,
ENV_OBS_LOG_COMPRESSION_ALGORITHM, ENV_OBS_LOG_DELETE_EMPTY_FILES, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_DRY_RUN,
ENV_OBS_LOG_ENDPOINT, ENV_OBS_LOG_EXCLUDE_PATTERNS, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL,
ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_MATCH_MODE, ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES,
ENV_OBS_LOG_MIN_FILE_AGE_SECONDS, ENV_OBS_LOG_PARALLEL_COMPRESS, ENV_OBS_LOG_PARALLEL_WORKERS, ENV_OBS_LOG_ROTATION_TIME,
ENV_OBS_LOG_STDOUT_ENABLED, ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, ENV_OBS_LOG_ZSTD_WORKERS,
ENV_OBS_LOGGER_LEVEL, ENV_OBS_LOGS_EXPORT_ENABLED, ENV_OBS_METER_INTERVAL, ENV_OBS_METRIC_ENDPOINT,
ENV_OBS_METRICS_EXPORT_ENABLED, ENV_OBS_PROFILING_ENDPOINT, ENV_OBS_PROFILING_EXPORT_ENABLED, ENV_OBS_SAMPLE_RATIO,
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_TRACE_ENDPOINT, ENV_OBS_TRACES_EXPORT_ENABLED, ENV_OBS_USE_STDOUT,
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_FILENAME,
@@ -80,6 +83,11 @@ use std::env;
/// let config = OtelConfig::extract_otel_config_from_env(
/// Some("http://otel-collector:4318".to_string())
/// );
///
/// // Compression related env mapping examples:
/// // RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM=zstd
/// // RUSTFS_OBS_LOG_PARALLEL_COMPRESS=true
/// // RUSTFS_OBS_LOG_PARALLEL_WORKERS=6
/// ```
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct OtelConfig {
@@ -146,6 +154,18 @@ pub struct OtelConfig {
pub log_compress_old_files: Option<bool>,
/// Gzip compression level `1``9` (default: `6`).
pub log_gzip_compression_level: Option<u32>,
/// Compression algorithm: `gzip` or `zstd` (default: `zstd`).
pub log_compression_algorithm: Option<String>,
/// Enable crossbeam work-stealing parallel compression (default: `true`).
pub log_parallel_compress: Option<bool>,
/// Worker count for parallel compression (default: `6`).
pub log_parallel_workers: Option<usize>,
/// Zstd compression level `1``21` (default: `8`).
pub log_zstd_compression_level: Option<i32>,
/// Fallback to gzip when zstd compression fails (default: `true`).
pub log_zstd_fallback_to_gzip: Option<bool>,
/// Internal zstdmt workers per job (default: `1`).
pub log_zstd_workers: Option<usize>,
/// Delete compressed archives older than this many days; `0` = keep forever
/// (default: `30`).
pub log_compressed_file_retention_days: Option<u64>,
@@ -254,6 +274,21 @@ impl OtelConfig {
ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL,
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL as u64,
) as u32),
log_compression_algorithm: Some(get_env_str(
ENV_OBS_LOG_COMPRESSION_ALGORITHM,
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM,
)),
log_parallel_compress: Some(get_env_bool(ENV_OBS_LOG_PARALLEL_COMPRESS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS)),
log_parallel_workers: Some(get_env_usize(ENV_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_PARALLEL_WORKERS)),
log_zstd_compression_level: Some(get_env_u64(
ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL,
DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL as u64,
) as i32),
log_zstd_fallback_to_gzip: Some(get_env_bool(
ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
)),
log_zstd_workers: Some(get_env_usize(ENV_OBS_LOG_ZSTD_WORKERS, DEFAULT_OBS_LOG_ZSTD_WORKERS)),
log_compressed_file_retention_days: Some(get_env_u64(
ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,

View File

@@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry};
use crate::{AppConfig, GlobalError, OtelConfig, OtelGuard, SystemObserver, telemetry::init_telemetry};
use std::sync::{Arc, Mutex};
use tokio::sync::OnceCell;
use tracing::{error, info};
use tracing::{error, info, warn};
/// Global guard for OpenTelemetry tracing
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
@@ -23,11 +23,40 @@ static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
/// Flag indicating if observability metric is enabled
pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell<bool> = OnceCell::const_new();
/// Namespaced metrics for cleaner and rolling logging.
///
/// All names share the `rustfs.log_cleaner.` prefix so dashboards can group
/// them with a single match expression.
/// Metric name for files removed by cleanup passes.
pub(crate) const METRIC_LOG_CLEANER_DELETED_FILES_TOTAL: &str = "rustfs.log_cleaner.deleted_files_total";
/// Metric name for bytes reclaimed by deletions.
pub(crate) const METRIC_LOG_CLEANER_FREED_BYTES_TOTAL: &str = "rustfs.log_cleaner.freed_bytes_total";
/// Histogram of compression-pass duration (recorded by both the serial and
/// parallel paths).
pub(crate) const METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS: &str = "rustfs.log_cleaner.compress_duration_seconds";
/// Gauge of the work-stealing success ratio for the parallel compressor.
pub(crate) const METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE: &str = "rustfs.log_cleaner.steal_success_rate";
/// Metric name for total cleanup runs.
pub(crate) const METRIC_LOG_CLEANER_RUNS_TOTAL: &str = "rustfs.log_cleaner.runs_total";
/// Metric name for cleanup runs that ended in error.
pub(crate) const METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL: &str = "rustfs.log_cleaner.run_failures_total";
/// Metric name for log-rotation events.
pub(crate) const METRIC_LOG_CLEANER_ROTATION_TOTAL: &str = "rustfs.log_cleaner.rotation_total";
/// Metric name for failed rotation attempts.
pub(crate) const METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL: &str = "rustfs.log_cleaner.rotation_failures_total";
/// Metric name for rotation duration.
pub(crate) const METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS: &str = "rustfs.log_cleaner.rotation_duration_seconds";
/// Metric name for the active log file's size.
pub(crate) const METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES: &str = "rustfs.log_cleaner.active_file_size_bytes";
/// Report whether the observability metrics pipeline has been switched on.
///
/// Returns `false` both when the flag was explicitly set to `false` and when
/// no initializer has populated it yet.
pub fn observability_metric_enabled() -> bool {
    match OBSERVABILITY_METRIC_ENABLED.get() {
        Some(enabled) => *enabled,
        None => false,
    }
}
/// Set the global observability metrics flag once.
///
/// When this function is called multiple times, only the first value is kept
/// and later values are ignored to preserve OnceCell semantics.
pub(crate) fn set_observability_metric_enabled(enabled: bool) {
if OBSERVABILITY_METRIC_ENABLED.set(enabled).is_err()
&& let Some(current) = OBSERVABILITY_METRIC_ENABLED.get()
&& *current != enabled
{
warn!(
current = *current,
requested = enabled,
"OBSERVABILITY_METRIC_ENABLED was already initialized; keeping original value"
);
}
}
/// Initialize the observability module
///
/// # Parameters
@@ -51,22 +80,53 @@ pub fn observability_metric_enabled() -> bool {
pub async fn init_obs(endpoint: Option<String>) -> Result<OtelGuard, GlobalError> {
    // Build a full application config from the optional endpoint URL, then
    // delegate to the config-driven initializer with its observability section.
    let app_config = AppConfig::new_with_endpoint(endpoint);
    init_obs_with_config(&app_config.observability).await
}
let otel_guard = init_telemetry(&config.observability)?;
// Server will be created per connection - this ensures isolation
/// Initialize the observability module with an explicit [`OtelConfig`].
///
/// This is the lower-level counterpart to [`init_obs`]: it accepts a fully
/// constructed [`OtelConfig`] rather than building one from an endpoint URL.
/// Useful when the config is already available (e.g., embedded in a larger
/// application config struct) or when unit-testing with custom settings.
///
/// # Parameters
/// - `config`: A pre-built [`OtelConfig`] to drive all telemetry backends.
///
/// # Returns
/// An [`OtelGuard`] that must be kept alive for the lifetime of the
/// application. Dropping it flushes and shuts down all providers.
///
/// # Errors
/// Returns [`GlobalError`] when the underlying telemetry backend fails to
/// initialise (e.g., cannot create the log directory, or an OTLP exporter
/// cannot connect).
///
/// # Example
/// ```no_run
/// use rustfs_obs::{AppConfig, init_obs_with_config};
///
/// # #[tokio::main]
/// # async fn main() {
/// let config = AppConfig::new_with_endpoint(Some("http://localhost:4318".to_string()));
/// let _guard = init_obs_with_config(&config.observability)
/// .await
/// .expect("observability init failed");
/// # }
/// ```
pub async fn init_obs_with_config(config: &OtelConfig) -> Result<OtelGuard, GlobalError> {
let otel_guard = init_telemetry(config)?;
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let obs_result = SystemObserver::init_process_observer().await;
match obs_result {
Ok(_) => {
info!(target: "rustfs::obs::system::metrics","Process observer initialized successfully");
info!(target: "rustfs::obs::system::metrics", "Process observer initialized successfully");
}
Err(e) => {
error!(target: "rustfs::obs::system::metrics","Failed to initialize process observer: {}", e);
error!(target: "rustfs::obs::system::metrics", "Failed to initialize process observer: {}", e);
}
}
});
Ok(otel_guard)
}
@@ -121,9 +181,79 @@ pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GlobalError> {
#[cfg(test)]
mod tests {
use super::*;
const README: &str = include_str!("../README.md");
const GRAFANA_DASHBOARD: &str = include_str!("../../../.docker/observability/grafana/dashboards/rustfs.json");
#[tokio::test]
async fn test_get_uninitialized_guard() {
    // Before any init_* call runs, the global guard must report NotInitialized.
    assert!(matches!(get_global_guard(), Err(GlobalError::NotInitialized)));
}
#[test]
fn test_log_cleaner_metric_constants_use_rustfs_prefix() {
    // Every cleaner/rotation metric must live in the shared namespace so a
    // single prefix match selects the whole family in dashboards and alerts.
    let metrics = [
        METRIC_LOG_CLEANER_DELETED_FILES_TOTAL,
        METRIC_LOG_CLEANER_FREED_BYTES_TOTAL,
        METRIC_LOG_CLEANER_COMPRESS_DURATION_SECONDS,
        METRIC_LOG_CLEANER_STEAL_SUCCESS_RATE,
        METRIC_LOG_CLEANER_RUNS_TOTAL,
        METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL,
        METRIC_LOG_CLEANER_ROTATION_TOTAL,
        METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL,
        METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS,
        METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES,
    ];
    metrics.iter().for_each(|metric| {
        assert!(
            metric.starts_with("rustfs.log_cleaner."),
            "metric '{metric}' should use rustfs.log_cleaner.* namespace"
        );
    });
}
#[test]
fn test_readme_contains_grafana_dashboard_draft_section() {
    // The README must keep the importable Grafana dashboard draft section and
    // its key metric names in sync with the shipped dashboard.
    let needles = [
        "## Cleaner & Rotation Metrics",
        "### Grafana Dashboard JSON Draft (Ready to Import)",
        "\"uid\": \"rustfs-log-cleaner\"",
        "rustfs_log_cleaner_runs_total",
        "rustfs_log_cleaner_rotation_failures_total",
    ];
    for needle in needles {
        assert!(README.contains(needle));
    }
}
#[test]
fn test_readme_contains_promql_templates_section() {
    // The PromQL templates section must stay present with its sample queries.
    let needles = [
        "### PromQL Templates",
        "Cleanup failure ratio",
        "histogram_quantile(0.95",
        "max(rustfs_log_cleaner_active_file_size_bytes)",
    ];
    for needle in needles {
        assert!(README.contains(needle));
    }
}
#[test]
fn test_grafana_dashboard_contains_log_cleaner_row() {
    // The deployed dashboard JSON must keep the Log Cleaner row and the
    // metric series it charts.
    let needles = [
        "\"title\": \"Log Cleaner\"",
        "rustfs_log_cleaner_runs_total",
        "rustfs_log_cleaner_rotation_failures_total",
        "rustfs_log_cleaner_active_file_size_bytes",
    ];
    for needle in needles {
        assert!(GRAFANA_DASHBOARD.contains(needle));
    }
}
#[test]
fn test_readme_mentions_deployed_dashboard_path() {
    // The README must reference the deployed dashboard file and its row title.
    let needles = [
        ".docker/observability/grafana/dashboards/rustfs.json",
        "row title: `Log Cleaner`",
    ];
    for needle in needles {
        assert!(README.contains(needle));
    }
}
#[test]
fn test_init_obs_with_config_signature_is_correct() {
    // Compile-time check: verifies that `init_obs_with_config` is public and
    // accepts `&OtelConfig`, matching the README documentation.
    // The closure is never invoked and the future is never polled — this
    // avoids initialising the global tracing subscriber in a unit-test
    // context while still type-checking the call.
    let _type_check = |config: &OtelConfig| {
        let _future = init_obs_with_config(config);
    };
}
}

View File

@@ -25,20 +25,27 @@
//!
//! The function [`init_local_logging`] is the single entry point for both
//! cases; callers do **not** need to distinguish between stdout and file modes.
//!
//! The file-backed mode delegates retention and compression to
//! [`crate::cleaner`], which keeps the logging setup code focused on subscriber
//! construction while still allowing periodic housekeeping in the background.
use super::guard::OtelGuard;
use crate::TelemetryError;
use crate::cleaner::LogCleaner;
use crate::cleaner::types::FileMatchMode;
use crate::cleaner::types::{CompressionAlgorithm, FileMatchMode};
use crate::config::OtelConfig;
use crate::global::OBSERVABILITY_METRIC_ENABLED;
use crate::global::{METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL, METRIC_LOG_CLEANER_RUNS_TOTAL, set_observability_metric_enabled};
use crate::telemetry::filter::build_env_filter;
use crate::telemetry::rolling::{RollingAppender, Rotation};
use metrics::counter;
use rustfs_config::observability::{
DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS, DEFAULT_OBS_LOG_COMPRESS_OLD_FILES, DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN, DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL,
DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS,
DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, DEFAULT_OBS_LOG_DELETE_EMPTY_FILES, DEFAULT_OBS_LOG_DRY_RUN,
DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES,
DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES, DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS, DEFAULT_OBS_LOG_PARALLEL_COMPRESS,
DEFAULT_OBS_LOG_PARALLEL_WORKERS, DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL, DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP,
DEFAULT_OBS_LOG_ZSTD_WORKERS,
};
use rustfs_config::{APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED};
use std::sync::Arc;
@@ -118,6 +125,8 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
let env_filter = build_env_filter(logger_level, None);
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
// Keep stdout formatting JSON-shaped even in local-only mode so operators
// can ship the same log schema to external collectors if needed.
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
@@ -138,7 +147,7 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
.with(fmt_layer)
.init();
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
set_observability_metric_enabled(false);
counter!("rustfs.start.total").increment(1);
info!("Init stdout logging (level: {})", logger_level);
@@ -161,6 +170,10 @@ fn init_stdout_only(_config: &OtelConfig, logger_level: &str, is_production: boo
/// Called by [`init_local_logging`] when a log directory is present.
/// Handles directory creation, permission enforcement (Unix), file appender
/// setup, optional stdout mirror, and log-cleanup task spawning.
///
/// The function intentionally performs all fallible filesystem preparation
/// before registering the subscriber so startup failures are reported early and
/// do not leave partially initialized tracing state behind.
fn init_file_logging_internal(
config: &OtelConfig,
log_directory: &str,
@@ -188,11 +201,9 @@ fn init_file_logging_internal(
.unwrap_or(DEFAULT_LOG_ROTATION_TIME)
.to_lowercase();
// Determine match mode from config, defaulting to Suffix
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
Some("prefix") => FileMatchMode::Prefix,
_ => FileMatchMode::Suffix,
};
// Match mode controls how the rolling filename is recognized later by the
// cleaner. Suffix mode fits timestamp-prefixed filenames especially well.
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
let rotation = match rotation_str.as_str() {
"minutely" => Rotation::Minutely,
@@ -214,7 +225,8 @@ fn init_file_logging_internal(
let env_filter = build_env_filter(logger_level, None);
let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
// File layer writes JSON without ANSI codes.
// File output stays machine-readable and free of ANSI sequences so the
// resulting files are safe to parse or ship to log processors.
let file_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
.with_target(true)
@@ -230,7 +242,8 @@ fn init_file_logging_internal(
.with_span_events(span_events.clone());
// Optional stdout mirror: enabled explicitly via `log_stdout_enabled`, or
// unconditionally in non-production environments.
// unconditionally in non-production environments so developers still see
// immediate terminal output while file rotation remains enabled.
let (stdout_layer, stdout_guard) = if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production {
let (stdout_nb, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
let enable_color = std::io::stdout().is_terminal();
@@ -262,7 +275,7 @@ fn init_file_logging_internal(
.with(stdout_layer)
.init();
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
set_observability_metric_enabled(false);
// ── 5. Start background cleanup task ─────────────────────────────────────
let cleanup_handle = spawn_cleanup_task(config, log_directory, log_filename, keep_files);
@@ -291,6 +304,8 @@ fn init_file_logging_internal(
/// Tightens permissions to `0755` if the directory is more permissive.
/// This prevents world-writable log directories from being a security hazard.
/// No-ops if permissions are already `0755` or stricter.
///
/// The function never broadens permissions; it is strictly a hardening step.
#[cfg(unix)]
pub fn ensure_dir_permissions(log_directory: &str) -> Result<(), TelemetryError> {
use std::fs::Permissions;
@@ -350,6 +365,10 @@ pub(super) fn emit_file_logging_fallback_warning(log_directory: &str, error: &Te
/// Tokio runtime and should be aborted (via the returned `JoinHandle`) when
/// the application shuts down.
///
/// The asynchronous loop itself remains lightweight: each cleanup pass is
/// delegated to `spawn_blocking` because directory traversal, compression, and
/// deletion are inherently blocking filesystem operations.
///
/// # Arguments
/// * `config` - Observability config containing cleanup parameters.
/// * `log_directory` - Directory path of the rolling log files.
@@ -366,16 +385,14 @@ pub fn spawn_cleanup_task(
keep_files: usize,
) -> tokio::task::JoinHandle<()> {
let log_dir = std::path::PathBuf::from(log_directory);
// Use suffix matching for log files like "2026-03-01-06-21.rustfs.log"
// where "rustfs.log" is the suffix.
// Use suffix matching for log files like `2026-03-01-06-21.rustfs.log`
// where `rustfs.log` is the stable suffix generated by the rolling appender.
let file_pattern = config.log_filename.as_deref().unwrap_or(log_filename).to_string();
let active_filename = file_pattern.clone();
// Determine match mode from config, defaulting to Suffix
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
Some("prefix") => FileMatchMode::Prefix,
_ => FileMatchMode::Suffix,
};
// Determine match mode from config, defaulting to the repository-wide
// observability setting when the caller leaves it unset.
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
let max_total_size = config
.log_max_total_size_bytes
@@ -387,6 +404,21 @@ pub fn spawn_cleanup_task(
let gzip_level = config
.log_gzip_compression_level
.unwrap_or(DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL);
let compression_algorithm = CompressionAlgorithm::from_config_str(
config
.log_compression_algorithm
.as_deref()
.unwrap_or(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM),
);
let parallel_compress = config.log_parallel_compress.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_COMPRESS);
let parallel_workers = config.log_parallel_workers.unwrap_or(DEFAULT_OBS_LOG_PARALLEL_WORKERS);
let zstd_level = config
.log_zstd_compression_level
.unwrap_or(DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL);
let zstd_fallback_to_gzip = config
.log_zstd_fallback_to_gzip
.unwrap_or(DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP);
let zstd_workers = config.log_zstd_workers.unwrap_or(DEFAULT_OBS_LOG_ZSTD_WORKERS);
let retention_days = config
.log_compressed_file_retention_days
.unwrap_or(DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS);
@@ -412,6 +444,14 @@ pub fn spawn_cleanup_task(
.max_single_file_size_bytes(max_single_file_size)
.compress_old_files(compress)
.gzip_compression_level(gzip_level)
// Compression behavior stays fully config-driven, but the builder
// clamps unsafe numeric values and preserves sensible defaults.
.compression_algorithm(compression_algorithm)
.parallel_compress(parallel_compress)
.parallel_workers(parallel_workers)
.zstd_compression_level(zstd_level)
.zstd_fallback_to_gzip(zstd_fallback_to_gzip)
.zstd_workers(zstd_workers)
.compressed_file_retention_days(retention_days)
.exclude_patterns(exclude_patterns)
.delete_empty_files(delete_empty)
@@ -420,17 +460,37 @@ pub fn spawn_cleanup_task(
.build(),
);
info!(
compression_algorithm = %compression_algorithm,
parallel_compress,
parallel_workers,
zstd_level,
zstd_fallback_to_gzip,
zstd_workers,
"log cleaner compression profile configured"
);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(cleanup_interval));
loop {
// Wait for the next scheduled tick before dispatching another pass.
// The blocking filesystem work runs on a dedicated blocking thread.
interval.tick().await;
let cleaner_clone = cleaner.clone();
let result = tokio::task::spawn_blocking(move || cleaner_clone.cleanup()).await;
match result {
Ok(Ok(_)) => {} // Success
Ok(Err(e)) => tracing::warn!("Log cleanup failed: {}", e),
Err(e) => tracing::warn!("Log cleanup task panicked: {}", e),
Ok(Ok(_)) => {
counter!(METRIC_LOG_CLEANER_RUNS_TOTAL).increment(1);
}
Ok(Err(e)) => {
counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1);
tracing::warn!("Log cleanup failed: {}", e);
}
Err(e) => {
counter!(METRIC_LOG_CLEANER_RUN_FAILURES_TOTAL).increment(1);
tracing::warn!("Log cleanup task panicked: {}", e);
}
}
}
})
@@ -443,6 +503,7 @@ mod tests {
use tempfile::tempdir;
#[test]
/// Invalid file names should be reported as errors instead of panicking.
fn test_init_file_logging_invalid_filename_does_not_panic() {
let temp_dir = tempdir().expect("create temp dir");
let temp_path = temp_dir.path().to_str().expect("temp dir path is utf-8");

View File

@@ -31,10 +31,14 @@
//!
//! All exporters use **HTTP binary** (Protobuf) encoding with **gzip**
//! compression for efficiency over the wire.
//!
//! If log export is not configured, this module deliberately falls back to the
//! same rolling-file logging path used by the local backend so applications can
//! combine OTLP traces/metrics with on-disk logs.
use crate::cleaner::types::FileMatchMode;
use crate::config::OtelConfig;
use crate::global::OBSERVABILITY_METRIC_ENABLED;
use crate::global::set_observability_metric_enabled;
use crate::telemetry::filter::build_env_filter;
use crate::telemetry::guard::OtelGuard;
// Import helper functions from local.rs (sibling module)
@@ -53,7 +57,7 @@ use opentelemetry_sdk::{
metrics::{PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
};
use rustfs_config::observability::DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES;
use rustfs_config::observability::{DEFAULT_OBS_LOG_MATCH_MODE, DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_STDOUT_ENABLED, DEFAULT_OBS_LOGS_EXPORT_ENABLED,
DEFAULT_OBS_METRICS_EXPORT_ENABLED, DEFAULT_OBS_TRACES_EXPORT_ENABLED, METER_INTERVAL, SAMPLE_RATIO,
@@ -97,6 +101,8 @@ pub(super) fn init_observability_http(
is_production: bool,
) -> Result<OtelGuard, TelemetryError> {
// ── Resource & sampling ──────────────────────────────────────────────────
// Build the common resource once so all enabled signals report the same
// service identity and deployment metadata.
let res = build_resource(config);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME).to_owned();
let use_stdout = config.use_stdout.unwrap_or(!is_production);
@@ -134,8 +140,9 @@ pub(super) fn init_observability_http(
}
});
// If log_endpoint is not explicitly set, fallback to root_ep/v1/logs ONLY if root_ep is present.
// If both are empty, log_ep is empty, which triggers the fallback to file logging logic.
// If `log_endpoint` is not explicitly set, fall back to `root_ep/v1/logs`
// only when a root endpoint exists. An empty result intentionally triggers
// the file-logging path below instead of silently disabling application logs.
let log_ep: String = config
.log_endpoint
.as_deref()
@@ -159,6 +166,8 @@ pub(super) fn init_observability_http(
let profiling_agent = init_profiler(config);
// ── Logger Logic ──────────────────────────────────────────────────────────
// Logging is the only signal that may intentionally route to either OTLP
// or local files depending on configuration completeness.
let mut logger_provider: Option<SdkLoggerProvider> = None;
let mut otel_bridge = None;
let mut file_layer_opt = None; // File layer (File mode)
@@ -180,11 +189,14 @@ pub(super) fn init_observability_http(
.as_ref()
.map(|p| OpenTelemetryTracingBridge::new(p).with_filter(build_env_filter(logger_level, None)));
// Note: We do NOT create a separate `fmt_layer_opt` here; stdout behavior is driven by the provider.
// No separate formatting layer is added here; when OTLP logging is
// active, the OpenTelemetry bridge is the authoritative sink for
// `tracing` events unless local file logging is needed as a fallback.
}
let span_events = if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL };
// ── Case 2: File Logging
// Supplement: If log_directory is set and no OTLP log endpoint is configured, we enable file logging logic.
// If a log directory is configured and OTLP log export is unavailable, use
// the same rolling-file behavior as the local-only telemetry backend.
if let Some(log_directory) = config.log_directory.as_deref().filter(|s| !s.is_empty())
&& logger_provider.is_none()
{
@@ -201,10 +213,7 @@ pub(super) fn init_observability_http(
.as_deref()
.unwrap_or(DEFAULT_LOG_ROTATION_TIME)
.to_lowercase();
let match_mode = match config.log_match_mode.as_deref().map(|s| s.to_lowercase()).as_deref() {
Some("prefix") => FileMatchMode::Prefix,
_ => FileMatchMode::Suffix,
};
let match_mode = FileMatchMode::from_config_str(config.log_match_mode.as_deref().unwrap_or(DEFAULT_OBS_LOG_MATCH_MODE));
let rotation = match rotation_str.as_str() {
"minutely" => Rotation::Minutely,
"hourly" => Rotation::Hourly,
@@ -234,7 +243,6 @@ pub(super) fn init_observability_http(
.with_span_events(span_events.clone())
.with_filter(build_env_filter(logger_level, None));
let cleanup_handle = spawn_cleanup_task(config, log_directory, log_filename, keep_files);
Ok((file_layer, guard, cleanup_handle, rotation_str))
})();
@@ -316,6 +324,8 @@ pub(super) fn init_observability_http(
/// Build an optional [`SdkTracerProvider`] for the given trace endpoint.
///
/// Returns `None` when the endpoint is empty or trace export is disabled.
/// When enabled, the provider is also registered as the global tracer provider
/// and installs the W3C trace-context propagator.
fn build_tracer_provider(
trace_ep: &str,
config: &OtelConfig,
@@ -351,6 +361,10 @@ fn build_tracer_provider(
Ok(Some(provider))
}
/// Convert a configured sample ratio into the SDK sampler strategy.
///
/// Invalid or non-finite ratios fall back to `AlwaysOn` so telemetry does not
/// disappear due to configuration mistakes.
fn build_tracer_sampler(sample_ratio: f64) -> Sampler {
if sample_ratio.is_finite() && (0.0..=1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
@@ -362,6 +376,8 @@ fn build_tracer_sampler(sample_ratio: f64) -> Sampler {
/// Build an optional [`SdkMeterProvider`] for the given metrics endpoint.
///
/// Returns `None` when the endpoint is empty or metric export is disabled.
/// The provider is paired with the crate's metrics recorder so `metrics` crate
/// instruments flow into OpenTelemetry readers.
fn build_meter_provider(
metric_ep: &str,
config: &OtelConfig,
@@ -401,13 +417,14 @@ fn build_meter_provider(
global::set_meter_provider(provider.clone());
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
set_observability_metric_enabled(true);
Ok(Some(provider))
}
/// Build an optional [`SdkLoggerProvider`] for the given log endpoint.
///
/// Returns `None` when the endpoint is empty or log export is disabled.
/// The caller wraps the resulting provider in an OpenTelemetry tracing bridge.
fn build_logger_provider(
log_ep: &str,
config: &OtelConfig,
@@ -434,8 +451,10 @@ fn build_logger_provider(
Ok(Some(builder.build()))
}
/// Starts the Pyroscope continuous profiling agent if `ENV_OBS_PROFILING_ENDPOINT` is set.
/// No-op and returns None on non-unix platforms.
/// Start the Pyroscope continuous profiling agent when profiling is enabled.
///
/// Returns `None` on non-Unix platforms, when the feature is disabled, or when
/// no usable profiling endpoint is configured.
#[cfg(unix)]
fn init_profiler(config: &OtelConfig) -> Option<pyroscope::PyroscopeAgent<pyroscope::pyroscope::PyroscopeAgentRunning>> {
use pyroscope::backend::{BackendConfig, PprofConfig, pprof_backend};
@@ -475,6 +494,9 @@ fn init_profiler(config: &OtelConfig) -> Option<pyroscope::PyroscopeAgent<pyrosc
}
/// Create a stdout periodic metrics reader for the given interval.
///
/// This helper is primarily used for local development and diagnostics when
/// operators want to see exported metric points without an OTLP collector.
fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
.with_interval(Duration::from_secs(interval))
@@ -486,6 +508,7 @@ mod tests {
use super::*;
#[test]
/// Valid ratios should produce trace-id-ratio sampling.
fn test_build_tracer_sampler_uses_trace_ratio_for_valid_values() {
let sampler = build_tracer_sampler(0.0);
assert!(format!("{sampler:?}").contains("TraceIdRatioBased"));
@@ -498,6 +521,7 @@ mod tests {
}
#[test]
/// Invalid ratios should degrade to the safest non-dropping sampler.
fn test_build_tracer_sampler_rejects_invalid_ratio_with_always_on() {
let sampler = build_tracer_sampler(-0.1);
assert!(format!("{sampler:?}").contains("AlwaysOn"));

View File

@@ -19,7 +19,12 @@
//! log files do not grow indefinitely by rotating them when they exceed a configured size.
use crate::cleaner::types::FileMatchMode;
use crate::global::{
METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES, METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS,
METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL, METRIC_LOG_CLEANER_ROTATION_TOTAL,
};
use jiff::Zoned;
use metrics::{counter, gauge, histogram};
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
@@ -186,6 +191,7 @@ impl RollingAppender {
}
fn roll(&mut self) -> io::Result<()> {
let rotate_started = std::time::Instant::now();
// 1. Close current file first to ensure all buffers are flushed to OS (if any)
// and handle released.
if let Some(mut file) = self.file.take()
@@ -250,6 +256,9 @@ impl RollingAppender {
// This overrides whatever open_file() derived from mtime, ensuring
// we stick to the logical rotation time.
self.last_roll_ts = now.timestamp().as_second();
counter!(METRIC_LOG_CLEANER_ROTATION_TOTAL).increment(1);
histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64());
gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64);
return Ok(());
}
Err(e) => {
@@ -281,6 +290,8 @@ impl RollingAppender {
"RollingAppender: Failed to rotate log file after {} retries. Error: {:?}",
MAX_RETRIES, last_error
);
counter!(METRIC_LOG_CLEANER_ROTATION_FAILURES_TOTAL).increment(1);
histogram!(METRIC_LOG_CLEANER_ROTATION_DURATION_SECONDS).record(rotate_started.elapsed().as_secs_f64());
// Attempt to re-open existing active file to allow continued writing
self.open_file()?;
@@ -314,6 +325,7 @@ impl Write for RollingAppender {
if let Some(file) = &mut self.file {
let n = file.write(buf)?;
self.size += n as u64;
gauge!(METRIC_LOG_CLEANER_ACTIVE_FILE_SIZE_BYTES).set(self.size as f64);
Ok(n)
} else {
Err(io::Error::other("Failed to open log file"))

12
flake.lock generated
View File

@@ -2,11 +2,11 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1773507054,
"narHash": "sha256-Q8U5VXgrcxmCxPtCCJCIZkcAX3FCZwGh1GNVIXxMND0=",
"lastModified": 1773597492,
"narHash": "sha256-hQ284SkIeNaeyud+LS0WVLX+WL2rxcVZLFEaK0e03zg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e80236013dc8b77aa49ca90e7a12d86f5d8d64c9",
"rev": "a07d4ce6bee67d7c838a8a5796e75dff9caa21ef",
"type": "github"
},
"original": {
@@ -29,11 +29,11 @@
]
},
"locked": {
"lastModified": 1773544328,
"narHash": "sha256-Iv+qez54LAz+isij4APBk31VWA//Go81hwFOXr5iWTw=",
"lastModified": 1773630837,
"narHash": "sha256-zJhgAGnbVKeBMJOb9ctZm4BGS/Rnrz+5lfSXTVah4HQ=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "4f977d776793c8bfbfdd7eca7835847ccc48874e",
"rev": "f600ea449c7b5bb596fa1cf21c871cc5b9e31316",
"type": "github"
},
"original": {

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env bash
#
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Fail fast: abort on command errors, unset variables, and pipeline failures.
set -euo pipefail

# Grafana dashboard JSON shipped with the observability docker stack.
DASHBOARD_PATH=".docker/observability/grafana/dashboards/rustfs.json"

# jq performs all JSON parsing; bail out early with a clear error if absent.
if ! command -v jq >/dev/null 2>&1; then
  echo "ERROR: jq is required by scripts/inspect_dashboard.sh" >&2
  exit 1
fi

if [[ ! -f "$DASHBOARD_PATH" ]]; then
  echo "ERROR: dashboard file not found: $DASHBOARD_PATH" >&2
  exit 1
fi

# Emit a two-line summary (panel count, dashboard version), a blank separator,
# then one tab-separated row per panel: id, type, gridPos y/h/w, title.
jq -r '
  # Render null as the literal string "None"; everything else via tostring.
  def pystr:
    if . == null then "None" else tostring end;

  # Python-repr-like quoting for titles: single-quoted with backslash escapes
  # for backslash, quote, newline, carriage return, and tab. \u0027 spells the
  # single-quote character as a Unicode escape because the whole jq program is
  # wrapped in single quotes at the shell level.
  def pyrepr:
    if . == null then "None"
    elif type == "string" then
      "\u0027"
      + (gsub("\\\\"; "\\\\\\\\")
        | gsub("\u0027"; "\\\\\u0027")
        | gsub("\n"; "\\\\n")
        | gsub("\r"; "\\\\r")
        | gsub("\t"; "\\\\t"))
      + "\u0027"
    else tostring
    end;

  "Total panels: \(.panels | length)",
  "Dashboard version: \(.version)",
  "",
  (
    .panels[]
    | [
        (.id // 0),
        (.type | pystr),
        (.gridPos.y | pystr),
        (.gridPos.h | pystr),
        (.gridPos.w | pystr),
        (.title | pyrepr)
      ]
    | @tsv
  )
' "$DASHBOARD_PATH" | {
  # Consume the fixed three-line header verbatim, then pretty-print each
  # tab-separated panel row with aligned columns.
  IFS= read -r total_line
  IFS= read -r version_line
  IFS= read -r blank_line
  printf '%s\n' "$total_line"
  printf '%s\n' "$version_line"
  printf '%s\n' "$blank_line"
  while IFS=$'\t' read -r pid ptype y h w title; do
    printf " id=%4d type=%-12s y=%-4s h=%-4s w=%-4s title=%s\n" \
      "$pid" "$ptype" "$y" "$h" "$w" "$title"
  done
}