Compare commits

...

70 Commits

Author SHA1 Message Date
安正超
ce1f7cfdcb chore(skills): add repository-local workflow skills (#2190) 2026-03-17 22:13:46 +08:00
马登山
c66c6d97ec fix(lifecycle): respect Filter.Prefix and safe delete marker expiry (#2185)
Signed-off-by: likewu <likewu@126.com>
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: likewu <likewu@126.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-17 18:45:38 +08:00
dependabot[bot]
be89b5fc6a build(deps): bump lz4_flex from 0.12.0 to 0.12.1 (#2181)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-17 10:10:41 +08:00
houseme
94cdb89e29 feat(obs): add init_obs_with_config API and signature guard test (#2175)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-03-16 18:17:55 +08:00
heihutu
06dff96c09 chore(deps): update flake.lock (#2173) 2026-03-16 16:01:36 +08:00
安正超
c1d5106acc feat(ci): allow selecting build platforms in build workflow (#2171) 2026-03-15 22:01:44 +08:00
heihutu
0a2411f59c chore(deps): update flake.lock (#2169)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-15 16:10:12 +08:00
houseme
1ede71b881 chore: update nix-flake-update.yml to use FLAKE_UPDATE_TOKEN for user… (#2168) 2026-03-15 14:49:38 +08:00
github-actions[bot]
4fb7059e6f chore(deps): update flake.lock (#2165) 2026-03-15 10:26:05 +08:00
安正超
2ad275ecc3 fix(helm): quote obs stdout configmap value (#2166)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: 马登山 <Cxymds@qq.com>
2026-03-15 10:11:25 +08:00
Philip Schmid
9179fd5608 fix(helm): merge customAnnotations with class-specific ingress annotations (#2161)
Signed-off-by: Philip Schmid <philip.schmid@protonmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-15 09:22:12 +08:00
LeonWang0735
7f1cdaedad feat(replication): add bandwidth-aware reporting for bucket replication metrics (#2141) 2026-03-15 09:03:10 +08:00
houseme
7f3459f5a8 fix(obs): fixed unresolved import super::local::ensure_dir_permissions (#2164) 2026-03-15 00:33:06 +08:00
yxrxy
d3cff7d033 feat(webdav): add WebDAV protocol gateway (#2158)
Signed-off-by: yxrxy <1532529704@qq.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: 马登山 <Cxymds@qq.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2026-03-14 23:06:53 +08:00
majinghe
f66a90c1b2 fix: fix github action error caused by oltp modification (#2163)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-14 22:15:35 +08:00
majinghe
afcaaf66fc feat: add support for obs enpoint support in helm chart (#2160) 2026-03-14 21:44:44 +08:00
安正超
a1104b45f6 fix(obs): honor target-only rust_log directives (#2159)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-14 11:14:46 +08:00
bcdax110
82d9452736 docs: fix incorrect UID in Docker Quick Start of README_ZH (#2149)
Signed-off-by: bcdax110 <1711382287@qq.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-14 09:26:07 +08:00
houseme
6e0f034ad1 refactor(obs): enhance log rotation robustness and refine filter logic (#2155)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-14 09:20:35 +08:00
houseme
593a58c161 refactor(obs): optimize logging with custom RollingAppender and improved cleanup (#2151)
Signed-off-by: houseme <housemecn@gmail.com>
Signed-off-by: heihutu <30542132+heihutu@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-13 13:20:27 +08:00
houseme
f83bf95b04 feat(ecstore): Skip rustls provider install if already present (#2145)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
2026-03-12 18:02:19 +08:00
安正超
aa88b1976a fix(ecstore): avoid warm tier init panics (#2144) 2026-03-12 13:52:49 +08:00
安正超
e2f741d41f fix(helm): use canonical scanner start delay env (#2142) 2026-03-12 10:06:42 +08:00
安正超
ad54293d7e fix(admin): propagate heal handler background errors (#2124) 2026-03-12 10:06:12 +08:00
安正超
83fb530609 refactor(config): normalize scanner env naming (#2129) 2026-03-11 22:41:41 +08:00
安正超
aa84d34bf8 fix(auth): preserve IAMAuth clone and correct missing-key error (#2123) 2026-03-11 21:59:12 +08:00
安正超
df57f0c033 fix(workers): clamp worker release count (#2122) 2026-03-11 21:59:00 +08:00
安正超
c47dec8549 fix(signer): avoid panics in v2 signing for missing data (#2121) 2026-03-11 21:58:40 +08:00
安正超
fdbe12ec95 fix(scanner): respect configured scan start delay (#2119) 2026-03-11 21:56:48 +08:00
安正超
b2e8078971 fix(policy): avoid unicode panic in variable resolver (#2115) 2026-03-11 21:56:32 +08:00
安正超
ac43a44a00 [codex] fix scanner first cycle startup delay (#2137) 2026-03-11 20:02:01 +08:00
安正超
5625f04697 fix(common): remove panic paths in runtime helpers (#2116)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
2026-03-11 18:12:37 +08:00
安正超
e1f24f764d fix(credentials): harden masked debug output (#2114)
Signed-off-by: heihutu <30542132+heihutu@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
2026-03-11 15:40:37 +08:00
安正超
7d7e0b2654 fix(utils): harden panic-prone paths (#2113)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
2026-03-11 15:16:03 +08:00
安正超
9908a44c38 fix(protocols): return errors instead of panics for sync signatures (#2120)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
2026-03-11 11:22:20 +08:00
evan slack
4b480727d6 feat(perf): Add configurable bitrot skip for reads (#2110)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2026-03-11 10:59:00 +08:00
simon-escapecode
f00d01ec2d fix: resolve silent failure in MQTT bucket event notifications (#2112)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-11 10:08:30 +08:00
dependabot[bot]
7e8c7fa2b2 build(deps): bump quinn-proto from 0.11.13 to 0.11.14 (#2127)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-11 09:30:53 +08:00
安正超
845ad1fa16 fix(obs): avoid panic in telemetry init and clamp sampler boundaries (#2118)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-11 01:32:46 +08:00
安正超
bb4fbf5ae2 fix(notify): ignore disabled targets when sending events (#2117)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-11 00:37:30 +08:00
安正超
3df7105dae fix(server): init event notifier when partial notify configured (#2125) 2026-03-10 23:52:40 +08:00
evan slack
b3da8ae269 feat(scanner): Add dynamic throttling presets (#2095)
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: GatewayJ <835269233@qq.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
Co-authored-by: weisd <im@weisd.in>
2026-03-10 16:12:56 +08:00
majinghe
67e5f5e3c3 feat: add metrics support in helm chart (#2109)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-10 12:08:36 +08:00
majinghe
296efea42f change ghcr username and password name due to github restrict (#2108) 2026-03-09 21:44:35 +08:00
GatewayJ
16946c5a54 fix: allow root to bypass bucket policy deny for policy management APIs (#2102)
Co-authored-by: GatewayJ <8352692332qq.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2026-03-09 20:36:29 +08:00
majinghe
73d29e95dd feat:add docker image support for quay.io and ghcr.io (#2107)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-09 16:22:28 +08:00
dependabot[bot]
e930c5c281 build(deps): bump libc from 0.2.182 to 0.2.183 in the dependencies group (#2106)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-09 16:03:17 +08:00
安正超
9d03029959 fix(iam): sync user cache on load-user notifications (#2104) 2026-03-09 09:36:02 +08:00
loverustfs
a02c354ef5 Fix image url error
Fix image url error

Signed-off-by: loverustfs <hello@rustfs.com>
2026-03-08 23:39:50 +08:00
houseme
60aa47bf61 feat(storage): integrate S3Operation into OperationHelper for unified metrics and audit (#2103) 2026-03-08 17:57:33 +08:00
houseme
8e4a1ef917 refactor(protocols): replace tar with astral-tokio-tar for async processing (#2099)
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
2026-03-08 15:18:15 +08:00
Peter Hamilton
b035d10abb fix(metrics): Remove high cardinality labels causing memory leak (#2098)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-03-08 13:01:11 +08:00
github-actions[bot]
2180e9e7a1 chore(deps): update flake.lock (#2097)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-08 10:51:32 +08:00
evan slack
57e49e6737 feat(obs): Add metric to count all s3 operations (#2088)
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-08 10:19:20 +08:00
Senol Colak
b07383760f Add OpenStack Swift API Support (#2066)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: Copilot <noreply@github.com>
2026-03-08 01:11:35 +08:00
evan slack
7c94be4e8c fix(obs): Remove high cardinality label on rustfs_api_requests_total (#2087)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-03-07 20:46:33 +08:00
evan slack
d52a10c5fb chore(obs): Improve tracing instrumentation (#2086)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-03-07 20:03:20 +08:00
安正超
8c4735ff88 docs: scope AGENTS instructions by directory (#2083) 2026-03-05 17:25:37 +08:00
LeonWang0735
a0503168d4 fix(heal):heal failed replication via must_replicate instead of check replicate_delete (#2072) 2026-03-05 15:47:36 +08:00
安正超
b73059dcf2 fix(admin): allow non-consoleAdmin self password update (#2082) 2026-03-05 15:47:21 +08:00
weisd
ed18b3da75 Fix data usage cache and scanner (#2074) 2026-03-04 19:55:01 +08:00
houseme
05032cf887 chore: update dependencies and workspace resolver (#2073) 2026-03-04 19:22:54 +08:00
唐小鸭
f89cdfe5b3 update s3s 0.14.0-dev (#2070)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-04 01:07:24 +08:00
houseme
f4b523c236 build(deps): bump the dependencies group with 7 updates (#2069) 2026-03-04 00:42:03 +08:00
安正超
c6209ba59d ci: optimize workflow runtime and remove redundant pipeline work (#2065) 2026-03-03 20:56:37 +08:00
houseme
5e7495a042 build(obs): restrict pyroscope dependency to unix targets (#2064) 2026-03-03 20:41:37 +08:00
evan slack
ac4b13def1 feat(obs): Optional continuous CPU profiling with grafana pyroscope (#2035)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
2026-03-03 15:28:58 +08:00
安正超
08e1f4670b fix: restore default CORS fallback and STS object ACL ownership (#2053)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-03 01:08:50 +08:00
唐小鸭
fff96a0921 fix sse-options (#2056) 2026-03-03 01:08:37 +08:00
唐小鸭
f17725a2ea fix(sse): allow PUT/GET without KMS when no SSE or bucket default (#2054)
Co-authored-by: houseme <housemecn@gmail.com>
2026-03-03 00:44:23 +08:00
236 changed files with 29796 additions and 3376 deletions

View File

@@ -0,0 +1,78 @@
---
name: code-change-verification
description: Verify code changes by identifying correctness, regression, security, and performance risks from diffs or patches, then produce prioritized findings with file/line evidence and concrete fixes. Use when reviewing commits, PRs, and merged patches before/after release.
---
# Code Change Verification
Use this skill to review code changes consistently before merge, before release, and during incident follow-up.
## Quick Start
1. Read the scope: commit, PR, patch, or file list.
2. Map each changed area by risk and user impact.
3. Inspect each risky change in context.
4. Report findings first, ordered by severity.
5. Close with residual risks and verification recommendations.
## Core Workflow
### 1) Scope and assumptions
- Confirm change source (diff, commit, PR, files), target branch, language/runtime, and version.
- If context is missing, state assumptions before deeper analysis.
- Focus only on requested scope; avoid reviewing unrelated files.
### 2) Risk map
- Prioritize in this order:
- Data correctness and user-visible behavior
- API/contract compatibility
- Security and authz/authn boundaries
- Concurrency and lifecycle correctness
- Performance and resource usage
- Give higher priority to stateful paths, migration logic, defaults, and error handling.
### 3) Evidence-based inspection
- Read each modified hunk with neighboring context.
- Trace call paths and call-site expectations.
- Check for:
- invariant breaks and missing guards
- unchecked assumptions and null/empty/error-path handling
- stale tests, fixtures, and configs
- hidden coupling to shared helpers/constants/features
- If a point is uncertain, mark it as an open question instead of guessing.
### 4) Findings-first output
- Order findings by severity:
- P0: critical failure, security breach, or data loss risk
- P1: high-impact regression
- P2: medium risk correctness gap
- P3: low risk/quality debt
- For each finding include:
- Severity
- `path:line` reference
- concise issue statement
- impact and likely failure mode
- specific fix or mitigation
- validation step to confirm
- If no issues exist, explicitly state `No findings` and why.
### 5) Close
- Report assumptions and unknowns.
- Suggest targeted checks (tests, canary checks, logs/metrics, migration validation).
## Output Template
1. Findings
2. No findings (if applicable)
3. Assumptions / Unknowns
4. Recommended verification steps
## Finding Template
- `[P1] Missing timeout for downstream call`
- Location: `path/to/file.rs:123`
- Issue: ...
- Impact: ...
- Fix suggestion: ...
- Validation: ...

View File

@@ -0,0 +1,4 @@
interface:
display_name: "Code Change Verification"
short_description: "Prioritize risks and verify code changes before merge."
default_prompt: "Inspect a patch or diff, identify correctness/security/regression risks, and return prioritized findings with file/line evidence and fixes."

View File

@@ -0,0 +1,88 @@
---
name: pr-creation-checker
description: Prepare PR-ready diffs by validating scope, checking required verification steps, drafting a compliant English PR title/body, and surfacing blockers before opening or updating a pull request in RustFS.
---
# PR Creation Checker
Use this skill before `gh pr create`, before `gh pr edit`, or when reviewing whether a branch is ready for PR.
## Read sources of truth first
- Read `AGENTS.md`.
- Read `.github/pull_request_template.md`.
- Use `Makefile` and `.config/make/` for local quality commands.
- Use `.github/workflows/ci.yml` for CI expectations.
- Do not restate long command matrices or template sections from memory when the files exist.
## Workflow
1. Collect PR context
- Confirm base branch, current branch, change goal, and scope.
- Confirm whether the task is: draft a new PR, update an existing PR, or preflight-check readiness.
- Confirm whether the branch includes only intended changes.
2. Inspect change scope
- Review the diff and summarize what changed.
- Call out unrelated edits, generated artifacts, logs, or secrets as blockers.
- Mark risky areas explicitly: auth, storage, config, network, migrations, breaking changes.
3. Verify readiness requirements
- Require `make pre-commit` before marking the PR ready.
- If `make` is unavailable, use the equivalent commands from `.config/make/`.
- Add scope-specific verification commands when the changed area needs more than the baseline.
- If required checks fail, stop and return `BLOCKED`.
4. Draft PR metadata
- Write the PR title in English using Conventional Commits and keep it within 72 characters.
- If a generic PR workflow suggests a different title format, ignore it and follow the repository rule instead.
- In RustFS, do not use tool-specific prefixes such as `[codex]` when the repository requires Conventional Commits.
- Keep the PR body in English.
- Use the exact section headings from `.github/pull_request_template.md`.
- Fill non-applicable sections with `N/A`.
- Include verification commands in the PR description.
- Do not include local filesystem paths in the PR body unless the user explicitly asks for them.
- Prefer repo-relative paths, command names, and concise summaries over machine-specific paths such as `/Users/...`.
5. Prepare reviewer context
- Summarize why the change exists.
- Summarize what was verified.
- Call out risks, rollout notes, config impact, and rollback notes when applicable.
- Mention assumptions or missing context instead of guessing.
6. Prepare CLI-safe output
- When proposing `gh pr create` or `gh pr edit`, use `--body-file`, never inline `--body` for multiline markdown.
- Return a ready-to-save PR body plus a short title.
- If not ready, return blockers first and list the minimum steps needed to unblock.
## Output format
### Status
- `READY` or `BLOCKED`
### Title
- `<type>(<scope>): <summary>`
### PR Body
- Reproduce the repository template headings exactly.
- Fill every section.
- Omit local absolute paths unless explicitly required.
### Verification
- List each command run.
- State pass/fail.
### Risks
- List breaking changes, config changes, migration impact, or `N/A`.
## Blocker rules
- Return `BLOCKED` if `make pre-commit` has not passed.
- Return `BLOCKED` if the diff contains unrelated changes that are not acknowledged.
- Return `BLOCKED` if required template sections are missing.
- Return `BLOCKED` if the title/body is not in English.
- Return `BLOCKED` if the title does not follow the repository's Conventional Commit rule.
## Reference
- Use [pr-readiness-checklist.md](references/pr-readiness-checklist.md) for a short final pass before opening or editing the PR.

View File

@@ -0,0 +1,4 @@
interface:
display_name: "PR Creation Checker"
short_description: "Draft RustFS-ready PRs with checks, template, and blockers."
default_prompt: "Inspect a branch or diff, verify required PR checks, and produce a compliant English PR title/body plus blockers or readiness status."

View File

@@ -0,0 +1,14 @@
# PR Readiness Checklist
- Confirm the branch is based on current `main`.
- Confirm the diff matches the stated scope.
- Confirm no secrets, logs, temp files, or unrelated refactors are included.
- Confirm `make pre-commit` passed, or document why it could not run.
- Confirm extra verification commands are listed for risky changes.
- Confirm the PR title uses Conventional Commits and stays within 72 characters.
- Confirm the PR title does not use tool-specific prefixes such as `[codex]`.
- Confirm the PR body is in English.
- Confirm the PR body keeps the exact headings from `.github/pull_request_template.md`.
- Confirm non-applicable sections are filled with `N/A`.
- Confirm the PR body does not include local absolute paths unless explicitly required.
- Confirm multiline GitHub CLI commands use `--body-file`.

View File

@@ -0,0 +1,66 @@
---
name: test-coverage-improver
description: Run project coverage checks, rank high-risk gaps, and propose high-impact tests to improve regression confidence for changed and critical code paths before release.
---
# Test Coverage Improver
Use this skill when you need a prioritized, risk-aware plan to improve tests from coverage results.
## Usage assumptions
- Focus scope is either changed lines/files, a module, or the whole repository.
- Coverage artifact must be generated or provided in a supported format.
- If required context is missing, call out assumptions explicitly before proposing work.
## Workflow
1. Define scope and baseline
- Confirm target language, framework, and branch.
- Confirm whether the scope is changed files only or full-repo.
2. Produce coverage snapshot
- Rust: `cargo llvm-cov` (or `cargo tarpaulin`) with existing repo config.
- JavaScript/TypeScript: `npm test -- --coverage` and read `coverage/coverage-final.json`.
- Python: `pytest --cov=<pkg> --cov-report=json` and read `coverage.json`.
- Collect total, per-file, and changed-line coverage.
3. Rank highest-risk gaps
- Prioritize changed code, branch coverage gaps, and low-confidence boundaries.
- Apply the risk rubric in [coverage-prioritization.md](references/coverage-prioritization.md).
- Keep shortlist to 5–8 gaps.
- For each gap, capture: file, lines, uncovered branches, and estimated risk score.
4. Propose high-impact tests
- For each shortlisted gap, output:
- Intent and expected behavior.
- Normal, edge, and failure scenarios.
- Assertions and side effects to verify.
- Setup needs (fixtures, mocks, integration dependencies).
- Estimated effort (`S/M/L`).
5. Close with validation plan
- State which gaps remain after proposals.
- Provide concrete verification command and acceptance threshold.
- List assumptions or blockers (environment, fixtures, flaky dependencies).
## Output template
### Coverage Snapshot
- total / branch coverage
- changed-file coverage
- top missing regions by size
### Top Gaps (ranked)
- `path:line-range` | risk score | why critical
### Test Proposals
- `path:line-range`
- Test name
- scenarios
- assertions
- effort
### Validation Plan
- command
- pass criteria
- remaining risk

View File

@@ -0,0 +1,4 @@
interface:
display_name: "Test Coverage Improver"
short_description: "Find top uncovered risk areas and propose high-impact tests."
default_prompt: "Run coverage checks, identify largest gaps, and recommend highest-impact test cases to improve risk coverage."

View File

@@ -0,0 +1,25 @@
# Coverage Gap Prioritization Guide
Use this rubric for each uncovered area.
Score = (Criticality × 2) + CoverageDebt + (Volatility × 0.5)
- Criticality:
- 5: authz/authn, data-loss, payment/consistency path
- 4: state mutation, cache invalidation, scheduling
- 3: error handling + fallbacks in user-visible flows
- 2: parsing/format conversion paths
- 1: logging-only or low-impact utilities
- CoverageDebt:
- 0: 0–5 uncovered lines
- 1: 6–20 uncovered lines
- 2: 21–40 uncovered lines
- 3: 41+ uncovered lines
- Volatility:
- 1: stable legacy code with few recent edits
- 2: changed in last 2 releases
- 3: touched in last 30 days or currently in active PR
Sort by score descending, then by business impact.

View File

@@ -104,7 +104,7 @@ services:
environment:
- TZ=Asia/Shanghai
volumes:
- ../../.docker/observability/loki-config.yaml:/etc/loki/local-config.yaml:ro
- ../../.docker/observability/loki.yaml:/etc/loki/local-config.yaml:ro
- loki-data:/loki
ports:
- "3100:3100"

View File

@@ -0,0 +1,270 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
services:
rustfs:
security_opt:
- "no-new-privileges:true"
image: rustfs/rustfs:latest
container_name: rustfs-server
ports:
- "9000:9000" # S3 API port
- "9001:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CORS_ALLOWED_ORIGINS=*
- RUSTFS_CONSOLE_CORS_ALLOWED_ORIGINS=*
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_OBS_LOGGER_LEVEL=info
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4318
- RUSTFS_OBS_PROFILING_ENDPOINT=http://pyroscope:4040
volumes:
- rustfs-data:/data/rustfs
networks:
- otel-network
restart: unless-stopped
healthcheck:
test:
[
"CMD",
"sh",
"-c",
"curl -f http://127.0.0.1:9000/health && curl -f http://127.0.0.1:9001/rustfs/console/health",
]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
depends_on:
otel-collector:
condition: service_started
rustfs-init:
image: alpine
container_name: rustfs-init
volumes:
- rustfs-data:/data
networks:
- otel-network
command: >
sh -c "
chown -R 10001:10001 /data &&
echo 'Volume Permissions fixed' &&
exit 0
"
restart: no
# --- Tracing ---
tempo:
image: grafana/tempo:latest
container_name: tempo
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./tempo.yaml:/etc/tempo.yaml:ro
- tempo-data:/var/tempo
ports:
- "3200:3200" # tempo
- "4317" # otlp grpc
- "4318" # otlp http
networks:
- otel-network
restart: unless-stopped
depends_on:
- redpanda
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:3200/ready" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 15s
redpanda:
image: redpandadata/redpanda:latest # for tempo ingest
container_name: redpanda
ports:
- "9092:9092"
networks:
- otel-network
restart: unless-stopped
command: >
redpanda start --overprovisioned
--mode=dev-container
--kafka-addr=PLAINTEXT://0.0.0.0:9092
--advertise-kafka-addr=PLAINTEXT://redpanda:9092
jaeger:
image: jaegertracing/jaeger:latest
container_name: jaeger
environment:
- SPAN_STORAGE_TYPE=badger
- BADGER_EPHEMERAL=false
- BADGER_DIRECTORY_VALUE=/badger/data
- BADGER_DIRECTORY_KEY=/badger/key
- COLLECTOR_OTLP_ENABLED=true
volumes:
- ./jaeger.yaml:/etc/jaeger/config.yml
- jaeger-data:/badger
ports:
- "16686:16686" # Web UI
- "14269:14269" # Admin/Metrics
- "4317" # otlp grpc
- "4318" # otlp http
command: [ "--config", "/etc/jaeger/config.yml" ]
networks:
- otel-network
restart: unless-stopped
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:14269" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 15s
# --- Metrics ---
prometheus:
image: prom/prometheus:latest
container_name: prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
ports:
- "9090:9090"
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--web.enable-otlp-receiver" # Enable OTLP
- "--web.enable-remote-write-receiver" # Enable remote write
- "--enable-feature=promql-experimental-functions" # Enable info()
- "--storage.tsdb.retention.time=30d"
restart: unless-stopped
networks:
- otel-network
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:9090/-/healthy" ]
interval: 10s
timeout: 5s
retries: 3
# --- Logging ---
loki:
image: grafana/loki:latest
container_name: loki
volumes:
- ./loki.yaml:/etc/loki/loki.yaml:ro
- loki-data:/loki
ports:
- "3100:3100"
command: -config.file=/etc/loki/loki.yaml
networks:
- otel-network
restart: unless-stopped
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:3100/ready" ]
interval: 15s
timeout: 10s
retries: 5
start_period: 60s
# --- Collection ---
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
ports:
- "1888:1888" # pprof
- "8888:8888" # Prometheus metrics for Collector
- "8889:8889" # Prometheus metrics for application indicators
- "13133:13133" # health check
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "55679:55679" # zpages
networks:
- otel-network
restart: unless-stopped
depends_on:
- tempo
- jaeger
- prometheus
- loki
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:13133" ]
interval: 10s
timeout: 5s
retries: 3
# --- Profiles ---
pyroscope:
image: grafana/pyroscope:latest
container_name: pyroscope
ports:
- "4040:4040"
command:
- -self-profiling.disable-push=true
networks:
- otel-network
restart: unless-stopped
# --- Visualization ---
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_SECURITY_ADMIN_USER=admin
volumes:
- ./grafana/provisioning:/etc/grafana/provisioning:ro
- ./grafana/dashboards:/etc/grafana/dashboards:ro
- grafana-data:/var/lib/grafana
networks:
- otel-network
restart: unless-stopped
depends_on:
- prometheus
- tempo
- loki
healthcheck:
test:
[ "CMD", "wget", "--spider", "-q", "http://localhost:3000/api/health" ]
interval: 10s
timeout: 5s
retries: 3
volumes:
rustfs-data:
tempo-data:
jaeger-data:
prometheus-data:
loki-data:
grafana-data:
networks:
otel-network:
driver: bridge
name: "network_otel"
ipam:
config:
- subnet: 172.28.0.0/16
driver_opts:
com.docker.network.enable_ipv6: "true"

View File

@@ -0,0 +1,62 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Docker Compose override file for High Availability Tempo setup
#
# Usage:
# docker-compose -f docker-compose-example-for-rustfs.yml \
# -f docker-compose-tempo-ha-override.yml up
services:
# Override Tempo to use high-availability configuration
tempo:
volumes:
- ./tempo-ha.yaml:/etc/tempo.yaml:ro
- tempo-data:/var/tempo
ports:
- "3200:3200" # Tempo HTTP
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "7946:7946" # Memberlist
- "14250:14250" # Jaeger gRPC
- "14268:14268" # Jaeger Thrift HTTP
- "9411:9411" # Zipkin
environment:
- TEMPO_MEMBERLIST_BIND_PORT=7946
healthcheck:
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:3200/ready" ]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
depends_on:
- redpanda
volumes:
tempo-data:
driver: local
driver_opts:
type: tmpfs
device: tmpfs
o: "size=4g" # Allocate 4GB tmpfs for Tempo data (adjust based on your needs)
# Network configuration remains the same
# networks:
# otel-network:
# driver: bridge
# name: "network_otel"
# ipam:
# config:
# - subnet: 172.28.0.0/16

View File

@@ -13,100 +13,31 @@
# limitations under the License.
services:
rustfs:
security_opt:
- "no-new-privileges:true"
image: rustfs/rustfs:latest
container_name: rustfs-server
ports:
- "9000:9000" # S3 API port
- "9001:9001" # Console port
environment:
- RUSTFS_VOLUMES=/data/rustfs
- RUSTFS_ADDRESS=0.0.0.0:9000
- RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001
- RUSTFS_CONSOLE_ENABLE=true
- RUSTFS_CORS_ALLOWED_ORIGINS=*
- RUSTFS_CONSOLE_CORS_ALLOWED_ORIGINS=*
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_OBS_LOGGER_LEVEL=info
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4318
volumes:
- rustfs-data:/data/rustfs
networks:
- otel-network
restart: unless-stopped
healthcheck:
test:
[
"CMD",
"sh",
"-c",
"curl -f http://127.0.0.1:9000/health && curl -f http://127.0.0.1:9001/rustfs/console/health",
]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
depends_on:
otel-collector:
condition: service_started
rustfs-init:
image: alpine
container_name: rustfs-init
volumes:
- rustfs-data:/data
networks:
- otel-network
command: >
sh -c "
chown -R 10001:10001 /data &&
echo 'Volume Permissions fixed' &&
exit 0
"
restart: no
# --- Tracing ---
tempo:
image: grafana/tempo:latest
container_name: tempo
command: ["-config.file=/etc/tempo.yaml"]
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ./tempo.yaml:/etc/tempo.yaml:ro
- tempo-data:/var/tempo
ports:
- "3200:3200" # tempo
- "4317" # otlp grpc
- "4318" # otlp http
- "4317" # otlp grpc
- "4318" # otlp http
- "7946" # memberlist
networks:
- otel-network
restart: unless-stopped
depends_on:
- redpanda
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:3200/ready"]
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:3200/ready" ]
interval: 10s
timeout: 5s
retries: 3
start_period: 15s
redpanda:
image: redpandadata/redpanda:latest # for tempo ingest
container_name: redpanda
ports:
- "9092:9092"
networks:
- otel-network
restart: unless-stopped
command: >
redpanda start --overprovisioned
--mode=dev-container
--kafka-addr=PLAINTEXT://0.0.0.0:9092
--advertise-kafka-addr=PLAINTEXT://redpanda:9092
jaeger:
image: jaegertracing/jaeger:latest
container_name: jaeger
@@ -124,12 +55,12 @@ services:
- "14269:14269" # Admin/Metrics
- "4317" # otlp grpc
- "4318" # otlp http
command: ["--config", "/etc/jaeger/config.yml"]
command: [ "--config", "/etc/jaeger/config.yml" ]
networks:
- otel-network
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:14269"]
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:14269" ]
interval: 10s
timeout: 5s
retries: 3
@@ -155,7 +86,7 @@ services:
networks:
- otel-network
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:9090/-/healthy"]
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:9090/-/healthy" ]
interval: 10s
timeout: 5s
retries: 3
@@ -175,7 +106,7 @@ services:
- otel-network
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:3100/ready"]
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:3100/ready" ]
interval: 15s
timeout: 10s
retries: 5
@@ -188,12 +119,12 @@ services:
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
ports:
- "1888:1888" # pprof
- "8888:8888" # Prometheus metrics for Collector
- "8889:8889" # Prometheus metrics for application indicators
- "1888:1888" # pprof
- "8888:8888" # Prometheus metrics for Collector
- "8889:8889" # Prometheus metrics for application indicators
- "13133:13133" # health check
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "55679:55679" # zpages
networks:
- otel-network
@@ -204,11 +135,24 @@ services:
- prometheus
- loki
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:13133"]
test: [ "CMD", "wget", "--spider", "-q", "http://localhost:13133" ]
interval: 10s
timeout: 5s
retries: 3
# --- Profiles ---
pyroscope:
image: grafana/pyroscope:latest
container_name: pyroscope
ports:
- "4040:4040"
command:
- -self-profiling.disable-push=true
networks:
- otel-network
restart: unless-stopped
# --- Visualization ---
grafana:
@@ -231,13 +175,13 @@ services:
- tempo
- loki
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:3000/api/health"]
test:
[ "CMD", "wget", "--spider", "-q", "http://localhost:3000/api/health" ]
interval: 10s
timeout: 5s
retries: 3
volumes:
rustfs-data:
tempo-data:
jaeger-data:
prometheus-data:

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,17 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: 1
providers:

View File

@@ -1,3 +1,17 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: 1
datasources:
@@ -76,3 +90,9 @@ datasources:
spanEndTimeShift: '-1s'
filterByTraceID: true
filterBySpanID: false
- name: Pyroscope
type: grafana-pyroscope-datasource
url: http://pyroscope:4040
jsonData:
minStep: '15s'

View File

@@ -0,0 +1,286 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# High Availability Tempo Configuration for docker-compose-example-for-rustfs.yml
# Features:
# - Distributed architecture with multiple components
# - Kafka-based ingestion for fault tolerance
# - Replication factor of 3 for data resilience
# - Query frontend for load balancing
# - Metrics generation from traces
# - WAL for durability
partition_ring_live_store: true
stream_over_http_enabled: true
server:
http_listen_port: 3200
http_server_read_timeout: 30s
http_server_write_timeout: 30s
grpc_server_max_recv_msg_size: 4194304 # 4MB
grpc_server_max_send_msg_size: 4194304
log_level: info
log_format: json
# Memberlist configuration for distributed mode
memberlist:
node_name: tempo
bind_port: 7946
join_members:
- tempo:7946
retransmit_factor: 4
node_timeout: 15s
retransmit_interval: 300ms
dead_node_reclaim_time: 30s
# Distributor configuration - receives traces and routes to ingesters
distributor:
ingester_write_path_enabled: true
kafka_write_path_enabled: true
rate_limit_bytes: 10MB
rate_limit_enabled: true
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
max_concurrent_streams: 0
max_receive_message_size: 4194304
http:
endpoint: "0.0.0.0:4318"
cors:
allowed_origins:
- "*"
max_age: 86400
jaeger:
protocols:
grpc:
endpoint: "0.0.0.0:14250"
thrift_http:
endpoint: "0.0.0.0:14268"
zipkin:
endpoint: "0.0.0.0:9411"
ring:
kvstore:
store: memberlist
heartbeat_timeout: 5s
replication_factor: 3
heartbeat_interval: 5s
# Ingester configuration - stores traces and querying
ingester:
lifecycler:
address: tempo
ring:
kvstore:
store: memberlist
replication_factor: 3
max_cache_freshness_per_sec: 10s
heartbeat_interval: 5s
heartbeat_timeout: 5s
num_tokens: 128
tokens_file_path: /var/tempo/tokens.json
claim_on_rollout: true
trace_idle_period: 20s
max_block_bytes: 10_000_000
max_block_duration: 10m
chunk_size_bytes: 1_000_000
chunk_encoding: snappy
wal:
checkpoint_duration: 5s
max_wal_blocks: 4
metrics:
enabled: true
level: block
target_info_duration: 15m
# WAL configuration for data durability
wal:
checkpoint_duration: 5s
flush_on_shutdown: true
path: /var/tempo/wal
# Kafka ingestion configuration - for high throughput scenarios
ingest:
enabled: true
kafka:
brokers: [ redpanda:9092 ]
topic: tempo-ingest
encoding: protobuf
consumer_group: tempo-ingest-consumer
session_timeout: 10s
rebalance_timeout: 1m
partition: auto
verbosity: 2
# Query frontend configuration - distributed querying
query_frontend:
compression: gzip
downstream_url: http://localhost:3200
log_queries_longer_than: 5s
cache_uncompressed_bytes: 100MB
max_outstanding_requests_per_tenant: 100
max_query_length: 48h
max_query_lookback: 30d
default_result_cache_ttl: 1m
result_cache:
cache:
enable_fifocache: true
default_validity: 1m
rf1_after: "1999-01-01T00:00:00Z"
mcp_server:
enabled: true
# Querier configuration - queries traces
querier:
frontend_worker:
frontend_address: localhost:3200
grpc_client_config:
max_recv_msg_size: 104857600
max_concurrent_queries: 20
max_metric_bytes_per_trace: 1MB
# Query scheduler configuration - for distributed querying
query_scheduler:
use_scheduler_ring: false
# Metrics generator configuration - generates metrics from traces
metrics_generator:
enabled: true
registry:
enabled: true
external_labels:
source: tempo
cluster: rustfs-docker-ha
environment: production
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
resource_to_telemetry_conversion:
enabled: true
processor:
batch:
timeout: 10s
send_batch_size: 1024
memory_limiter:
check_interval: 5s
limit_mib: 512
spike_limit_mib: 128
processors:
- span-metrics
- local-blocks
- service-graphs
generate_native_histograms: both
# Backend worker configuration
backend_worker:
backend_scheduler_addr: localhost:3200
compaction:
block_retention: 24h
compacted_block_retention: 1h
ring:
kvstore:
store: memberlist
# Backend scheduler configuration
backend_scheduler:
enabled: true
provider:
compaction:
compaction:
block_retention: 24h
compacted_block_retention: 1h
concurrency: 25
v2_out_path: /var/tempo/blocks/compaction
# Storage configuration - local backend with proper retention
storage:
trace:
backend: local
wal:
path: /var/tempo/wal
checkpoint_duration: 5s
flush_on_shutdown: true
local:
path: /var/tempo/blocks
bloom_filter_false_positive: 0.05
bloom_shift: 4
index:
downsample_bytes: 1000000
page_size_bytes: 0
cache_size_bytes: 0
pool:
max_workers: 400
queue_depth: 10000
# Compactor configuration - manages block compaction
compactor:
compaction:
block_retention: 168h # 7 days
compacted_block_retention: 1h
concurrency: 25
v2_out_path: /var/tempo/blocks/compaction
shard_count: 32
max_block_bytes: 107374182400 # 100GB
max_compaction_objects: 6000000
max_time_per_tenant: 5m
block_size_bytes: 107374182400
ring:
kvstore:
store: memberlist
heartbeat_interval: 5s
heartbeat_timeout: 5s
# Limits configuration - rate limiting and quotas
limits:
max_traces_per_user: 10000
max_bytes_per_trace: 10485760 # 10MB
max_search_bytes_per_trace: 0
forgiving_oversize_traces: true
rate_limit_bytes: 10MB
rate_limit_enabled: true
ingestion_burst_size_bytes: 20MB
ingestion_rate_limit_bytes: 10MB
max_bytes_per_second: 10485760
metrics_generator_max_active_series: 10000
metrics_generator_max_churned_series: 10000
metrics_generator_forta_out_of_order_ttl: 5m
# Override configuration
overrides:
defaults:
metrics_generator:
processors:
- span-metrics
- local-blocks
- service-graphs
generate_native_histograms: both
max_active_series: 10000
max_churned_series: 10000
# Usage reporting configuration
usage_report:
reporting_enabled: false
# Tracing configuration for debugging
tracing:
enabled: true
jaeger:
sampler:
name: probabilistic
param: 0.1
reporter_log_spans: false

View File

@@ -19,9 +19,16 @@ server:
http_listen_port: 3200
log_level: info
memberlist:
node_name: tempo
bind_port: 7946
join_members:
- tempo:7946
# Distributor configuration - receives traces and writes directly to ingesters
distributor:
ingester_write_path_enabled: false
kafka_write_path_enabled: true
ingester_write_path_enabled: true
kafka_write_path_enabled: false
receivers:
otlp:
protocols:
@@ -29,10 +36,21 @@ distributor:
endpoint: "tempo:4317"
http:
endpoint: "tempo:4318"
#log_received_spans:
# enabled: true
# log_discarded_spans:
# enabled: true
ring:
kvstore:
store: memberlist
# Ingester configuration - consumes from Kafka and stores traces
ingester:
lifecycler:
ring:
kvstore:
store: memberlist
replication_factor: 1
tokens_file_path: /var/tempo/tokens.json
trace_idle_period: 10s
max_block_bytes: 1_000_000
max_block_duration: 5m
backend_scheduler:
provider:
@@ -49,7 +67,8 @@ backend_worker:
store: memberlist
querier:
query_live_store: true
frontend_worker:
frontend_address: tempo:3200
metrics_generator:
registry:
@@ -78,17 +97,28 @@ storage:
overrides:
defaults:
metrics_generator:
processors: ["span-metrics", "service-graphs", "local-blocks"]
processors: [ "span-metrics", "service-graphs", "local-blocks" ]
generate_native_histograms: both
ingest:
enabled: true
kafka:
address: redpanda:9092
topic: tempo-ingest
enabled: false
# Disabled because using direct ingester write path
# If you want Kafka path, enable this and set:
# kafka:
# brokers: [redpanda:9092]
# topic: tempo-ingest
# encoding: protobuf
# consumer_group: tempo-ingest-consumer
block_builder:
consume_cycle_duration: 30s
compactor:
compaction:
block_retention: 168h # 7 days
ring:
kvstore:
store: memberlist
usage_report:
reporting_enabled: false

30
.github/AGENTS.md vendored Normal file
View File

@@ -0,0 +1,30 @@
# GitHub Workflow Instructions
Applies to `.github/` and repository pull-request operations.
## Pull Requests
- PR titles and descriptions must be in English.
- Use `.github/pull_request_template.md` for every PR body.
- Keep all template section headings.
- Use `N/A` for non-applicable sections.
- Include verification commands in the PR details.
- For `gh pr create` and `gh pr edit`, always write markdown body to a file and pass `--body-file`.
- Do not use multiline inline `--body`; backticks and shell expansion can corrupt content or trigger unintended commands.
- Recommended pattern:
- `cat > /tmp/pr_body.md <<'EOF'`
- `...markdown...`
- `EOF`
- `gh pr create ... --body-file /tmp/pr_body.md`
## CI Alignment
When changing CI-sensitive behavior, keep local validation aligned with `.github/workflows/ci.yml`.
Current `test-and-lint` gate includes:
- `cargo nextest run --all --exclude e2e_test`
- `cargo test --all --doc`
- `cargo fmt --all --check`
- `cargo clippy --all-targets --all-features -- -D warnings`
- `./scripts/check_layer_dependencies.sh`

View File

@@ -23,6 +23,7 @@
#
# Manual Parameters:
# - build_docker: Build and push Docker images (default: true)
# - platforms: Comma-separated platform IDs or 'all' (default: all)
name: Build and Release
@@ -44,22 +45,6 @@ on:
- "**/*.svg"
- ".gitignore"
- ".dockerignore"
pull_request:
branches: [ main ]
paths-ignore:
- "**.md"
- "**.txt"
- ".github/**"
- "docs/**"
- "deploy/**"
- "scripts/dev_*.sh"
- "LICENSE*"
- "README*"
- "**/*.png"
- "**/*.jpg"
- "**/*.svg"
- ".gitignore"
- ".dockerignore"
schedule:
- cron: "0 0 * * 0" # Weekly on Sunday at midnight UTC
workflow_dispatch:
@@ -69,6 +54,11 @@ on:
required: false
default: true
type: boolean
platforms:
description: "Comma-separated targets or 'all' (e.g. linux-x86_64-musl,macos-aarch64)"
required: false
default: "all"
type: string
permissions:
contents: read
@@ -154,63 +144,70 @@ jobs:
echo " - Is prerelease: $is_prerelease"
# Build RustFS binaries
prepare-platform-matrix:
name: Prepare Platform Matrix
runs-on: ubicloud-standard-2
outputs:
matrix: ${{ steps.select.outputs.matrix }}
selected: ${{ steps.select.outputs.selected }}
steps:
- name: Select target platforms
id: select
shell: bash
run: |
set -euo pipefail
selected="${{ github.event_name == 'workflow_dispatch' && github.event.inputs.platforms || 'all' }}"
selected="$(echo "${selected}" | tr -d '[:space:]')"
if [[ -z "${selected}" ]]; then
selected="all"
fi
all='{"include":[
{"target_id":"linux-x86_64-musl","os":"ubicloud-standard-2","target":"x86_64-unknown-linux-musl","cross":false,"platform":"linux","rustflags":""},
{"target_id":"linux-aarch64-musl","os":"ubicloud-standard-2","target":"aarch64-unknown-linux-musl","cross":true,"platform":"linux","rustflags":""},
{"target_id":"linux-x86_64-gnu","os":"ubicloud-standard-2","target":"x86_64-unknown-linux-gnu","cross":false,"platform":"linux","rustflags":""},
{"target_id":"linux-aarch64-gnu","os":"ubicloud-standard-2","target":"aarch64-unknown-linux-gnu","cross":true,"platform":"linux","rustflags":""},
{"target_id":"macos-aarch64","os":"macos-latest","target":"aarch64-apple-darwin","cross":false,"platform":"macos","rustflags":""},
{"target_id":"macos-x86_64","os":"macos-latest","target":"x86_64-apple-darwin","cross":false,"platform":"macos","rustflags":""},
{"target_id":"windows-x86_64","os":"windows-latest","target":"x86_64-pc-windows-msvc","cross":false,"platform":"windows","rustflags":""}
]}'
if [[ "${selected}" == "all" ]]; then
matrix="$(jq -c . <<<"${all}")"
else
unknown="$(jq -rn --arg selected "${selected}" --argjson all "${all}" '
($selected | split(",") | map(select(length > 0))) as $req
| ($all.include | map(.target_id)) as $known
| [$req[] | select(( $known | index(.) ) == null)]
')"
if [[ "$(jq 'length' <<<"${unknown}")" -gt 0 ]]; then
echo "Unknown platforms: $(jq -r 'join(\",\")' <<<"${unknown}")" >&2
echo "Allowed: $(jq -r '.include[].target_id' <<<"${all}" | paste -sd ',' -)" >&2
exit 1
fi
matrix="$(jq -c --arg selected "${selected}" '
($selected | split(",") | map(select(length > 0))) as $req
| .include |= map(select(.target_id as $id | ($req | index($id))))
' <<<"${all}")"
fi
echo "selected=${selected}" >> "$GITHUB_OUTPUT"
echo "matrix=${matrix}" >> "$GITHUB_OUTPUT"
echo "Selected platforms: ${selected}"
build-rustfs:
name: Build RustFS
needs: [ build-check ]
if: needs.build-check.outputs.should_build == 'true'
needs: [ build-check, prepare-platform-matrix ]
if: needs.build-check.outputs.should_build == 'true' && needs.prepare-platform-matrix.result == 'success'
runs-on: ${{ matrix.os }}
timeout-minutes: 60
env:
RUSTFLAGS: ${{ matrix.rustflags }}
strategy:
fail-fast: false
matrix:
include:
# Linux builds
# Use x86-64-v2 (SSE4.2 baseline) instead of native to ensure distributed
# binaries run on older x86_64 CPUs (e.g. Intel Celeron/Atom, Synology NAS).
# See: https://github.com/rustfs/rustfs/issues/1838
- os: ubicloud-standard-2
target: x86_64-unknown-linux-musl
cross: false
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: aarch64-unknown-linux-musl
cross: true
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: x86_64-unknown-linux-gnu
cross: false
platform: linux
rustflags: ''
- os: ubicloud-standard-2
target: aarch64-unknown-linux-gnu
cross: true
platform: linux
rustflags: ''
# macOS builds
- os: macos-latest
target: aarch64-apple-darwin
cross: false
platform: macos
rustflags: ''
- os: macos-latest
target: x86_64-apple-darwin
cross: false
platform: macos
rustflags: ''
# Windows builds (temporarily disabled)
- os: windows-latest
target: x86_64-pc-windows-msvc
cross: false
platform: windows
rustflags: ''
#- os: windows-latest
# target: aarch64-pc-windows-msvc
# cross: true
# platform: windows
matrix: ${{ fromJson(needs.prepare-platform-matrix.outputs.matrix) }}
steps:
- name: Checkout repository
uses: actions/checkout@v6

View File

@@ -91,6 +91,8 @@ jobs:
typos:
name: Typos
needs: skip-check
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-2
steps:
- uses: actions/checkout@v6
@@ -116,9 +118,6 @@ jobs:
github-token: ${{ secrets.GITHUB_TOKEN }}
cache-save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-nextest
uses: taiki-e/install-action@nextest
- name: Run tests
run: |
cargo nextest run --all --exclude e2e_test
@@ -133,9 +132,40 @@ jobs:
- name: Check layered dependencies
run: ./scripts/check_layer_dependencies.sh
build-rustfs-debug-binary:
name: Build RustFS Debug Binary
needs: skip-check
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-4
timeout-minutes: 30
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Setup Rust environment
uses: ./.github/actions/setup
with:
rust-version: stable
cache-shared-key: ci-rustfs-debug-binary-${{ hashFiles('**/Cargo.lock') }}
cache-save-if: ${{ github.ref == 'refs/heads/main' }}
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build debug binary
run: |
touch rustfs/build.rs
cargo build -p rustfs --bins --jobs 2
- name: Upload debug binary
uses: actions/upload-artifact@v6
with:
name: rustfs-debug-binary
path: target/debug/rustfs
if-no-files-found: error
retention-days: 1
e2e-tests:
name: End-to-End Tests
needs: skip-check
needs: [ skip-check, build-rustfs-debug-binary ]
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-2
timeout-minutes: 30
@@ -148,13 +178,17 @@ jobs:
rm -rf /tmp/rustfs
rm -f /tmp/rustfs.log
- name: Setup Rust environment
uses: ./.github/actions/setup
- name: Download debug binary
uses: actions/download-artifact@v7
with:
rust-version: stable
cache-shared-key: ci-e2e-${{ hashFiles('**/Cargo.lock') }}
cache-save-if: ${{ github.ref == 'refs/heads/main' }}
github-token: ${{ secrets.GITHUB_TOKEN }}
name: rustfs-debug-binary
path: target/debug
- name: Make binary executable
run: chmod +x ./target/debug/rustfs
- name: Setup Rust toolchain for s3s-e2e installation
uses: dtolnay/rust-toolchain@stable
- name: Install s3s-e2e test tool
uses: taiki-e/cache-cargo-install-action@v2
@@ -163,12 +197,6 @@ jobs:
git: https://github.com/s3s-project/s3s.git
rev: 4a04a670cf41274d9be9ab65dc36f4aa3f92fbad
- name: Build debug binary
run: |
touch rustfs/build.rs
# Limit concurrency to prevent OOM
cargo build -p rustfs --bins --jobs 2
- name: Run end-to-end tests
run: |
s3s-e2e --version
@@ -184,7 +212,7 @@ jobs:
s3-implemented-tests:
name: S3 Implemented Tests
needs: skip-check
needs: [ skip-check, build-rustfs-debug-binary ]
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-4
timeout-minutes: 60
@@ -192,18 +220,14 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v6
- name: Setup Rust environment
uses: ./.github/actions/setup
- name: Download debug binary
uses: actions/download-artifact@v7
with:
rust-version: stable
cache-shared-key: ci-s3tests-${{ hashFiles('**/Cargo.lock') }}
cache-save-if: ${{ github.ref == 'refs/heads/main' }}
github-token: ${{ secrets.GITHUB_TOKEN }}
name: rustfs-debug-binary
path: target/debug
- name: Build debug binary
run: |
touch rustfs/build.rs
cargo build -p rustfs --bins --jobs 2
- name: Make binary executable
run: chmod +x ./target/debug/rustfs
- name: Run implemented s3-tests
run: |

View File

@@ -66,6 +66,7 @@ env:
CARGO_TERM_COLOR: always
REGISTRY_DOCKERHUB: rustfs/rustfs
REGISTRY_GHCR: ghcr.io/${{ github.repository }}
REGISTRY_QUAY: quay.io/${{ secrets.QUAY_USERNAME }}/rustfs
DOCKER_PLATFORMS: linux/amd64,linux/arm64
jobs:
@@ -162,14 +163,7 @@ jobs:
if [[ "$version" == *"alpha"* ]] || [[ "$version" == *"beta"* ]] || [[ "$version" == *"rc"* ]]; then
build_type="prerelease"
is_prerelease=true
# TODO: Temporary change - currently allows alpha versions to also create latest tags
# After the version is stable, you need to remove the following line and restore the original logic (latest is created only for stable versions)
if [[ "$version" == *"alpha"* ]]; then
create_latest=true
echo "🧪 Building Docker image for prerelease: $version (temporarily allowing creation of latest tag)"
else
echo "🧪 Building Docker image for prerelease: $version"
fi
echo "🧪 Building Docker image for prerelease: $version"
else
build_type="release"
create_latest=true
@@ -215,14 +209,7 @@ jobs:
v*alpha*|v*beta*|v*rc*|*alpha*|*beta*|*rc*)
build_type="prerelease"
is_prerelease=true
# TODO: Temporary change - currently allows alpha versions to also create latest tags
# After the version is stable, you need to remove the if block below and restore the original logic.
if [[ "$input_version" == *"alpha"* ]]; then
create_latest=true
echo "🧪 Building with prerelease version: $input_version (temporarily allowing creation of latest tag)"
else
echo "🧪 Building with prerelease version: $input_version"
fi
echo "🧪 Building with prerelease version: $input_version"
;;
# Release versions (match after prereleases, more general)
v[0-9]*|[0-9]*.*.*)
@@ -287,12 +274,19 @@ jobs:
username: ${{ env.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
# - name: Login to GitHub Container Registry
# uses: docker/login-action@v3
# with:
# registry: ghcr.io
# username: ${{ github.actor }}
# password: ${{ secrets.GITHUB_TOKEN }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_PASSWORD }}
- name: Login to Quay.io
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.QUAY_USERNAME }}
password: ${{ secrets.QUAY_PASSWORD }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -338,14 +332,15 @@ jobs:
# Generate tags based on build type
# Only support release and prerelease builds (no development builds)
TAGS="${{ env.REGISTRY_DOCKERHUB }}:${VERSION}${VARIANT_SUFFIX}"
TAG_BASE="${VERSION}${VARIANT_SUFFIX}"
TAGS="${{ env.REGISTRY_DOCKERHUB }}:$TAG_BASE,${{ env.REGISTRY_GHCR }}:$TAG_BASE,${{ env.REGISTRY_QUAY }}:$TAG_BASE"
# Add channel tags for prereleases and latest for stable
if [[ "$CREATE_LATEST" == "true" ]]; then
# TODO: Temporary change - the current alpha version will also create the latest tag
# After the version is stabilized, the logic here remains unchanged, but the upstream CREATE_LATEST setting needs to be restored.
# Stable release (and temporary alpha versions)
TAGS="$TAGS,${{ env.REGISTRY_DOCKERHUB }}:latest${VARIANT_SUFFIX}"
TAGS="$TAGS,${{ env.REGISTRY_DOCKERHUB }}:latest${VARIANT_SUFFIX},${{ env.REGISTRY_GHCR }}:latest${VARIANT_SUFFIX},${{ env.REGISTRY_QUAY }}:latest${VARIANT_SUFFIX}"
elif [[ "$BUILD_TYPE" == "prerelease" ]]; then
# Prerelease channel tags (alpha, beta, rc)
if [[ "$VERSION" == *"alpha"* ]]; then
@@ -357,7 +352,7 @@ jobs:
fi
if [[ -n "$CHANNEL" ]]; then
TAGS="$TAGS,${{ env.REGISTRY_DOCKERHUB }}:${CHANNEL}${VARIANT_SUFFIX}"
TAGS="$TAGS,${{ env.REGISTRY_DOCKERHUB }}:${CHANNEL}${VARIANT_SUFFIX},${{ env.REGISTRY_GHCR }}:${CHANNEL}${VARIANT_SUFFIX},${{ env.REGISTRY_QUAY }}:${CHANNEL}${VARIANT_SUFFIX}"
fi
fi
@@ -441,10 +436,8 @@ jobs:
"prerelease")
echo "🧪 Prerelease Docker image has been built with ${VERSION} tags"
echo "⚠️ This is a prerelease image - use with caution"
# TODO: Temporary change - alpha versions currently create the latest tag
# After the version is stable, you need to restore the following prompt information
if [[ "$VERSION" == *"alpha"* ]] && [[ "$CREATE_LATEST" == "true" ]]; then
echo "🏷️ Latest tag has been created for alpha version (temporary measures)"
if [[ "$CREATE_LATEST" == "true" ]]; then
echo "🏷️ Latest tag has been explicitly created for prerelease"
else
echo "🚫 Latest tag NOT created for prerelease"
fi

View File

@@ -50,6 +50,11 @@ env:
RUST_LOG: info
PLATFORM: linux/amd64
BUILDX_CACHE_SCOPE: rustfs-e2e-s3tests-source
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event.inputs['test-mode'] || 'single' }}
cancel-in-progress: true
defaults:
run:
@@ -57,12 +62,25 @@ defaults:
jobs:
s3tests-single:
if: github.event.inputs.test-mode == 'single'
if: github.event.inputs['test-mode'] == 'single'
runs-on: ubicloud-standard-2
timeout-minutes: 120
steps:
- uses: actions/checkout@v6
- name: Cache pip downloads
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-e2e-s3tests-${{ hashFiles('.github/workflows/e2e-s3tests.yml') }}
restore-keys: |
${{ runner.os }}-pip-e2e-s3tests-
- name: Install Python tools
run: |
python3 -m pip install --user --upgrade pip awscurl tox
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
- name: Enable buildx
uses: docker/setup-buildx-action@v3
@@ -70,8 +88,8 @@ jobs:
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--cache-from type=gha,scope=${BUILDX_CACHE_SCOPE} \
--cache-to type=gha,mode=max,scope=${BUILDX_CACHE_SCOPE} \
-t rustfs-ci \
-f Dockerfile.source .
@@ -121,9 +139,6 @@ jobs:
- name: Provision s3-tests alt user (required by suite)
run: |
python3 -m pip install --user --upgrade pip awscurl
export PATH="$HOME/.local/bin:$PATH"
# Admin API requires AWS SigV4 signing. awscurl is used by RustFS codebase as well.
awscurl \
--service s3 \
@@ -156,8 +171,6 @@ jobs:
- name: Prepare s3-tests
run: |
python3 -m pip install --user --upgrade pip tox
export PATH="$HOME/.local/bin:$PATH"
git clone --depth 1 https://github.com/ceph/s3-tests.git s3-tests
- name: Run ceph s3-tests (debug friendly)
@@ -211,12 +224,25 @@ jobs:
path: artifacts/**
s3tests-multi:
if: github.event_name == 'workflow_dispatch' && github.event.inputs.test-mode == 'multi'
if: github.event_name == 'workflow_dispatch' && github.event.inputs['test-mode'] == 'multi'
runs-on: ubicloud-standard-2
timeout-minutes: 150
steps:
- uses: actions/checkout@v6
- name: Cache pip downloads
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-e2e-s3tests-${{ hashFiles('.github/workflows/e2e-s3tests.yml') }}
restore-keys: |
${{ runner.os }}-pip-e2e-s3tests-
- name: Install Python tools
run: |
python3 -m pip install --user --upgrade pip awscurl tox
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
- name: Enable buildx
uses: docker/setup-buildx-action@v3
@@ -224,8 +250,8 @@ jobs:
run: |
DOCKER_BUILDKIT=1 docker buildx build --load \
--platform ${PLATFORM} \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--cache-from type=gha,scope=${BUILDX_CACHE_SCOPE} \
--cache-to type=gha,mode=max,scope=${BUILDX_CACHE_SCOPE} \
-t rustfs-ci \
-f Dockerfile.source .
@@ -337,9 +363,6 @@ jobs:
- name: Provision s3-tests alt user (required by suite)
run: |
python3 -m pip install --user --upgrade pip awscurl
export PATH="$HOME/.local/bin:$PATH"
awscurl \
--service s3 \
--region "${S3_REGION}" \
@@ -368,8 +391,6 @@ jobs:
- name: Prepare s3-tests
run: |
python3 -m pip install --user --upgrade pip tox
export PATH="$HOME/.local/bin:$PATH"
git clone --depth 1 https://github.com/ceph/s3-tests.git s3-tests
- name: Run ceph s3-tests (multi, debug friendly)

View File

@@ -31,11 +31,11 @@ jobs:
update-flake:
name: Update flake.lock
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 45
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Install Nix
uses: DeterminateSystems/determinate-nix-action@v3
@@ -46,14 +46,18 @@ jobs:
id: update
uses: DeterminateSystems/update-flake-lock@main
with:
git-author-name: heihutu
git-author-email: heihutu@gmail.com
git-committer-name: heihutu
git-committer-email: heihutu@gmail.com
pr-title: "chore(deps): update flake.lock"
pr-labels: |
dependencies
nix
automated
commit-msg: "chore(deps): update flake.lock"
pr-reviewers: houseme, heihutu
token: ${{ secrets.GITHUB_TOKEN }}
pr-reviewers: houseme, overtrue, majinghe
token: ${{ secrets.FLAKE_UPDATE_TOKEN }}
- name: Log PR details
if: steps.update.outputs.pull-request-number

View File

@@ -43,7 +43,7 @@ jobs:
nix-validation:
name: Nix Build & Check
runs-on: ubuntu-latest
timeout-minutes: 30
timeout-minutes: 45
steps:
- name: Checkout repository
uses: actions/checkout@v6
@@ -57,7 +57,7 @@ jobs:
- name: Setup Magic Nix Cache
uses: DeterminateSystems/magic-nix-cache-action@v13
- name: Setup Flake Checker
uses: DeterminateSystems/flake-checker-action@v12

3
.gitignore vendored
View File

@@ -45,3 +45,6 @@ docs
# nix stuff
result*
*.gz
rustfs-webdav.code-workspace
.aiexclude

142
AGENTS.md
View File

@@ -1,103 +1,67 @@
# Repository Guidelines
# RustFS Agent Instructions (Global)
This file provides guidance for AI agents and developers working with code in this repository.
This root file keeps repository-wide rules only.
Use the nearest subdirectory `AGENTS.md` for path-specific guidance.
## Project Overview
## Rule Precedence
RustFS is a high-performance distributed object storage software built with Rust, providing S3-compatible APIs and advanced features like data lakes, AI, and big data support. It's designed as an alternative to MinIO with better performance and a more business-friendly Apache 2.0 license.
1. System/developer instructions.
2. This file (global defaults).
3. The nearest `AGENTS.md` in the current path (more specific scope wins).
## ⚠️ Pre-Commit Checklist (MANDATORY)
If repo-level instructions conflict, follow the nearest file and keep behavior aligned with CI.
## Communication and Language
- Respond in the same language used by the requester.
- Keep source code, comments, commit messages, and PR title/body in English.
## Sources of Truth
- Workspace layout and crate membership: `Cargo.toml` (`[workspace].members`)
- Local quality commands: `Makefile` and `.config/make/`
- CI quality gates: `.github/workflows/ci.yml`
- PR template: `.github/pull_request_template.md`
Avoid duplicating long crate lists or command matrices in instruction files.
Reference the source files above instead.
## Mandatory Before Commit
Run and pass:
**Before EVERY commit, you MUST run and pass ALL of the following:**
```bash
cargo fmt --all --check # Code formatting
cargo clippy --all-targets --all-features -- -D warnings # Lints
cargo test --workspace --exclude e2e_test # Unit tests
make pre-commit
```
Or simply run `make pre-commit` which covers all checks. **DO NOT commit if any check fails.**
## Communication Rules
If `make` is unavailable, run the equivalent checks defined under `.config/make/`.
Do not commit when required checks fail.
- Respond to the user in the same language used by the user.
- Code and documentation must be written in English only.
- **Pull Request titles and descriptions must be written in English** to ensure consistency and accessibility for all contributors.
## Git and PR Baseline
## Project Structure
- Use feature branches based on the latest `main`.
- Follow Conventional Commits, with subject length <= 72 characters.
- Keep PR title and description in English.
- Use `.github/pull_request_template.md` and keep all section headings.
- Use `N/A` for non-applicable template sections.
- Include verification commands in the PR description.
- When using `gh pr create`/`gh pr edit`, use `--body-file` instead of inline `--body` for multiline markdown.
The workspace root hosts shared dependencies in `Cargo.toml`. The service binary lives under `rustfs/src/main.rs`, while reusable crates sit in `crates/` (including `ecstore`, `iam`, `kms`, `madmin`, `s3select-api`, `s3select-query`, `config`, `crypto`, `lock`, `filemeta`, `rio`, `common`, `protos`, `audit-logger`, `notify`, `obs`, `workers`, `appauth`, `ahm`, `mcp`, `signer`, `checksums`, `utils`, `zip`, `targets`, and `e2e_test`). Deployment manifests are under `deploy/`, Docker assets sit at the root, and automation lives in `scripts/`.
## Security Baseline
### Core Architecture
- Never commit secrets, credentials, or key material.
- Use environment variables or vault tooling for sensitive configuration.
- For localhost-sensitive tests, verify proxy settings to avoid traffic leakage.
- **Main Binary (`rustfs/`):** Entry point at `rustfs/src/main.rs`, includes admin, auth, config, server, storage, license management, and profiling modules.
- **Key Crates:** Cargo workspace with 25+ crates supporting S3-compatible APIs, erasure coding storage, IAM, KMS, S3 Select, and observability.
- **Build System:** Custom `build-rustfs.sh` script, multi-architecture Docker builds, Make/Just task runners, cross-compilation support.
## Scoped Guidance in This Repository
## Build, Test, and Development Commands
### Quick Commands
- `cargo check --all-targets` - Fast validation
- `cargo build --release` or `make build` - Release build
- `./build-rustfs.sh --dev` - Development build with debug symbols
- `make pre-commit` - Run all quality checks (fmt, clippy, check, test)
### Testing
- `cargo test --workspace --exclude e2e_test` - Unit tests
- `cargo test --package e2e_test` - E2E tests
- For KMS tests: `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= cargo test --package e2e_test test_local_kms_end_to_end -- --nocapture --test-threads=1`
### Cross-Platform Builds
- `./build-rustfs.sh --platform x86_64-unknown-linux-musl` - Build for musl
- `./build-rustfs.sh --platform aarch64-unknown-linux-gnu` - Build for ARM64
- `make build-cross-all` - Build all supported architectures
## Coding Style & Safety Requirements
- **Formatting:** Follow `rustfmt.toml` (130-column width). Use `snake_case` for items, `PascalCase` for types, `SCREAMING_SNAKE_CASE` for constants.
- **Safety:** `unsafe_code = "deny"` enforced at workspace level. Never use `unwrap()`, `expect()`, or panic-inducing code except in tests.
- **Error Handling:** Prefer `anyhow` for applications, `thiserror` for libraries. Use proper error handling with `Result<T, E>` and `Option<T>`.
- **Async:** Keep async code non-blocking. Offload CPU-heavy work with `tokio::task::spawn_blocking` when necessary.
- **Language:** Code comments, function names, variable names, and all text in source files must be in English only.
## Testing Guidelines
Co-locate unit tests with their modules and give behavior-led names. Integration suites belong in each crate's `tests/` directory, while exhaustive end-to-end scenarios live in `crates/e2e_test/`. When fixing bugs or adding features, include regression tests that capture the new behavior.
## KMS (Key Management Service)
- **Implementation:** Complete with Local and Vault backends, auto-configures on startup with `--kms-enable` flag.
- **Encryption:** Full S3-compatible server-side encryption (SSE-S3, SSE-KMS, SSE-C).
- **Testing:** Comprehensive E2E tests in `crates/e2e_test/src/kms/`. Requires proxy bypass for local testing.
- **Key Files:** `crates/kms/`, `rustfs/src/storage/ecfs.rs`, `rustfs/src/admin/handlers/kms*.rs`
## Environment Variables
- Global configuration environment variables must use flat `RUSTFS_*` names (no module segments), such as `RUSTFS_REGION`, `RUSTFS_ADDRESS`, `RUSTFS_VOLUMES`, and `RUSTFS_LICENSE`.
- `RUSTFS_SCANNER_ENABLED` - Enable/disable background data scanner (default: true)
- `RUSTFS_HEAL_ENABLED` - Enable/disable auto-heal functionality (default: true)
- Deprecated aliases (for pre-beta compatibility) are documented in [the config module README](crates/config/README.md#environment-variable-naming-conventions) and must log warnings when used.
- For KMS tests: `NO_PROXY=127.0.0.1,localhost` and clear proxy environment variables
## Commit & Pull Request Guidelines
Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters. Each commit must compile, format cleanly, and pass `make pre-commit`.
**Pull Request Requirements:**
- PR titles and descriptions **MUST be written in English**
- Open PRs with a concise summary, note verification commands, link relevant issues
- Follow the PR template format and fill in all required sections
- Wait for reviewer approval before merging
## Security & Configuration Tips
Do not commit secrets or cloud credentials; prefer environment variables or vault tooling. Review IAM- and KMS-related changes with a second maintainer. Confirm proxy settings before running sensitive tests to avoid leaking traffic outside localhost.
## Important Reminders
- Always compile after code changes: Use `cargo build` to catch errors early
- Don't bypass tests: All functionality must be properly tested, not worked around
- Use proper error handling: Never use `unwrap()` or `expect()` in production code (except tests)
- Do what has been asked; nothing more, nothing less
- NEVER create files unless they're absolutely necessary for achieving your goal
- ALWAYS prefer editing an existing file to creating a new one
- NEVER proactively create documentation files (*.md) or README files unless explicitly requested
- NEVER commit PR description files (e.g., PR_DESCRIPTION.md): These are temporary reference files for creating pull requests and should remain local only
- `.github/AGENTS.md`
- `crates/AGENTS.md`
- `crates/config/AGENTS.md`
- `crates/ecstore/AGENTS.md`
- `crates/e2e_test/AGENTS.md`
- `crates/iam/AGENTS.md`
- `crates/kms/AGENTS.md`
- `crates/policy/AGENTS.md`
- `rustfs/src/admin/AGENTS.md`
- `rustfs/src/storage/AGENTS.md`

View File

@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- **Helm Ingress**: `customAnnotations` are now merged with class-specific annotations (nginx/traefik) instead of being ignored when `ingress.className` is set.
### Added
- **OpenStack Keystone Authentication Integration**: Full support for OpenStack Keystone authentication via X-Auth-Token headers
- Tower-based middleware (`KeystoneAuthLayer`) self-contained within `rustfs-keystone` crate

817
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -39,6 +39,7 @@ members = [
"crates/protocols", # Protocol implementations (FTPS, SFTP, etc.)
"crates/protos", # Protocol buffer definitions
"crates/rio", # Rust I/O utilities and abstractions
"crates/s3-common", # Common utilities and data structures for S3 compatibility
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/scanner", # Scanner for data integrity checks and health monitoring
@@ -49,7 +50,7 @@ members = [
"crates/workers", # Worker thread pools and task scheduling
"crates/zip", # ZIP file handling and compression
]
resolver = "2"
resolver = "3"
[workspace.package]
edition = "2024"
@@ -94,6 +95,7 @@ rustfs-obs = { path = "crates/obs", version = "0.0.5" }
rustfs-policy = { path = "crates/policy", version = "0.0.5" }
rustfs-protos = { path = "crates/protos", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-s3-common = { path = "crates/s3-common", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs-scanner = { path = "crates/scanner", version = "0.0.5" }
@@ -122,8 +124,8 @@ http = "1.4.0"
http-body = "1.0.1"
http-body-util = "0.1.3"
reqwest = { version = "0.13.2", default-features = false, features = ["rustls", "charset", "http2", "system-proxy", "stream", "json", "blocking", "query", "form"] }
socket2 = { version = "0.6.2", features = ["all"] }
tokio = { version = "1.49.0", features = ["fs", "rt-multi-thread"] }
socket2 = { version = "0.6.3", features = ["all"] }
tokio = { version = "1.50.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "aws-lc-rs"] }
tokio-stream = { version = "0.1.18" }
tokio-test = "0.4.5"
@@ -142,7 +144,7 @@ flatbuffers = "25.12.19"
form_urlencoded = "1.2.2"
prost = "0.14.3"
quick-xml = "0.39.2"
rmcp = { version = "0.17.0" }
rmcp = { version = "1.2.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
@@ -160,7 +162,7 @@ hmac = { version = "0.13.0-rc.5" }
jsonwebtoken = { version = "10.3.0", features = ["aws_lc_rs"] }
openidconnect = { version = "4.0", default-features = false }
pbkdf2 = "0.13.0-rc.9"
rsa = { version = "0.10.0-rc.16" }
rsa = { version = "0.10.0-rc.17" }
rustls = { version = "0.23.37", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] }
rustls-pki-types = "1.14.0"
sha1 = "0.11.0-rc.5"
@@ -171,7 +173,7 @@ zeroize = { version = "1.8.2", features = ["derive"] }
# Time and Date
chrono = { version = "0.4.44", features = ["serde"] }
humantime = "2.3.0"
jiff = { version = "0.2.22", features = ["serde"] }
jiff = { version = "0.2.23", features = ["serde"] }
time = { version = "0.3.47", features = ["std", "parsing", "formatting", "macros", "serde"] }
# Utilities and Tools
@@ -180,35 +182,39 @@ arc-swap = "1.8.2"
astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.14" }
aws-credential-types = { version = "1.2.13" }
aws-sdk-s3 = { version = "1.124.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-http-client = { version = "1.1.11", default-features = false, features = ["default-client", "rustls-aws-lc"] }
aws-smithy-types = { version = "1.4.5" }
aws-config = { version = "1.8.15" }
aws-credential-types = { version = "1.2.14" }
aws-sdk-s3 = { version = "1.126.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-http-client = { version = "1.1.12", default-features = false, features = ["default-client", "rustls-aws-lc"] }
aws-smithy-types = { version = "1.4.6" }
backtrace = "0.3.76"
base64 = "0.22.1"
base64-simd = "0.8.0"
brotli = "8.0.2"
cfg-if = "1.0.4"
clap = { version = "4.5.60", features = ["derive", "env"] }
clap = { version = "4.6.0", features = ["derive", "env"] }
const-str = { version = "1.1.0", features = ["std", "proc"] }
convert_case = "0.11.0"
criterion = { version = "0.8", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "52.2.0"
crossbeam-channel = "0.5.15"
crossbeam-deque = "0.8.6"
crossbeam-utils = "0.8.21"
datafusion = "52.3.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.9"
glob = "0.3.3"
google-cloud-storage = "1.8.0"
google-cloud-auth = "1.6.0"
google-cloud-storage = "1.9.0"
google-cloud-auth = "1.7.0"
hashbrown = { version = "0.16.1", features = ["serde", "rayon"] }
hex = "0.4.3"
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
ipnetwork = { version = "0.21.1", features = ["serde"] }
lazy_static = "1.5.0"
libc = "0.2.182"
libc = "0.2.183"
libsystemd = "0.7.2"
local-ip-address = "0.6.10"
lz4 = "1.28.1"
@@ -224,6 +230,7 @@ object_store = "0.12.5"
parking_lot = "0.12.5"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
percent-encoding = "2.3.2"
pin-project-lite = "0.2.17"
pretty_assertions = "1.4.1"
rand = { version = "0.10.0", features = ["serde"] }
@@ -235,36 +242,36 @@ rumqttc = { version = "0.25.1" }
rustix = { version = "1.1.4", features = ["fs"] }
rust-embed = { version = "8.11.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.13.0", features = ["minio"] }
s3s = { git = "https://github.com/s3s-project/s3s", rev = "c2dc7b16535659904d4efff52c558fc039be1ef3", features = ["minio"] }
serial_test = "3.4.0"
shadow-rs = { version = "1.7.0", default-features = false }
shadow-rs = { version = "1.7.1", default-features = false }
siphasher = "1.0.2"
smallvec = { version = "1.15.1", features = ["serde"] }
smartstring = "1.0.1"
snafu = "0.8.9"
snafu = "0.9.0"
snap = "1.1.1"
starshard = { version = "1.1.0", features = ["rayon", "async", "serde"] }
strum = { version = "0.28.0", features = ["derive"] }
sysinfo = "0.38.3"
sysinfo = "0.38.4"
temp-env = "0.3.6"
tempfile = "3.26.0"
tempfile = "3.27.0"
test-case = "3.3.1"
thiserror = "2.0.18"
tracing = { version = "0.1.44" }
tracing-appender = "0.2.4"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3.23", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.8"
urlencoding = "2.1.3"
uuid = { version = "1.21.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
uuid = { version = "1.22.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.6.1", features = ["serde"] }
windows = { version = "0.62.2" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "8.1.0"
zip = "8.2.0"
zstd = "0.13.3"
# Observability and Metrics
@@ -275,6 +282,7 @@ opentelemetry-otlp = { version = "0.31.0", features = ["gzip-http", "reqwest-rus
opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }
pyroscope = { version = "2.0.0", features = ["backend-pprof-rs"] }
# FTP and SFTP
libunftp = { version = "0.23.0", features = ["experimental"] }
@@ -282,6 +290,9 @@ unftp-core = "0.1.0"
suppaftp = { version = "8.0.2", features = ["tokio", "tokio-rustls-aws-lc-rs"] }
rcgen = "0.14.7"
# WebDAV
dav-server = "0.11.0"
# Performance Analysis and Memory Profiling
mimalloc = "0.1"
# Use tikv-jemallocator as memory allocator and enable performance analysis
@@ -291,7 +302,9 @@ tikv-jemalloc-ctl = { version = "0.6", features = ["use_std", "stats", "profilin
# Used to generate pprof-compatible memory profiling data and support symbolization and flame graphs
jemalloc_pprof = { version = "0.8.2", features = ["symbolize", "flamegraph"] }
# Used to generate CPU performance analysis data and flame diagrams
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
# pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
# Pyroscope uses a patched pprof; until those changes are merged back upstream, all pprof references are replaced with the fork. Otherwise, two pprof libraries would collide on symbols.
pprof = { package = "pprof-pyroscope-fork", version = "0.1500.3", features = ["flamegraph", "protobuf-codec"] }
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp"]

View File

@@ -1,4 +1,4 @@
[![RustFS](https://github.com/user-attachments/assets/3ba82e75-2f2d-4415-a4aa-1e4ffe9f22fd)](https://rustfs.com)
[![RustFS](https://repository-images.githubusercontent.com/722597620/0fa936a2-8164-4f53-867f-def4beb64b21)](https://rustfs.com)
<p align="center">RustFS is a high-performance, distributed object storage system built in Rust.</p>
@@ -42,6 +42,7 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
- **High Performance**: Built with Rust to ensure maximum speed and resource efficiency.
- **Distributed Architecture**: Scalable and fault-tolerant design suitable for large-scale deployments.
- **S3 Compatibility**: Seamless integration with existing S3-compatible applications and tools.
- **OpenStack Swift API**: Native support for Swift protocol with Keystone authentication.
- **OpenStack Keystone Integration**: Native support for OpenStack Keystone authentication with X-Auth-Token headers.
- **Data Lake Support**: Optimized for high-throughput big data and AI workloads.
- **Open Source**: Licensed under Apache 2.0, encouraging unrestricted community contributions and commercial usage.
@@ -56,6 +57,7 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
| **Event Notifications** | ✅ Available | **Distributed Mode** | 🚧 Under Testing |
| **K8s Helm Charts** | ✅ Available | **RustFS KMS** | 🚧 Under Testing |
| **Keystone Auth** | ✅ Available | **Multi-Tenancy** | ✅ Available |
| **Swift API** | ✅ Available | **Swift Metadata Ops** | 🚧 Partial |
## RustFS vs MinIO Performance

View File

@@ -100,7 +100,7 @@ curl -O https://rustfs.com/install_rustfs.sh && bash install_rustfs.sh
### 2\. Docker 快速启动 (选项 2)
RustFS 容器以非 root 用户 `rustfs` (UID `10001`) 运行。如果您使用 Docker 的 `-v` 参数挂载宿主机目录,请务必确保宿主机目录的所有者已更改为 `1000`,否则会遇到权限拒绝错误。
RustFS 容器以非 root 用户 `rustfs` (UID `10001`) 运行。如果您使用 Docker 的 `-v` 参数挂载宿主机目录,请务必确保宿主机目录的所有者已更改为 `10001`,否则会遇到权限拒绝错误。
```bash
# 创建数据和日志目录

View File

@@ -42,6 +42,8 @@ GAE = "GAE"
# s3-tests original test names (cannot be changed)
nonexisted = "nonexisted"
consts = "consts"
# Swift API - company/product names
Hashi = "Hashi" # HashiCorp
[files]
extend-exclude = []

20
crates/AGENTS.md Normal file
View File

@@ -0,0 +1,20 @@
# Crates Instructions
Applies to all paths under `crates/`.
## Library Design
- Treat crate code as reusable library code by default.
- Prefer `thiserror` for library-facing error types.
- Do not use `unwrap()`, `expect()`, or panic-driven control flow outside tests.
## Testing
- Keep unit tests close to the module they test.
- Keep integration tests under each crate's `tests/` directory.
- Add regression tests for bug fixes and behavior changes.
## Async and Performance
- Keep async paths non-blocking.
- Move CPU-heavy operations out of async hot paths with `tokio::task::spawn_blocking` when appropriate.

View File

@@ -29,6 +29,7 @@ categories = ["web-programming", "development-tools", "asynchronous", "api-bindi
rustfs-targets = { workspace = true }
rustfs-config = { workspace = true, features = ["audit", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-s3-common = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
const-str = { workspace = true }

View File

@@ -14,7 +14,7 @@
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use serde::{Deserialize, Serialize};
use serde_json::Value;

View File

@@ -458,10 +458,7 @@ impl DataUsageEntry {
self.size += other.size;
if let Some(o_rep) = &other.replication_stats {
if self.replication_stats.is_none() {
self.replication_stats = Some(ReplicationAllStats::default());
}
let s_rep = self.replication_stats.as_mut().unwrap();
let s_rep = self.replication_stats.get_or_insert_with(ReplicationAllStats::default);
s_rep.targets.clear();
s_rep.replica_size += o_rep.replica_size;
s_rep.replica_count += o_rep.replica_count;
@@ -586,7 +583,7 @@ impl DataUsageCache {
return Some(root);
}
let mut flat = self.flatten(&root);
if flat.replication_stats.is_some() && flat.replication_stats.as_ref().unwrap().empty() {
if flat.replication_stats.as_ref().is_some_and(|stats| stats.empty()) {
flat.replication_stats = None;
}
Some(flat)
@@ -679,7 +676,9 @@ impl DataUsageCache {
leaves.sort_by(|a, b| a.objects.cmp(&b.objects));
while remove > 0 && !leaves.is_empty() {
let e = leaves.first().unwrap();
let Some(e) = leaves.first() else {
break;
};
let candidate = e.path.clone();
if candidate == *path && !compact_self {
break;
@@ -703,12 +702,9 @@ impl DataUsageCache {
}
pub fn total_children_rec(&self, path: &str) -> usize {
let root = self.find(path);
if root.is_none() {
let Some(root) = self.find(path) else {
return 0;
}
let root = root.unwrap();
};
if root.children.is_empty() {
return 0;
}
@@ -721,31 +717,36 @@ impl DataUsageCache {
}
pub fn merge(&mut self, o: &DataUsageCache) {
let mut existing_root = self.root();
let other_root = o.root();
if existing_root.is_none() && other_root.is_none() {
return;
}
if other_root.is_none() {
return;
}
if existing_root.is_none() {
let Some(mut existing_root) = self.root() else {
if o.root().is_none() {
return;
}
*self = o.clone();
return;
}
if o.info.last_update.gt(&self.info.last_update) {
};
let Some(other_root) = o.root() else {
return;
};
if o.info.last_update > self.info.last_update {
self.info.last_update = o.info.last_update;
}
existing_root.as_mut().unwrap().merge(other_root.as_ref().unwrap());
self.cache.insert(hash_path(&self.info.name).key(), existing_root.unwrap());
let e_hash = self.root_hash();
for key in other_root.as_ref().unwrap().children.iter() {
let entry = &o.cache[key];
existing_root.merge(&other_root);
self.cache.insert(hash_path(&self.info.name).key(), existing_root);
let root_hash = self.root_hash();
for key in other_root.children.iter() {
let Some(entry) = o.cache.get(key) else {
continue;
};
let flat = o.flatten(entry);
let mut existing = self.cache[key].clone();
existing.merge(&flat);
self.replace_hashed(&DataUsageHash(key.clone()), &Some(e_hash.clone()), &existing);
if let Some(existing) = self.cache.get_mut(key) {
existing.merge(&flat);
} else {
self.replace_hashed(&DataUsageHash(key.clone()), &Some(root_hash.clone()), &flat);
}
}
}
@@ -1141,10 +1142,12 @@ impl DataUsageInfo {
self.buckets_count = self.buckets_usage.len() as u64;
// Update last update time
if let Some(other_update) = other.last_update
&& (self.last_update.is_none() || other_update > self.last_update.unwrap())
{
self.last_update = Some(other_update);
if let Some(other_update) = other.last_update {
match self.last_update {
None => self.last_update = Some(other_update),
Some(self_update) if other_update > self_update => self.last_update = Some(other_update),
_ => {}
}
}
}
}
@@ -1285,4 +1288,59 @@ mod tests {
assert_eq!(summary1.total_size, 300);
assert_eq!(summary1.versions, 15);
}
#[test]
fn test_data_usage_cache_merge_adds_missing_child() {
    // Target cache starts with only the root bucket entry and no children.
    let mut target = DataUsageCache::default();
    target.info.name = "bucket".to_string();
    target.replace("bucket", "", DataUsageEntry::default());

    // Incoming cache carries a child entry that the target does not know about.
    let mut incoming = DataUsageCache::default();
    incoming.info.name = "bucket".to_string();
    incoming.replace(
        "bucket/child",
        "bucket",
        DataUsageEntry {
            size: 42,
            ..Default::default()
        },
    );

    target.merge(&incoming);

    // The root entry is untouched, and the previously missing child is added by the merge.
    let root = target.find("bucket").expect("root bucket should exist");
    assert_eq!(root.size, 0);
    let child_entry = target.find("bucket/child").expect("merged child should be added");
    assert_eq!(child_entry.size, 42);
}
#[test]
fn test_data_usage_cache_merge_accumulates_existing_child() {
    // Helper-free setup: both caches hold the same child path with different stats.
    let mut target = DataUsageCache::default();
    target.info.name = "bucket".to_string();
    let initial = DataUsageEntry {
        size: 10,
        objects: 1,
        ..Default::default()
    };
    target.replace("bucket/child", "bucket", initial);

    let mut incoming = DataUsageCache::default();
    incoming.info.name = "bucket".to_string();
    let addition = DataUsageEntry {
        size: 20,
        objects: 2,
        ..Default::default()
    };
    incoming.replace("bucket/child", "bucket", addition);

    target.merge(&incoming);

    // Merging an already-present child accumulates its size and object counts.
    let child_entry = target.find("bucket/child").expect("child should remain after merge");
    assert_eq!(child_entry.size, 30);
    assert_eq!(child_entry.objects, 3);
}
}

View File

@@ -296,12 +296,13 @@ type HealResponseSender = broadcast::Sender<HealChannelResponse>;
static GLOBAL_HEAL_RESPONSE_SENDER: OnceLock<HealResponseSender> = OnceLock::new();
/// Initialize global heal channel
pub fn init_heal_channel() -> HealChannelReceiver {
pub fn init_heal_channel() -> Result<HealChannelReceiver, &'static str> {
let (tx, rx) = mpsc::unbounded_channel();
GLOBAL_HEAL_CHANNEL_SENDER
.set(tx)
.expect("Heal channel sender already initialized");
rx
if GLOBAL_HEAL_CHANNEL_SENDER.set(tx).is_ok() {
Ok(rx)
} else {
Err("Heal channel sender already initialized")
}
}
/// Get global heal channel sender

View File

@@ -155,10 +155,7 @@ impl LastMinuteLatency {
}
pub fn add(&mut self, t: &Duration) {
let sec = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let sec = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.forward_to(sec);
let win_idx = sec % 60;
self.totals[win_idx as usize].add(t);
@@ -174,10 +171,7 @@ impl LastMinuteLatency {
pub fn get_total(&mut self) -> AccElem {
let mut res = AccElem::default();
let sec = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let sec = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
self.forward_to(sec);
for elem in self.totals.iter() {
res.merge(elem);

28
crates/config/AGENTS.md Normal file
View File

@@ -0,0 +1,28 @@
# Config Crate Instructions
Applies to `crates/config/`.
## Environment Variable Naming
- Global configuration variables must use flat `RUSTFS_*` names.
- Do not introduce module-segmented names such as `RUSTFS_CONFIG_*`.
Canonical examples:
- `RUSTFS_REGION`
- `RUSTFS_ADDRESS`
- `RUSTFS_VOLUMES`
- `RUSTFS_LICENSE`
- `RUSTFS_SCANNER_ENABLED`
- `RUSTFS_HEAL_ENABLED`
## Compatibility
- Deprecated aliases must keep warning behavior.
- Document aliases in `crates/config/README.md`.
- Any alias change should include tests for both canonical and deprecated forms.
## Source of Truth
- Constants: `crates/config/src/constants/app.rs`
- Naming conventions: `crates/config/README.md#environment-variable-naming-conventions`

View File

@@ -49,6 +49,12 @@ Current guidance:
- Deprecated example:
- `RUSTFS_ENABLE_SCANNER` -> `RUSTFS_SCANNER_ENABLED`
- `RUSTFS_ENABLE_HEAL` -> `RUSTFS_HEAL_ENABLED`
- `RUSTFS_DATA_SCANNER_START_DELAY_SECS` -> `RUSTFS_SCANNER_START_DELAY_SECS`
## Scanner environment aliases
- `RUSTFS_SCANNER_START_DELAY_SECS` (canonical)
- `RUSTFS_DATA_SCANNER_START_DELAY_SECS` (deprecated alias for compatibility)
## 📄 License

View File

@@ -131,6 +131,12 @@ pub const ENV_RUSTFS_ADDRESS: &str = "RUSTFS_ADDRESS";
/// Environment variable for server volumes.
pub const ENV_RUSTFS_VOLUMES: &str = "RUSTFS_VOLUMES";
/// Environment variable for server tls path.
pub const ENV_RUSTFS_TLS_PATH: &str = "RUSTFS_TLS_PATH";
/// Default value for the server TLS path if `ENV_RUSTFS_TLS_PATH` is not set.
pub const DEFAULT_RUSTFS_TLS_PATH: &str = "";
/// Default port for rustfs
/// This is the default port for rustfs.
/// This is used to bind the server to a specific port.
@@ -221,6 +227,12 @@ pub const DEFAULT_OBS_METRICS_EXPORT_ENABLED: bool = true;
/// Environment variable: RUSTFS_OBS_LOGS_EXPORT_ENABLED
pub const DEFAULT_OBS_LOGS_EXPORT_ENABLED: bool = true;
/// Default profiling export enabled
/// It is used to enable or disable exporting profiles
/// Default value: true
/// Environment variable: RUSTFS_OBS_PROFILING_EXPORT_ENABLED
pub const DEFAULT_OBS_PROFILING_EXPORT_ENABLED: bool = true;
/// Default log local logging enabled for rustfs
/// This is the default log local logging enabled for rustfs.
/// It is used to enable or disable local logging of the application.

View File

@@ -167,3 +167,16 @@ pub const DEFAULT_OBJECT_CACHE_TTI_SECS: u64 = 120;
///
/// Default is set to 5 hits.
pub const DEFAULT_OBJECT_HOT_MIN_HITS_TO_EXTEND: usize = 5;
/// Skip bitrot hash verification on GetObject reads.
///
/// When enabled, GetObject reads skip the per-shard hash
/// computation and comparison, reducing CPU usage on the read path.
/// The background scanner still performs full integrity verification.
/// Does not affect writes, heals, or scanner operations.
///
/// Default is false (verify on every read, matching pre-existing behavior).
pub const ENV_OBJECT_GET_SKIP_BITROT_VERIFY: &str = "RUSTFS_OBJECT_GET_SKIP_BITROT_VERIFY";
/// Default: bitrot verification is enabled on GetObject reads (do not skip).
pub const DEFAULT_OBJECT_GET_SKIP_BITROT_VERIFY: bool = false;

View File

@@ -45,3 +45,15 @@ pub const ENV_FTPS_CERTS_DIR: &str = "RUSTFS_FTPS_CERTS_DIR";
pub const ENV_FTPS_CA_FILE: &str = "RUSTFS_FTPS_CA_FILE";
pub const ENV_FTPS_PASSIVE_PORTS: &str = "RUSTFS_FTPS_PASSIVE_PORTS";
pub const ENV_FTPS_EXTERNAL_IP: &str = "RUSTFS_FTPS_EXTERNAL_IP";
/// Default WebDAV server bind address
pub const DEFAULT_WEBDAV_ADDRESS: &str = "0.0.0.0:8080";
/// WebDAV environment variable names
pub const ENV_WEBDAV_ENABLE: &str = "RUSTFS_WEBDAV_ENABLE";
pub const ENV_WEBDAV_ADDRESS: &str = "RUSTFS_WEBDAV_ADDRESS";
pub const ENV_WEBDAV_TLS_ENABLED: &str = "RUSTFS_WEBDAV_TLS_ENABLED";
pub const ENV_WEBDAV_CERTS_DIR: &str = "RUSTFS_WEBDAV_CERTS_DIR";
pub const ENV_WEBDAV_CA_FILE: &str = "RUSTFS_WEBDAV_CA_FILE";
pub const ENV_WEBDAV_MAX_BODY_SIZE: &str = "RUSTFS_WEBDAV_MAX_BODY_SIZE";
pub const ENV_WEBDAV_REQUEST_TIMEOUT: &str = "RUSTFS_WEBDAV_REQUEST_TIMEOUT";

View File

@@ -12,17 +12,107 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/// Environment variable name that specifies the data scanner start delay in seconds.
/// - Purpose: Define the delay between data scanner operations.
use std::time::Duration;
/// Canonical environment variable name that specifies the scanner start delay in seconds.
/// If set, this overrides the cycle interval derived from `RUSTFS_SCANNER_SPEED`.
/// - Unit: seconds (u64).
/// - Valid values: any positive integer.
/// - Semantics: This delay controls how frequently the data scanner checks for and processes data; shorter delays lead to more responsive scanning but may increase system load.
/// - Example: `export RUSTFS_DATA_SCANNER_START_DELAY_SECS=10`
/// - Note: Choose an appropriate delay that balances scanning responsiveness with overall system performance.
/// - Example: `export RUSTFS_SCANNER_START_DELAY_SECS=10`
pub const ENV_SCANNER_START_DELAY_SECS: &str = "RUSTFS_SCANNER_START_DELAY_SECS";
/// Deprecated compatibility alias for scanner start delay.
/// Prefer `RUSTFS_SCANNER_START_DELAY_SECS`.
#[deprecated(note = "Use RUSTFS_SCANNER_START_DELAY_SECS instead")]
pub const ENV_DATA_SCANNER_START_DELAY_SECS: &str = "RUSTFS_DATA_SCANNER_START_DELAY_SECS";
/// Default data scanner start delay in seconds if not specified in the environment variable.
/// - Value: 60 seconds.
/// - Rationale: This default interval provides a reasonable balance between scanning responsiveness and system load for most deployments.
/// - Adjustments: Users may modify this value via the `RUSTFS_SCANNER_START_DELAY_SECS` environment variable (or its deprecated alias `RUSTFS_DATA_SCANNER_START_DELAY_SECS`) based on their specific scanning requirements and system performance.
pub const DEFAULT_DATA_SCANNER_START_DELAY_SECS: u64 = 60;
/// Environment variable that selects the scanner speed preset.
/// Valid values: `fastest`, `fast`, `default`, `slow`, `slowest`.
/// Controls the sleep factor, maximum sleep duration, and cycle interval.
/// - Example: `export RUSTFS_SCANNER_SPEED=slow`
pub const ENV_SCANNER_SPEED: &str = "RUSTFS_SCANNER_SPEED";
/// Default scanner speed preset.
pub const DEFAULT_SCANNER_SPEED: &str = "default";
/// Environment variable that controls whether the scanner sleeps between operations.
/// When `true` (default), the scanner throttles itself. When `false`, it runs at full speed.
/// - Example: `export RUSTFS_SCANNER_IDLE_MODE=false`
pub const ENV_SCANNER_IDLE_MODE: &str = "RUSTFS_SCANNER_IDLE_MODE";
/// Default scanner idle mode.
pub const DEFAULT_SCANNER_IDLE_MODE: bool = true;
/// Scanner speed preset controlling throttling behavior.
///
/// Each preset defines three parameters:
/// - **sleep_factor**: Multiplier applied to elapsed work time to compute inter-object sleep.
/// - **max_sleep**: Upper bound on any single throttle sleep.
/// - **cycle_interval**: Base delay between scan cycles.
///
/// | Preset | Factor | Max Sleep | Cycle Interval |
/// |-----------|--------|-----------|----------------|
/// | `fastest` | 0 | 0 | 1 second |
/// | `fast` | 1x | 100ms | 1 minute |
/// | `default` | 2x | 1 second | 1 minute |
/// | `slow` | 10x | 15 seconds| 1 minute |
/// | `slowest` | 100x | 15 seconds| 30 minutes |
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScannerSpeed {
    /// No throttling at all: 0x sleep factor, zero max sleep, 1-second cycles.
    Fastest,
    /// Light throttling: 1x sleep factor, 100ms max sleep, 1-minute cycles.
    Fast,
    /// Balanced throttling: 2x sleep factor, 1s max sleep, 1-minute cycles.
    /// Used when no preset (or an unrecognized one) is configured.
    #[default]
    Default,
    /// Heavy throttling: 10x sleep factor, 15s max sleep, 1-minute cycles.
    Slow,
    /// Maximum throttling: 100x sleep factor, 15s max sleep, 30-minute cycles.
    Slowest,
}
impl ScannerSpeed {
    /// Internal lookup table: `(sleep_factor, max_sleep, cycle_interval)` for
    /// each preset. Keeping all three knobs in one place makes the preset
    /// table easy to audit against the documentation.
    fn params(self) -> (f64, Duration, Duration) {
        match self {
            Self::Fastest => (0.0, Duration::ZERO, Duration::from_secs(1)),
            Self::Fast => (1.0, Duration::from_millis(100), Duration::from_secs(60)),
            Self::Default => (2.0, Duration::from_secs(1), Duration::from_secs(60)),
            Self::Slow => (10.0, Duration::from_secs(15), Duration::from_secs(60)),
            Self::Slowest => (100.0, Duration::from_secs(15), Duration::from_secs(30 * 60)),
        }
    }

    /// Multiplier applied to elapsed work time to compute inter-object sleep.
    pub fn sleep_factor(self) -> f64 {
        self.params().0
    }

    /// Upper bound on any single throttle sleep.
    pub fn max_sleep(self) -> Duration {
        self.params().1
    }

    /// Base delay between scan cycles.
    pub fn cycle_interval(self) -> Duration {
        self.params().2
    }

    /// Parse a preset name taken from an environment variable.
    ///
    /// Input is trimmed and lowercased first; anything that is not one of the
    /// known preset names falls back to `Default`.
    pub fn from_env_str(s: &str) -> Self {
        let normalized = s.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "fastest" => Self::Fastest,
            "fast" => Self::Fast,
            "slow" => Self::Slow,
            "slowest" => Self::Slowest,
            _ => Self::Default,
        }
    }
}
impl std::fmt::Display for ScannerSpeed {
    /// Render the preset as its lowercase env-style name; the output
    /// round-trips through `from_env_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Fastest => "fastest",
            Self::Fast => "fast",
            Self::Default => "default",
            Self::Slow => "slow",
            Self::Slowest => "slowest",
        };
        f.write_str(name)
    }
}

View File

@@ -22,6 +22,7 @@ pub const ENV_OBS_ENDPOINT: &str = "RUSTFS_OBS_ENDPOINT";
pub const ENV_OBS_TRACE_ENDPOINT: &str = "RUSTFS_OBS_TRACE_ENDPOINT";
pub const ENV_OBS_METRIC_ENDPOINT: &str = "RUSTFS_OBS_METRIC_ENDPOINT";
pub const ENV_OBS_LOG_ENDPOINT: &str = "RUSTFS_OBS_LOG_ENDPOINT";
pub const ENV_OBS_PROFILING_ENDPOINT: &str = "RUSTFS_OBS_PROFILING_ENDPOINT";
pub const ENV_OBS_USE_STDOUT: &str = "RUSTFS_OBS_USE_STDOUT";
pub const ENV_OBS_SAMPLE_RATIO: &str = "RUSTFS_OBS_SAMPLE_RATIO";
pub const ENV_OBS_METER_INTERVAL: &str = "RUSTFS_OBS_METER_INTERVAL";
@@ -33,12 +34,12 @@ pub const ENV_OBS_ENVIRONMENT: &str = "RUSTFS_OBS_ENVIRONMENT";
pub const ENV_OBS_TRACES_EXPORT_ENABLED: &str = "RUSTFS_OBS_TRACES_EXPORT_ENABLED";
pub const ENV_OBS_METRICS_EXPORT_ENABLED: &str = "RUSTFS_OBS_METRICS_EXPORT_ENABLED";
pub const ENV_OBS_LOGS_EXPORT_ENABLED: &str = "RUSTFS_OBS_LOGS_EXPORT_ENABLED";
pub const ENV_OBS_PROFILING_EXPORT_ENABLED: &str = "RUSTFS_OBS_PROFILING_EXPORT_ENABLED";
pub const ENV_OBS_LOGGER_LEVEL: &str = "RUSTFS_OBS_LOGGER_LEVEL";
pub const ENV_OBS_LOG_STDOUT_ENABLED: &str = "RUSTFS_OBS_LOG_STDOUT_ENABLED";
pub const ENV_OBS_LOG_DIRECTORY: &str = "RUSTFS_OBS_LOG_DIRECTORY";
pub const ENV_OBS_LOG_FILENAME: &str = "RUSTFS_OBS_LOG_FILENAME";
pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB";
pub const ENV_OBS_LOG_ROTATION_TIME: &str = "RUSTFS_OBS_LOG_ROTATION_TIME";
pub const ENV_OBS_LOG_KEEP_FILES: &str = "RUSTFS_OBS_LOG_KEEP_FILES";
@@ -47,6 +48,12 @@ pub const ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_TOTAL_SIZ
pub const ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: &str = "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES";
pub const ENV_OBS_LOG_COMPRESS_OLD_FILES: &str = "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES";
pub const ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL";
pub const ENV_OBS_LOG_COMPRESSION_ALGORITHM: &str = "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM";
pub const ENV_OBS_LOG_PARALLEL_COMPRESS: &str = "RUSTFS_OBS_LOG_PARALLEL_COMPRESS";
pub const ENV_OBS_LOG_PARALLEL_WORKERS: &str = "RUSTFS_OBS_LOG_PARALLEL_WORKERS";
pub const ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL: &str = "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL";
pub const ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: &str = "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP";
pub const ENV_OBS_LOG_ZSTD_WORKERS: &str = "RUSTFS_OBS_LOG_ZSTD_WORKERS";
pub const ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: &str = "RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS";
pub const ENV_OBS_LOG_EXCLUDE_PATTERNS: &str = "RUSTFS_OBS_LOG_EXCLUDE_PATTERNS";
pub const ENV_OBS_LOG_DELETE_EMPTY_FILES: &str = "RUSTFS_OBS_LOG_DELETE_EMPTY_FILES";
@@ -60,13 +67,24 @@ pub const DEFAULT_OBS_LOG_MAX_TOTAL_SIZE_BYTES: u64 = 2 * 1024 * 1024 * 1024; //
pub const DEFAULT_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES: u64 = 0; // No single file limit
pub const DEFAULT_OBS_LOG_COMPRESS_OLD_FILES: bool = true;
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_LEVEL: u32 = 6;
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP: &str = "gzip";
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD: &str = "zstd";
pub const DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM: &str = DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD;
pub const DEFAULT_OBS_LOG_PARALLEL_COMPRESS: bool = true;
pub const DEFAULT_OBS_LOG_PARALLEL_WORKERS: usize = 6;
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_LEVEL: i32 = 8;
pub const DEFAULT_OBS_LOG_ZSTD_FALLBACK_TO_GZIP: bool = true;
pub const DEFAULT_OBS_LOG_ZSTD_WORKERS: usize = 1;
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION: &str = "gz";
pub const DEFAULT_OBS_LOG_GZIP_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_GZIP_COMPRESSION_EXTENSION);
pub const DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: u64 = 30;
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION: &str = "zst";
pub const DEFAULT_OBS_LOG_ZSTD_COMPRESSION_ALL_EXTENSION: &str = concat!(".", DEFAULT_OBS_LOG_ZSTD_COMPRESSION_EXTENSION);
pub const DEFAULT_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS: u64 = 30; // Retain compressed files for 30 days
pub const DEFAULT_OBS_LOG_DELETE_EMPTY_FILES: bool = true;
pub const DEFAULT_OBS_LOG_MIN_FILE_AGE_SECONDS: u64 = 3600; // 1 hour
pub const DEFAULT_OBS_LOG_CLEANUP_INTERVAL_SECONDS: u64 = 1800; // 0.5 hours
pub const DEFAULT_OBS_LOG_DRY_RUN: bool = false;
pub const DEFAULT_OBS_LOG_MATCH_MODE_PREFIX: &str = "prefix";
pub const DEFAULT_OBS_LOG_MATCH_MODE: &str = "suffix";
/// Default values for observability configuration
@@ -90,6 +108,7 @@ mod tests {
assert_eq!(ENV_OBS_TRACE_ENDPOINT, "RUSTFS_OBS_TRACE_ENDPOINT");
assert_eq!(ENV_OBS_METRIC_ENDPOINT, "RUSTFS_OBS_METRIC_ENDPOINT");
assert_eq!(ENV_OBS_LOG_ENDPOINT, "RUSTFS_OBS_LOG_ENDPOINT");
assert_eq!(ENV_OBS_PROFILING_ENDPOINT, "RUSTFS_OBS_PROFILING_ENDPOINT");
assert_eq!(ENV_OBS_USE_STDOUT, "RUSTFS_OBS_USE_STDOUT");
assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO");
assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL");
@@ -100,17 +119,23 @@ mod tests {
assert_eq!(ENV_OBS_LOG_STDOUT_ENABLED, "RUSTFS_OBS_LOG_STDOUT_ENABLED");
assert_eq!(ENV_OBS_LOG_DIRECTORY, "RUSTFS_OBS_LOG_DIRECTORY");
assert_eq!(ENV_OBS_LOG_FILENAME, "RUSTFS_OBS_LOG_FILENAME");
assert_eq!(ENV_OBS_LOG_ROTATION_SIZE_MB, "RUSTFS_OBS_LOG_ROTATION_SIZE_MB");
assert_eq!(ENV_OBS_LOG_ROTATION_TIME, "RUSTFS_OBS_LOG_ROTATION_TIME");
assert_eq!(ENV_OBS_LOG_KEEP_FILES, "RUSTFS_OBS_LOG_KEEP_FILES");
assert_eq!(ENV_OBS_TRACES_EXPORT_ENABLED, "RUSTFS_OBS_TRACES_EXPORT_ENABLED");
assert_eq!(ENV_OBS_METRICS_EXPORT_ENABLED, "RUSTFS_OBS_METRICS_EXPORT_ENABLED");
assert_eq!(ENV_OBS_LOGS_EXPORT_ENABLED, "RUSTFS_OBS_LOGS_EXPORT_ENABLED");
assert_eq!(ENV_OBS_PROFILING_EXPORT_ENABLED, "RUSTFS_OBS_PROFILING_EXPORT_ENABLED");
// Test log cleanup related env keys
assert_eq!(ENV_OBS_LOG_MAX_TOTAL_SIZE_BYTES, "RUSTFS_OBS_LOG_MAX_TOTAL_SIZE_BYTES");
assert_eq!(ENV_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES, "RUSTFS_OBS_LOG_MAX_SINGLE_FILE_SIZE_BYTES");
assert_eq!(ENV_OBS_LOG_COMPRESS_OLD_FILES, "RUSTFS_OBS_LOG_COMPRESS_OLD_FILES");
assert_eq!(ENV_OBS_LOG_GZIP_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_GZIP_COMPRESSION_LEVEL");
assert_eq!(ENV_OBS_LOG_COMPRESSION_ALGORITHM, "RUSTFS_OBS_LOG_COMPRESSION_ALGORITHM");
assert_eq!(ENV_OBS_LOG_PARALLEL_COMPRESS, "RUSTFS_OBS_LOG_PARALLEL_COMPRESS");
assert_eq!(ENV_OBS_LOG_PARALLEL_WORKERS, "RUSTFS_OBS_LOG_PARALLEL_WORKERS");
assert_eq!(ENV_OBS_LOG_ZSTD_COMPRESSION_LEVEL, "RUSTFS_OBS_LOG_ZSTD_COMPRESSION_LEVEL");
assert_eq!(ENV_OBS_LOG_ZSTD_FALLBACK_TO_GZIP, "RUSTFS_OBS_LOG_ZSTD_FALLBACK_TO_GZIP");
assert_eq!(ENV_OBS_LOG_ZSTD_WORKERS, "RUSTFS_OBS_LOG_ZSTD_WORKERS");
assert_eq!(
ENV_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS,
"RUSTFS_OBS_LOG_COMPRESSED_FILE_RETENTION_DAYS"
@@ -129,6 +154,10 @@ mod tests {
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_GZIP, "gzip");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM_ZSTD, "zstd");
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE_PREFIX, "prefix");
assert_eq!(DEFAULT_OBS_LOG_MATCH_MODE, "suffix");
assert_eq!(DEFAULT_OBS_LOG_COMPRESSION_ALGORITHM, "zstd");
}
}

View File

@@ -233,15 +233,18 @@ impl<'a> fmt::Debug for Masked<'a> {
match self.0 {
None => Ok(()),
Some(s) => {
let len = s.len();
let len = s.chars().count();
if len == 0 {
Ok(())
} else if len == 1 {
write!(f, "***")
} else if len == 2 {
write!(f, "{}***|{}", &s[0..1], len)
let first = s.chars().next().ok_or(fmt::Error)?;
write!(f, "{}***|{}", first, len)
} else {
write!(f, "{}***{}|{}", &s[0..1], &s[len - 1..], len)
let first = s.chars().next().ok_or(fmt::Error)?;
let last = s.chars().last().ok_or(fmt::Error)?;
write!(f, "{}***{}|{}", first, last, len)
}
}
}
@@ -482,5 +485,10 @@ mod tests {
// Test longer string
assert_eq!(format!("{:?}", Masked(Some("secretpassword"))), "s***d|14");
// Test Unicode input should not panic and should keep character boundary
assert_eq!(format!("{:?}", Masked(Some(""))), "***");
assert_eq!(format!("{:?}", Masked(Some("中文"))), "中***|2");
assert_eq!(format!("{:?}", Masked(Some("中文测试"))), "中***试|4");
}
}

26
crates/e2e_test/AGENTS.md Normal file
View File

@@ -0,0 +1,26 @@
# E2E Test Crate Instructions
Applies to `crates/e2e_test/`.
## Test Reliability
- Keep end-to-end tests deterministic and environment-aware.
- Prefer readiness checks and explicit polling over fixed sleep-based timing assumptions.
- Ensure tests isolate resources and clean up temporary state.
## Environment Safety
- For local KMS-related E2E runs, keep proxy bypass settings:
- `NO_PROXY=127.0.0.1,localhost`
- clear `HTTP_PROXY` and `HTTPS_PROXY`
- Do not hardcode machine-specific paths or credentials.
## Scope and Cost
- Place exhaustive integration behavior here; keep unit behavior in source crates.
- Keep new E2E scenarios focused and avoid redundant overlap with existing suites.
## Suggested Validation
- `cargo test --package e2e_test`
- Full gate before commit: `make pre-commit`

View File

@@ -1,6 +1,6 @@
# Protocol E2E Tests
FTPS protocol end-to-end tests for RustFS.
FTPS and WebDAV protocol end-to-end tests for RustFS.
## Prerequisites
@@ -19,11 +19,21 @@ brew install sshpass openssh
## Running Tests
Run all protocol tests:
Run all protocol tests (FTPS + WebDAV):
```bash
RUSTFS_BUILD_FEATURES=ftps,webdav cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture
```
Run FTPS tests only:
```bash
RUSTFS_BUILD_FEATURES=ftps cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture
```
Run WebDAV tests only:
```bash
RUSTFS_BUILD_FEATURES=webdav cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture
```
## Test Coverage
### FTPS Tests
@@ -38,3 +48,13 @@ RUSTFS_BUILD_FEATURES=ftps cargo test --package e2e_test test_protocol_core_suit
- cdup
- rmdir delete bucket
### WebDAV Tests
- PROPFIND at root (list buckets)
- MKCOL (create bucket)
- PUT (upload file)
- GET (download file)
- PROPFIND on bucket (list objects)
- DELETE file
- DELETE bucket
- Authentication failure test

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol tests for FTPS
//! Protocol tests for FTPS and WebDAV
pub mod ftps_core;
pub mod test_env;
pub mod test_runner;
pub mod webdav_core;

View File

@@ -16,6 +16,7 @@
use crate::common::init_logging;
use crate::protocols::ftps_core::test_ftps_core_operations;
use crate::protocols::webdav_core::test_webdav_core_operations;
use std::time::Instant;
use tokio::time::{Duration, sleep};
use tracing::{error, info};
@@ -59,9 +60,14 @@ struct TestDefinition {
impl ProtocolTestSuite {
/// Create default test suite
pub fn new() -> Self {
let tests = vec![TestDefinition {
name: "test_ftps_core_operations".to_string(),
}];
let tests = vec![
TestDefinition {
name: "test_ftps_core_operations".to_string(),
},
TestDefinition {
name: "test_webdav_core_operations".to_string(),
},
];
Self { tests }
}
@@ -83,6 +89,10 @@ impl ProtocolTestSuite {
info!("=== Starting FTPS Module Test ===");
"FTPS core operations (put, ls, mkdir, rmdir, delete)"
}
"test_webdav_core_operations" => {
info!("=== Starting WebDAV Core Test ===");
"WebDAV core operations (MKCOL, PUT, GET, DELETE, PROPFIND)"
}
_ => "",
};
@@ -121,6 +131,7 @@ impl ProtocolTestSuite {
async fn run_single_test(&self, test_def: &TestDefinition) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match test_def.name.as_str() {
"test_ftps_core_operations" => test_ftps_core_operations().await.map_err(|e| e.into()),
"test_webdav_core_operations" => test_webdav_core_operations().await.map_err(|e| e.into()),
_ => Err(format!("Test {} not implemented", test_def.name).into()),
}
}

View File

@@ -0,0 +1,207 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Core WebDAV tests
use crate::common::rustfs_binary_path;
use crate::protocols::test_env::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, ProtocolTestEnvironment};
use anyhow::Result;
use base64::Engine;
use reqwest::Client;
use tokio::process::Command;
use tracing::info;
// Fixed WebDAV port for testing
const WEBDAV_PORT: u16 = 9080;
// Loopback bind address for the test server; must stay in sync with WEBDAV_PORT.
const WEBDAV_ADDRESS: &str = "127.0.0.1:9080";
/// Build the HTTP client used against the WebDAV endpoint.
///
/// Invalid TLS certificates are accepted because test servers may present
/// self-signed certs (this suite actually runs over plain HTTP).
fn create_client() -> Client {
    let builder = Client::builder().danger_accept_invalid_certs(true);
    builder.build().expect("Failed to create HTTP client")
}
/// Compose the `Authorization` header value for HTTP Basic auth from the
/// default test credentials (`user:pass` base64-encoded, prefixed "Basic ").
fn basic_auth_header() -> String {
    let raw = format!("{}:{}", DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY);
    format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(raw))
}
/// End-to-end WebDAV smoke test: spawns a rustfs server with WebDAV enabled
/// on a fixed loopback port, then exercises MKCOL (create bucket), PUT, GET,
/// DELETE, PROPFIND, and an auth-failure path over plain HTTP.
///
/// Returns `Err` if the server fails to start or any request errors out;
/// behavioral mismatches fail via `assert!` inside the body.
pub async fn test_webdav_core_operations() -> Result<()> {
    let env = ProtocolTestEnvironment::new().map_err(|e| anyhow::anyhow!("{}", e))?;

    // Start server manually
    info!("Starting WebDAV server on {}", WEBDAV_ADDRESS);
    let binary_path = rustfs_binary_path();
    let mut server_process = Command::new(&binary_path)
        .env("RUSTFS_WEBDAV_ENABLE", "true")
        .env("RUSTFS_WEBDAV_ADDRESS", WEBDAV_ADDRESS)
        .env("RUSTFS_WEBDAV_TLS_ENABLED", "false") // No TLS for testing
        .arg(&env.temp_dir)
        .spawn()?;

    // Ensure server is cleaned up even on failure.
    // NOTE(review): the cleanup below only runs for `?`-propagated errors — an
    // assert! failure panics and unwinds past the kill/wait, leaving the child
    // process running. Consider driving the body through tokio::spawn so panics
    // surface as a JoinError instead — TODO confirm with the suite runner.
    let result = async {
        // Wait for server to be ready
        ProtocolTestEnvironment::wait_for_port_ready(WEBDAV_PORT, 30)
            .await
            .map_err(|e| anyhow::anyhow!("{}", e))?;

        let client = create_client();
        let auth_header = basic_auth_header();
        let base_url = format!("http://{}", WEBDAV_ADDRESS);

        // Test PROPFIND at root (list buckets).
        // 207 Multi-Status is the canonical WebDAV success code for PROPFIND.
        info!("Testing WebDAV: PROPFIND at root (list buckets)");
        let resp = client
            .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), &base_url)
            .header("Authorization", &auth_header)
            .header("Depth", "1")
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 207,
            "PROPFIND at root should succeed, got: {}",
            resp.status()
        );
        info!("PASS: PROPFIND at root successful");

        // Test MKCOL (create bucket); 201 Created on success.
        let bucket_name = "webdav-test-bucket";
        info!("Testing WebDAV: MKCOL (create bucket '{}')", bucket_name);
        let resp = client
            .request(reqwest::Method::from_bytes(b"MKCOL").unwrap(), format!("{}/{}", base_url, bucket_name))
            .header("Authorization", &auth_header)
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 201,
            "MKCOL should succeed, got: {}",
            resp.status()
        );
        info!("PASS: MKCOL bucket '{}' successful", bucket_name);

        // Test PUT (upload file)
        let filename = "test-file.txt";
        let file_content = "Hello, WebDAV!";
        info!("Testing WebDAV: PUT (upload file '{}')", filename);
        let resp = client
            .put(format!("{}/{}/{}", base_url, bucket_name, filename))
            .header("Authorization", &auth_header)
            .body(file_content)
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 201,
            "PUT should succeed, got: {}",
            resp.status()
        );
        info!("PASS: PUT file '{}' successful", filename);

        // Test GET (download file) and verify round-tripped content.
        info!("Testing WebDAV: GET (download file '{}')", filename);
        let resp = client
            .get(format!("{}/{}/{}", base_url, bucket_name, filename))
            .header("Authorization", &auth_header)
            .send()
            .await?;
        assert!(resp.status().is_success(), "GET should succeed, got: {}", resp.status());
        let downloaded_content = resp.text().await?;
        assert_eq!(downloaded_content, file_content, "Downloaded content should match uploaded content");
        info!("PASS: GET file '{}' successful, content matches", filename);

        // Test PROPFIND on bucket (list objects); the uploaded file name must
        // appear in the multistatus body.
        info!("Testing WebDAV: PROPFIND on bucket (list objects)");
        let resp = client
            .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), format!("{}/{}", base_url, bucket_name))
            .header("Authorization", &auth_header)
            .header("Depth", "1")
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 207,
            "PROPFIND on bucket should succeed, got: {}",
            resp.status()
        );
        let body = resp.text().await?;
        assert!(body.contains(filename), "File should appear in PROPFIND response");
        info!("PASS: PROPFIND on bucket successful, file '{}' found", filename);

        // Test DELETE file; 204 No Content on success.
        info!("Testing WebDAV: DELETE file '{}'", filename);
        let resp = client
            .delete(format!("{}/{}/{}", base_url, bucket_name, filename))
            .header("Authorization", &auth_header)
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 204,
            "DELETE file should succeed, got: {}",
            resp.status()
        );
        info!("PASS: DELETE file '{}' successful", filename);

        // Verify file is deleted: a follow-up GET must now 404.
        info!("Testing WebDAV: Verify file is deleted");
        let resp = client
            .get(format!("{}/{}/{}", base_url, bucket_name, filename))
            .header("Authorization", &auth_header)
            .send()
            .await?;
        assert!(
            resp.status().as_u16() == 404,
            "GET deleted file should return 404, got: {}",
            resp.status()
        );
        info!("PASS: Verified file '{}' is deleted", filename);

        // Test DELETE bucket (must be empty at this point)
        info!("Testing WebDAV: DELETE bucket '{}'", bucket_name);
        let resp = client
            .delete(format!("{}/{}", base_url, bucket_name))
            .header("Authorization", &auth_header)
            .send()
            .await?;
        assert!(
            resp.status().is_success() || resp.status().as_u16() == 204,
            "DELETE bucket should succeed, got: {}",
            resp.status()
        );
        info!("PASS: DELETE bucket '{}' successful", bucket_name);

        // Test authentication failure: bogus credentials must be rejected with 401.
        info!("Testing WebDAV: Authentication failure");
        let resp = client
            .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), &base_url)
            .header("Authorization", "Basic aW52YWxpZDppbnZhbGlk") // invalid:invalid
            .send()
            .await?;
        assert_eq!(resp.status().as_u16(), 401, "Invalid auth should return 401, got: {}", resp.status());
        info!("PASS: Authentication failure test successful");

        info!("WebDAV core tests passed");
        Ok(())
    }
    .await;

    // Always cleanup server process
    let _ = server_process.kill().await;
    let _ = server_process.wait().await;
    result
}

28
crates/ecstore/AGENTS.md Normal file
View File

@@ -0,0 +1,28 @@
# ECStore Crate Instructions
Applies to `crates/ecstore/`.
## Data Durability and Integrity
- Do not weaken quorum checks, bitrot checks, or metadata validation paths.
- Treat any change affecting read/write/repair correctness as high risk and test accordingly.
- Prefer explicit failure over silent data corruption or implicit success.
## Performance-Critical Paths
- Be careful with allocations and locking in hot paths.
- Keep network and disk operations async-friendly; avoid introducing unnecessary blocking.
- Benchmark-sensitive changes should include measurable rationale.
## Cross-Module Coordination
- Validate behavior impacts on:
- `rustfs/src/storage/`
- `crates/filemeta/`
- `crates/heal/`
- `crates/checksums/`
## Suggested Validation
- `cargo test -p rustfs-ecstore`
- Full gate before commit: `make pre-commit`

View File

@@ -44,6 +44,7 @@ rustfs-credentials = { workspace = true }
rustfs-common.workspace = true
rustfs-policy.workspace = true
rustfs-protos.workspace = true
rustfs-s3-common = { workspace = true }
async-trait.workspace = true
bytes.workspace = true
byteorder = { workspace = true }

View File

@@ -39,6 +39,7 @@ pub async fn create_bitrot_reader(
length: usize,
shard_size: usize,
checksum_algo: HashAlgorithm,
skip_verify: bool,
) -> disk::error::Result<Option<BitrotReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>> {
// Calculate the total length to read, including the checksum overhead
let length = length.div_ceil(shard_size) * checksum_algo.size() + length;
@@ -47,13 +48,18 @@ pub async fn create_bitrot_reader(
// Use inline data
let mut rd = Cursor::new(data.to_vec());
rd.set_position(offset as u64);
let reader = BitrotReader::new(Box::new(rd) as Box<dyn AsyncRead + Send + Sync + Unpin>, shard_size, checksum_algo);
let reader = BitrotReader::new(
Box::new(rd) as Box<dyn AsyncRead + Send + Sync + Unpin>,
shard_size,
checksum_algo,
skip_verify,
);
Ok(Some(reader))
} else if let Some(disk) = disk {
// Read from disk
match disk.read_file_stream(bucket, path, offset, length - offset).await {
Ok(rd) => {
let reader = BitrotReader::new(rd, shard_size, checksum_algo);
let reader = BitrotReader::new(rd, shard_size, checksum_algo, skip_verify);
Ok(Some(reader))
}
Err(e) => Err(e),
@@ -116,7 +122,7 @@ mod tests {
let checksum_algo = HashAlgorithm::HighwayHash256;
let result =
create_bitrot_reader(Some(test_data), None, "test-bucket", "test-path", 0, 0, shard_size, checksum_algo).await;
create_bitrot_reader(Some(test_data), None, "test-bucket", "test-path", 0, 0, shard_size, checksum_algo, false).await;
assert!(result.is_ok());
assert!(result.unwrap().is_some());
@@ -127,7 +133,8 @@ mod tests {
let shard_size = 16;
let checksum_algo = HashAlgorithm::HighwayHash256;
let result = create_bitrot_reader(None, None, "test-bucket", "test-path", 0, 1024, shard_size, checksum_algo).await;
let result =
create_bitrot_reader(None, None, "test-bucket", "test-path", 0, 1024, shard_size, checksum_algo, false).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());

View File

@@ -14,11 +14,16 @@
use crate::bucket::bandwidth::reader::BucketOptions;
use ratelimit::{Error as RatelimitError, Ratelimiter};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::warn;
/// BETA_BUCKET is the weight used to calculate exponential moving average.
/// It is the fraction of the previous average retained per update:
/// `new_avg = (1 - beta) * sample + beta * previous_avg`.
const BETA_BUCKET: f64 = 0.1;
#[derive(Clone)]
pub struct BucketThrottle {
limiter: Arc<Mutex<Ratelimiter>>,
@@ -71,25 +76,202 @@ impl BucketThrottle {
}
}
/// Per-bucket throughput accumulator: counts bytes within a measurement
/// window and folds each closed window into an exponential moving average.
#[derive(Debug)]
pub struct BucketMeasurement {
    // Bytes recorded since the window last rolled over; reset on each EMA update.
    bytes_since_last_window: AtomicU64,
    // Start of the current window. `new` always seeds it with Some; a None
    // value makes the next EMA update a no-op.
    start_time: Mutex<Option<Instant>>,
    // Smoothed throughput estimate in bytes per second (0.0 until first sample).
    exp_moving_avg: Mutex<f64>,
}
impl BucketMeasurement {
    /// Create a measurement whose first window starts at `init_time`.
    pub fn new(init_time: Instant) -> Self {
        Self {
            bytes_since_last_window: AtomicU64::new(0),
            start_time: Mutex::new(Some(init_time)),
            exp_moving_avg: Mutex::new(0.0),
        }
    }

    /// Record `bytes` transferred during the current window.
    pub fn increment_bytes(&self, bytes: u64) {
        self.bytes_since_last_window.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Close the current window at `end_time` and fold its observed rate into
    /// the moving average. Windows with no start, zero elapsed time, or a
    /// clock that moved backwards are skipped without consuming the byte
    /// counter. Poisoned mutexes are recovered rather than propagated.
    pub fn update_exponential_moving_average(&self, end_time: Instant) {
        let mut window_start = self.start_time.lock().unwrap_or_else(|e| {
            warn!("bucket measurement start_time mutex poisoned, recovering");
            e.into_inner()
        });
        // Roll the window forward, keeping the previous start for the rate calc.
        let prior = window_start.replace(end_time);
        let Some(prev_start) = prior else {
            return;
        };
        if prev_start > end_time {
            return;
        }
        let elapsed = end_time.duration_since(prev_start);
        if elapsed.is_zero() {
            return;
        }
        let window_bytes = self.bytes_since_last_window.swap(0, Ordering::Relaxed);
        let observed_rate = window_bytes as f64 / elapsed.as_secs_f64();
        let mut avg = self.exp_moving_avg.lock().unwrap_or_else(|e| {
            warn!("bucket measurement exp_moving_avg mutex poisoned, recovering");
            e.into_inner()
        });
        // The first sample seeds the average directly; later samples are blended.
        *avg = if *avg == 0.0 {
            observed_rate
        } else {
            exponential_moving_average(BETA_BUCKET, *avg, observed_rate)
        };
    }

    /// Current smoothed throughput estimate in bytes per second.
    pub fn get_exp_moving_avg_bytes_per_second(&self) -> f64 {
        *self.exp_moving_avg.lock().unwrap_or_else(|e| {
            warn!("bucket measurement exp_moving_avg mutex poisoned, recovering");
            e.into_inner()
        })
    }
}
/// Fold a new sample into an exponential moving average.
///
/// `beta` is the weight retained from `previous_avg`; the remaining
/// `1 - beta` weight goes to `increment_avg`, the freshly observed rate.
fn exponential_moving_average(beta: f64, previous_avg: f64, increment_avg: f64) -> f64 {
    let carried = beta * previous_avg;
    let fresh = (1f64 - beta) * increment_avg;
    fresh + carried
}
/// Per-bucket bandwidth snapshot included in monitoring reports.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BandwidthDetails {
    /// Configured bandwidth limit in bytes per second (per-node limit scaled
    /// by the node count when reported).
    pub limit_bytes_per_sec: i64,
    /// Observed exponential-moving-average throughput in bytes per second.
    pub current_bandwidth_bytes_per_sec: f64,
}
/// Aggregated bandwidth report: one [`BandwidthDetails`] entry per selected
/// bucket/replication-target pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketBandwidthReport {
    /// Stats keyed by bucket options (bucket name plus replication ARN).
    pub bucket_stats: HashMap<BucketOptions, BandwidthDetails>,
}
/// Tracks per-bucket bandwidth throttles and throughput measurements.
pub struct Monitor {
    // Per-bucket rate limiters, keyed by bucket options.
    t_lock: RwLock<HashMap<BucketOptions, BucketThrottle>>,
    // Per-bucket throughput measurements (EMA state), keyed the same way.
    m_lock: RwLock<HashMap<BucketOptions, BucketMeasurement>>,
    /// Number of nodes in the cluster (clamped to at least 1); used to scale
    /// per-node limits up to cluster-wide limits in reports.
    pub node_count: u64,
}
impl Monitor {
pub fn new(num_nodes: u64) -> Arc<Self> {
let node_cnt = num_nodes.max(1);
Arc::new(Monitor {
let m = Arc::new(Monitor {
t_lock: RwLock::new(HashMap::new()),
m_lock: RwLock::new(HashMap::new()),
node_count: node_cnt,
})
});
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let weak = Arc::downgrade(&m);
handle.spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(2));
loop {
interval.tick().await;
let Some(monitor) = weak.upgrade() else { break };
monitor.update_moving_avg();
}
});
}
m
}
pub fn init_measurement(&self, opts: &BucketOptions) {
let mut guard = self.m_lock.write().unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock write poisoned, recovering");
e.into_inner()
});
guard
.entry(opts.clone())
.or_insert_with(|| BucketMeasurement::new(Instant::now()));
}
pub fn update_measurement(&self, opts: &BucketOptions, bytes: u64) {
{
let guard = self.m_lock.read().unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock read poisoned, recovering");
e.into_inner()
});
if let Some(measurement) = guard.get(opts) {
measurement.increment_bytes(bytes);
return;
}
}
// Miss path: write lock + insert once, then increment.
let mut guard = self.m_lock.write().unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock write poisoned, recovering");
e.into_inner()
});
// Double-check after lock upgrade in case another thread inserted it.
let measurement = guard
.entry(opts.clone())
.or_insert_with(|| BucketMeasurement::new(Instant::now()));
measurement.increment_bytes(bytes);
}
pub fn update_moving_avg(&self) {
let now = Instant::now();
let guard = self.m_lock.read().unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock read poisoned, recovering");
e.into_inner()
});
for measurement in guard.values() {
measurement.update_exponential_moving_average(now);
}
}
pub fn get_report(&self, select_bucket: impl Fn(&str) -> bool) -> BucketBandwidthReport {
let t_guard = self.t_lock.read().unwrap_or_else(|e| {
warn!("bucket monitor throttle rwlock read poisoned, recovering");
e.into_inner()
});
let m_guard = self.m_lock.read().unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock read poisoned, recovering");
e.into_inner()
});
let mut bucket_stats = HashMap::new();
for (opts, throttle) in t_guard.iter() {
if !select_bucket(&opts.name) {
continue;
}
let mut current_bandwidth_bytes_per_sec = 0.0;
if let Some(measurement) = m_guard.get(opts) {
current_bandwidth_bytes_per_sec = measurement.get_exp_moving_avg_bytes_per_second();
}
bucket_stats.insert(
opts.clone(),
BandwidthDetails {
limit_bytes_per_sec: throttle.node_bandwidth_per_sec * self.node_count as i64,
current_bandwidth_bytes_per_sec,
},
);
}
BucketBandwidthReport { bucket_stats }
}
pub fn delete_bucket(&self, bucket: &str) {
self.t_lock
.write()
.unwrap_or_else(|e| {
warn!("bucket monitor rwlock write poisoned, recovering");
warn!("bucket monitor throttle rwlock write poisoned, recovering");
e.into_inner()
})
.retain(|opts, _| opts.name != bucket);
self.m_lock
.write()
.unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock write poisoned, recovering");
e.into_inner()
})
.retain(|opts, _| opts.name != bucket);
@@ -103,7 +285,14 @@ impl Monitor {
self.t_lock
.write()
.unwrap_or_else(|e| {
warn!("bucket monitor rwlock write poisoned, recovering");
warn!("bucket monitor throttle rwlock write poisoned, recovering");
e.into_inner()
})
.remove(&opts);
self.m_lock
.write()
.unwrap_or_else(|e| {
warn!("bucket monitor measurement rwlock write poisoned, recovering");
e.into_inner()
})
.remove(&opts);
@@ -113,7 +302,7 @@ impl Monitor {
self.t_lock
.read()
.unwrap_or_else(|e| {
warn!("bucket monitor rwlock read poisoned, recovering");
warn!("bucket monitor throttle rwlock read poisoned, recovering");
e.into_inner()
})
.get(opts)
@@ -160,7 +349,7 @@ impl Monitor {
self.t_lock
.write()
.unwrap_or_else(|e| {
warn!("bucket monitor rwlock write poisoned, recovering");
warn!("bucket monitor throttle rwlock write poisoned, recovering");
e.into_inner()
})
.insert(opts, throttle);
@@ -174,7 +363,7 @@ impl Monitor {
self.t_lock
.read()
.unwrap_or_else(|e| {
warn!("bucket monitor rwlock read poisoned, recovering");
warn!("bucket monitor throttle rwlock read poisoned, recovering");
e.into_inner()
})
.contains_key(&opt)
@@ -184,6 +373,7 @@ impl Monitor {
#[cfg(test)]
mod tests {
use super::*;
use std::panic::{AssertUnwindSafe, catch_unwind};
#[test]
fn test_set_and_get_throttle_with_node_split() {
@@ -318,4 +508,82 @@ mod tests {
assert_eq!(t2.burst(), 500);
assert_eq!(t2.node_bandwidth_per_sec, 500);
}
#[test]
fn test_bucket_measurement_recovers_from_poisoned_mutexes() {
    // Poison each internal mutex in turn and verify the measurement API keeps
    // working instead of panicking on the poisoned lock.
    let measurement = BucketMeasurement::new(Instant::now());

    // Poison the start_time mutex by panicking while the guard is held.
    let _ = catch_unwind(AssertUnwindSafe(|| {
        let _held = measurement.start_time.lock().unwrap();
        panic!("poison start_time mutex");
    }));
    measurement.increment_bytes(64);
    measurement.update_exponential_moving_average(Instant::now() + Duration::from_secs(1));

    // Poison the exp_moving_avg mutex the same way.
    let _ = catch_unwind(AssertUnwindSafe(|| {
        let _held = measurement.exp_moving_avg.lock().unwrap();
        panic!("poison exp_moving_avg mutex");
    }));
    measurement.increment_bytes(32);
    measurement.update_exponential_moving_average(Instant::now() + Duration::from_secs(2));

    // The average must still be a sane, non-negative finite number.
    let avg = measurement.get_exp_moving_avg_bytes_per_second();
    assert!(avg.is_finite());
    assert!(avg >= 0.0);
}
#[test]
fn test_get_report_limit_and_current_bandwidth_after_measurement() {
    // A cluster-wide limit of 400 B/s over 4 nodes must be reported back as
    // 400 B/s, and traffic recorded through update_measurement must surface
    // as a positive moving average once update_moving_avg has run.
    let monitor = Monitor::new(4);
    monitor.set_bandwidth_limit("b1", "arn1", 400);

    let opts = BucketOptions {
        name: "b1".to_string(),
        replication_arn: "arn1".to_string(),
    };
    monitor.init_measurement(&opts);
    for _ in 0..2 {
        monitor.update_measurement(&opts, 500);
    }

    // Let a little wall-clock time pass so the moving average has a window.
    std::thread::sleep(Duration::from_millis(110));
    monitor.update_moving_avg();

    let report = monitor.get_report(|name| name == "b1");
    let details = report.bucket_stats.get(&opts).expect("report should contain b1/arn1");
    assert_eq!(details.limit_bytes_per_sec, 400);
    assert!(
        details.current_bandwidth_bytes_per_sec > 0.0,
        "current_bandwidth should be positive after update_measurement and update_moving_avg"
    );
    assert!(
        details.current_bandwidth_bytes_per_sec < 20000.0,
        "current_bandwidth should be in reasonable range"
    );
}
#[test]
fn test_get_report_select_bucket_filters() {
    // get_report must only include buckets accepted by the selector closure.
    let monitor = Monitor::new(2);
    monitor.set_bandwidth_limit("b1", "arn1", 100);
    monitor.set_bandwidth_limit("b2", "arn2", 200);

    let make_opts = |name: &str, arn: &str| BucketOptions {
        name: name.to_string(),
        replication_arn: arn.to_string(),
    };
    let opts_b1 = make_opts("b1", "arn1");
    let opts_b2 = make_opts("b2", "arn2");
    monitor.init_measurement(&opts_b1);
    monitor.init_measurement(&opts_b2);

    // Accept everything: both buckets are present.
    let report_all = monitor.get_report(|_| true);
    assert_eq!(report_all.bucket_stats.len(), 2);

    // Accept only b1.
    let report_b1 = monitor.get_report(|name| name == "b1");
    assert_eq!(report_b1.bucket_stats.len(), 1);
    assert_eq!(report_b1.bucket_stats.get(&opts_b1).unwrap().limit_bytes_per_sec, 100);

    // Accept only b2.
    let report_b2 = monitor.get_report(|name| name == "b2");
    assert_eq!(report_b2.bucket_stats.len(), 1);
    assert_eq!(report_b2.bucket_stats.get(&opts_b2).unwrap().limit_bytes_per_sec, 200);
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use crate::bucket::bandwidth::monitor::Monitor;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -20,7 +21,7 @@ use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::Sleep;
use tracing::{debug, warn};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BucketOptions {
pub name: String,
pub replication_arn: String,
@@ -54,6 +55,9 @@ impl<R> MonitoredReader<R> {
header_size = opts.header_size,
"MonitoredReader created"
);
if throttle.is_some() {
m.init_measurement(&opts.bucket_options);
}
MonitoredReader {
r,
m,
@@ -117,7 +121,15 @@ impl<R: AsyncRead + Unpin> AsyncRead for MonitoredReader<R> {
}
}
poll_limited_read(&mut this.r, cx, buf, need, &mut this.temp_buf)
let filled_before = buf.filled().len();
let result = poll_limited_read(&mut this.r, cx, buf, need, &mut this.temp_buf);
if let Poll::Ready(Ok(())) = result {
let read_bytes = buf.filled().len().saturating_sub(filled_before) as u64;
if read_bytes > 0 {
this.m.update_measurement(&this.opts.bucket_options, read_bytes);
}
}
result
}
}

View File

@@ -804,7 +804,7 @@ impl BucketTargetSys {
}
async fn build_aws_s3_http_client_from_tls_path() -> Option<aws_sdk_s3::config::SharedHttpClient> {
let tls_path = std::env::var("RUSTFS_TLS_PATH").ok()?;
let tls_path = rustfs_utils::get_env_str(rustfs_config::ENV_RUSTFS_TLS_PATH, rustfs_config::DEFAULT_RUSTFS_TLS_PATH);
if tls_path.is_empty() {
return None;
}
@@ -862,7 +862,6 @@ fn should_force_path_style(target: &BucketTarget) -> bool {
// Explicit path-style or legacy boolean-like values.
"path" | "on" | "true" => true,
// `auto` and empty are defaulted to path-style for custom S3-compatible endpoints.
// RustFS/MinIO-style deployments typically do not configure virtual-hosted-style routing.
"auto" | "" => true,
// Unknown values: prefer compatibility with S3-compatible services.
_ => true,

View File

@@ -28,7 +28,6 @@ use crate::client::object_api_utils::new_getobjectreader;
use crate::error::Error;
use crate::error::StorageError;
use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id};
@@ -45,6 +44,7 @@ use rustfs_common::data_usage::TierStats;
use rustfs_common::heal_channel::rep_has_active_rules;
use rustfs_common::metrics::{IlmAction, Metrics};
use rustfs_filemeta::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk};
use rustfs_s3_common::EventName;
use rustfs_utils::path::encode_dir_object;
use rustfs_utils::string::strings_has_prefix_fold;
use s3s::Body;
@@ -471,7 +471,7 @@ impl TransitionState {
}
pub async fn init(api: Arc<ECStore>) {
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
let max_workers = env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
@@ -569,14 +569,14 @@ impl TransitionState {
pub async fn update_workers_inner(api: Arc<ECStore>, n: i64) {
let mut n = n;
if n == 0 {
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
let max_workers = env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
n = max_workers;
}
// Allow environment override of maximum workers
let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
let absolute_max = env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(32);
@@ -603,7 +603,7 @@ impl TransitionState {
}
pub async fn init_background_expiry(api: Arc<ECStore>) {
let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS")
let mut workers = env::var("RUSTFS_MAX_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16));
@@ -615,7 +615,7 @@ pub async fn init_background_expiry(api: Arc<ECStore>) {
}
if workers == 0 {
workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
workers = env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(8);
@@ -689,13 +689,13 @@ pub async fn expire_transitioned_object(
//let tags = LcAuditEvent::new(src, lcEvent).Tags();
if lc_event.action == IlmAction::DeleteRestoredAction {
opts.transition.expire_restored = true;
match api.delete_object(&oi.bucket, &oi.name, opts).await {
return match api.delete_object(&oi.bucket, &oi.name, opts).await {
Ok(dobj) => {
//audit_log_lifecycle(*oi, ILMExpiry, tags, traceFn);
return Ok(dobj);
Ok(dobj)
}
Err(err) => return Err(std::io::Error::other(err)),
}
Err(err) => Err(std::io::Error::other(err)),
};
}
let ret = delete_object_from_remote_tier(
@@ -732,7 +732,7 @@ pub async fn expire_transitioned_object(
..Default::default()
};
send_event(EventArgs {
event_name: event_name.as_ref().to_string(),
event_name: event_name.to_string(),
bucket_name: obj_info.bucket.clone(),
object: obj_info,
user_agent: "Internal: [ILM-Expiry]".to_string(),
@@ -847,8 +847,8 @@ pub async fn post_restore_opts(version_id: &str, bucket: &str, object: &str) ->
}
}
Ok(ObjectOptions {
versioned: versioned,
version_suspended: version_suspended,
versioned,
version_suspended,
version_id: Some(vid.to_string()),
..Default::default()
})
@@ -1033,12 +1033,12 @@ pub async fn eval_action_from_lifecycle(
let lock_enabled = if let Some(lr) = lr { lr.mode.is_some() } else { false };
match event.action {
lifecycle::IlmAction::DeleteAllVersionsAction | lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => {
IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
if lock_enabled {
return lifecycle::Event::default();
}
}
lifecycle::IlmAction::DeleteVersionAction | lifecycle::IlmAction::DeleteRestoredVersionAction => {
IlmAction::DeleteVersionAction | IlmAction::DeleteRestoredVersionAction => {
if oi.version_id.is_none() {
return lifecycle::Event::default();
}
@@ -1139,12 +1139,12 @@ pub async fn apply_expiry_on_non_transitioned_objects(
event_name = EventName::ObjectRemovedDeleteMarkerCreated;
}
match lc_event.action {
lifecycle::IlmAction::DeleteAllVersionsAction => event_name = EventName::ObjectRemovedDeleteAllVersions,
lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => event_name = EventName::ILMDelMarkerExpirationDelete,
IlmAction::DeleteAllVersionsAction => event_name = EventName::ObjectRemovedDeleteAllVersions,
IlmAction::DelMarkerDeleteAllVersionsAction => event_name = EventName::LifecycleDelMarkerExpirationDelete,
_ => (),
}
send_event(EventArgs {
event_name: event_name.as_ref().to_string(),
event_name: event_name.to_string(),
bucket_name: dobj.bucket.clone(),
object: dobj,
user_agent: "Internal: [ILM-Expiry]".to_string(),
@@ -1152,7 +1152,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
..Default::default()
});
if lc_event.action != lifecycle::IlmAction::NoneAction {
if lc_event.action != IlmAction::NoneAction {
let mut num_versions = 1_u64;
if lc_event.action.delete_all() {
num_versions = oi.num_versions as u64;
@@ -1172,15 +1172,15 @@ pub async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &
pub async fn apply_lifecycle_action(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
let mut success = false;
match event.action {
lifecycle::IlmAction::DeleteVersionAction
| lifecycle::IlmAction::DeleteAction
| lifecycle::IlmAction::DeleteRestoredAction
| lifecycle::IlmAction::DeleteRestoredVersionAction
| lifecycle::IlmAction::DeleteAllVersionsAction
| lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => {
IlmAction::DeleteVersionAction
| IlmAction::DeleteAction
| IlmAction::DeleteRestoredAction
| IlmAction::DeleteRestoredVersionAction
| IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction => {
success = apply_expiry_rule(event, src, oi).await;
}
lifecycle::IlmAction::TransitionAction | lifecycle::IlmAction::TransitionVersionAction => {
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => {
success = apply_transition_rule(event, src, oi).await;
}
_ => (),

View File

@@ -20,8 +20,8 @@
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, TransitionStorageClass,
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, LifecycleRuleAndOperator,
NoncurrentVersionTransition, ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, TransitionStorageClass,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -134,6 +134,25 @@ impl RuleValidate for LifecycleRule {
}
}
/// Resolve the effective prefix of a lifecycle rule.
///
/// Resolution order mirrors the S3 schema: a non-empty top-level (legacy)
/// `prefix` wins; otherwise `Filter.Prefix` (returned as-is, even when
/// empty); otherwise `Filter.And.Prefix`. Returns `None` when no prefix is
/// configured anywhere.
fn lifecycle_rule_prefix(rule: &LifecycleRule) -> Option<&str> {
    // An empty legacy prefix is treated as unset so the Filter fields apply.
    let legacy = rule.prefix.as_deref().filter(|p| !p.is_empty());
    legacy.or_else(|| {
        rule.filter.as_ref().and_then(|filter| {
            filter
                .prefix
                .as_deref()
                .or_else(|| filter.and.as_ref().and_then(|and| and.prefix.as_deref()))
        })
    })
}
#[async_trait::async_trait]
pub trait Lifecycle {
async fn has_transition(&self) -> bool;
@@ -177,8 +196,11 @@ impl Lifecycle for BucketLifecycleConfiguration {
continue;
}
let rule_prefix = &rule.prefix.clone().unwrap_or_default();
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
let rule_prefix = lifecycle_rule_prefix(rule).unwrap_or("");
if !prefix.is_empty()
&& !rule_prefix.is_empty()
&& !prefix.starts_with(rule_prefix)
&& !rule_prefix.starts_with(prefix)
{
continue;
}
@@ -297,8 +319,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
if rule.status.as_str() == ExpirationStatus::DISABLED {
continue;
}
if let Some(prefix) = rule.prefix.clone() {
if !obj.name.starts_with(prefix.as_str()) {
if let Some(rule_prefix) = lifecycle_rule_prefix(rule) {
if !obj.name.starts_with(rule_prefix) {
continue;
}
}
@@ -414,55 +436,22 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if obj.is_latest && obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(mod_time, days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
if expiration.expired_object_delete_marker.is_some_and(|v| v) {
if let Some(due) = expiration.next_due(obj) {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
// Stop after scheduling an expired delete-marker event.
break;
}
continue;
}
}
}
@@ -694,8 +683,16 @@ impl LifecycleCalculate for LifecycleExpiration {
if !obj.is_latest || !obj.delete_marker {
return None;
}
// Check date first (date-based expiration takes priority over days).
// A zero unix timestamp means "not set" (default value) and is skipped.
if let Some(ref date) = self.date {
let expiry_date = OffsetDateTime::from(date.clone());
if expiry_date.unix_timestamp() != 0 {
return Some(expiry_date);
}
}
match self.days {
Some(days) => Some(expected_expiry_time(obj.mod_time.unwrap(), days)),
Some(days) => obj.mod_time.map(|mod_time| expected_expiry_time(mod_time, days)),
None => None,
}
}
@@ -860,6 +857,7 @@ impl Default for TransitionOptions {
#[cfg(test)]
mod tests {
use super::*;
use s3s::dto::LifecycleRuleFilter;
#[tokio::test]
async fn validate_rejects_non_positive_expiration_days() {
@@ -1074,4 +1072,208 @@ mod tests {
assert_eq!(err.to_string(), ERR_LIFECYCLE_INVALID_RULE_STATUS);
}
#[tokio::test]
async fn filter_rules_respects_filter_prefix() {
    // A rule whose prefix lives under Filter.Prefix (not the legacy top-level
    // prefix) must still constrain which objects the rule applies to.
    let mut filter = LifecycleRuleFilter::default();
    filter.prefix = Some("prefix".to_string());

    let lc = BucketLifecycleConfiguration {
        rules: vec![LifecycleRule {
            id: Some("rule".to_string()),
            status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
            expiration: Some(LifecycleExpiration {
                days: Some(30),
                ..Default::default()
            }),
            filter: Some(filter),
            prefix: None,
            abort_incomplete_multipart_upload: None,
            noncurrent_version_expiration: None,
            noncurrent_version_transitions: None,
            transitions: None,
        }],
    };

    let make_obj = |name: &str| ObjectOpts {
        name: name.to_string(),
        mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
        is_latest: true,
        ..Default::default()
    };

    // Object under the prefix: the rule matches.
    let matched = lc.filter_rules(&make_obj("prefix/file")).await.unwrap();
    assert_eq!(matched.len(), 1);

    // Object outside the prefix: no rules match.
    let not_matched = lc.filter_rules(&make_obj("other/file")).await.unwrap();
    assert_eq!(not_matched.len(), 0);
}
#[tokio::test]
async fn filter_rules_respects_filter_and_prefix() {
    // The prefix may also be nested under Filter.And.Prefix; it must be
    // honoured exactly like a plain Filter.Prefix.
    let mut and = LifecycleRuleAndOperator::default();
    and.prefix = Some("prefix".to_string());
    let mut filter = LifecycleRuleFilter::default();
    filter.and = Some(and);

    let lc = BucketLifecycleConfiguration {
        rules: vec![LifecycleRule {
            id: Some("rule-and-prefix".to_string()),
            status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
            expiration: Some(LifecycleExpiration {
                days: Some(30),
                ..Default::default()
            }),
            filter: Some(filter),
            prefix: None,
            abort_incomplete_multipart_upload: None,
            noncurrent_version_expiration: None,
            noncurrent_version_transitions: None,
            transitions: None,
        }],
    };

    let make_obj = |name: &str| ObjectOpts {
        name: name.to_string(),
        mod_time: Some(OffsetDateTime::from_unix_timestamp(1_000_000).unwrap()),
        is_latest: true,
        ..Default::default()
    };

    // Object under the And-prefix: the rule matches.
    let matched = lc.filter_rules(&make_obj("prefix/file")).await.unwrap();
    assert_eq!(matched.len(), 1);

    // Object outside the And-prefix: no rules match.
    let not_matched = lc.filter_rules(&make_obj("other/file")).await.unwrap();
    assert_eq!(not_matched.len(), 0);
}
#[tokio::test]
async fn expired_object_delete_marker_requires_single_version() {
    // An ExpiredObjectDeleteMarker rule must not fire while other versions
    // still exist behind the delete marker (num_versions > 1).
    let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
    let lc = BucketLifecycleConfiguration {
        rules: vec![LifecycleRule {
            id: Some("rule-expired-del-marker".to_string()),
            status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
            expiration: Some(LifecycleExpiration {
                days: Some(1),
                expired_object_delete_marker: Some(true),
                ..Default::default()
            }),
            filter: None,
            prefix: None,
            abort_incomplete_multipart_upload: None,
            noncurrent_version_expiration: None,
            noncurrent_version_transitions: None,
            transitions: None,
        }],
    };

    // Latest delete marker, but two versions remain behind it.
    let opts = ObjectOpts {
        name: "obj".to_string(),
        mod_time: Some(base_time),
        is_latest: true,
        delete_marker: true,
        num_versions: 2,
        version_id: Some(Uuid::new_v4()),
        ..Default::default()
    };

    // Well past the 1-day expiry, yet nothing should be scheduled.
    let event = lc.eval_inner(&opts, base_time + Duration::days(2), 0).await;
    assert_eq!(event.action, IlmAction::NoneAction);
}
#[tokio::test]
async fn expired_object_delete_marker_deletes_only_delete_marker_after_due() {
    // When the delete marker is the sole remaining version and the expiry is
    // due, a DeleteVersionAction for the marker itself must be scheduled.
    let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
    let lc = BucketLifecycleConfiguration {
        rules: vec![LifecycleRule {
            id: Some("rule-expired-del-marker".to_string()),
            status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
            expiration: Some(LifecycleExpiration {
                days: Some(1),
                expired_object_delete_marker: Some(true),
                ..Default::default()
            }),
            filter: None,
            prefix: None,
            abort_incomplete_multipart_upload: None,
            noncurrent_version_expiration: None,
            noncurrent_version_transitions: None,
            transitions: None,
        }],
    };

    // Latest delete marker and no other versions left.
    let opts = ObjectOpts {
        name: "obj".to_string(),
        mod_time: Some(base_time),
        is_latest: true,
        delete_marker: true,
        num_versions: 1,
        version_id: Some(Uuid::new_v4()),
        ..Default::default()
    };

    // Past the 1-day expiry: the marker itself is scheduled for removal.
    let event = lc.eval_inner(&opts, base_time + Duration::days(2), 0).await;
    assert_eq!(event.action, IlmAction::DeleteVersionAction);
    assert_eq!(event.due, Some(expected_expiry_time(base_time, 1)));
}
#[tokio::test]
async fn expired_object_delete_marker_date_based_not_yet_due() {
    // A date-based rule that has not yet reached its expiry date must not
    // trigger immediate deletion (unwrap_or(now) must not override the date).
    let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
    let future_date = base_time + Duration::days(10);
    let lc = BucketLifecycleConfiguration {
        rules: vec![LifecycleRule {
            id: Some("rule-date-del-marker".to_string()),
            status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
            expiration: Some(LifecycleExpiration {
                date: Some(future_date.into()),
                expired_object_delete_marker: Some(true),
                ..Default::default()
            }),
            filter: None,
            prefix: None,
            abort_incomplete_multipart_upload: None,
            noncurrent_version_expiration: None,
            noncurrent_version_transitions: None,
            transitions: None,
        }],
    };

    // Sole remaining version is the delete marker.
    let opts = ObjectOpts {
        name: "obj".to_string(),
        mod_time: Some(base_time),
        is_latest: true,
        delete_marker: true,
        num_versions: 1,
        version_id: Some(Uuid::new_v4()),
        ..Default::default()
    };

    // Before the configured date: no action may be scheduled.
    let event_before = lc.eval_inner(&opts, base_time + Duration::days(5), 0).await;
    assert_eq!(event_before.action, IlmAction::NoneAction);

    // After the configured date: the delete marker is scheduled for removal.
    let event_after = lc.eval_inner(&opts, base_time + Duration::days(11), 0).await;
    assert_eq!(event_after.action, IlmAction::DeleteVersionAction);
    assert_eq!(event_after.due, Some(future_date));
}
}

View File

@@ -372,10 +372,17 @@ impl BucketMetadata {
Ok(())
}
fn parse_all_configs(&mut self, _api: Arc<ECStore>) -> Result<()> {
/// Rebuild the cached, parsed bucket policy from `policy_config_json`.
///
/// An empty JSON buffer (e.g. after the policy was deleted) clears the cache
/// to `None`; otherwise the buffer is deserialized and cached. Returns a
/// serde error if the JSON is malformed.
fn parse_policy_config(&mut self) -> Result<()> {
    self.policy_config = if self.policy_config_json.is_empty() {
        None
    } else {
        Some(serde_json::from_slice(&self.policy_config_json)?)
    };
    Ok(())
}
fn parse_all_configs(&mut self, _api: Arc<ECStore>) -> Result<()> {
self.parse_policy_config()?;
if !self.notification_config_xml.is_empty() {
self.notification_config = Some(deserialize::<NotificationConfiguration>(&self.notification_config_xml)?);
}
@@ -666,4 +673,21 @@ mod test {
println!(" - Lifecycle config size: {} bytes", deserialized_bm.lifecycle_config_xml.len());
println!(" - Serialized buffer size: {} bytes", buf.len());
}
/// After policy deletion (policy_config_json cleared), parse_policy_config sets policy_config to None.
#[test]
fn test_parse_policy_config_clears_cache_when_json_empty() {
    let policy_json = r#"{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"s3:GetObject","Resource":"arn:aws:s3:::b/*"}]}"#;
    let mut bm = BucketMetadata::new("b");

    // Non-empty JSON populates the cached policy.
    bm.policy_config_json = policy_json.as_bytes().to_vec();
    bm.parse_policy_config().unwrap();
    assert!(bm.policy_config.is_some(), "policy_config should be set when JSON non-empty");

    // Clearing the JSON must drop the cached policy as well.
    bm.policy_config_json.clear();
    bm.parse_policy_config().unwrap();
    assert!(
        bm.policy_config.is_none(),
        "policy_config should be None after JSON cleared (e.g. policy deleted)"
    );
}
}

View File

@@ -23,4 +23,5 @@ pub use config::*;
pub use datatypes::*;
pub use replication_pool::*;
pub use replication_resyncer::*;
pub use replication_state::BucketStats;
pub use rule::*;

View File

@@ -56,8 +56,7 @@ use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use tracing::{info, instrument, warn};
// Worker limits
pub const WORKER_MAX_LIMIT: usize = 500;
@@ -796,6 +795,7 @@ impl<S: StorageAPI> ReplicationPool<S> {
}
/// Load bucket replication resync statuses into memory
#[instrument(skip(cancellation_token))]
async fn load_resync(self: Arc<Self>, buckets: &[String], cancellation_token: CancellationToken) -> Result<(), EcstoreError> {
// TODO: add leader_lock
// Make sure only one node running resync on the cluster

View File

@@ -18,6 +18,7 @@ use crate::bucket::bucket_target_sys::{
};
use crate::bucket::metadata_sys;
use crate::bucket::replication::ResyncStatusType;
use crate::bucket::replication::replication_pool::GLOBAL_REPLICATION_STATS;
use crate::bucket::replication::{ObjectOpts, ReplicationConfigurationExt as _};
use crate::bucket::tagging::decode_tags_to_map;
use crate::bucket::target::BucketTargets;
@@ -26,7 +27,6 @@ use crate::client::api_get_options::{AdvancedGetOptions, StatObjectOptions};
use crate::config::com::save_config;
use crate::disk::BUCKET_META_PREFIX;
use crate::error::{Error, Result, is_err_object_not_found, is_err_version_not_found};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::get_global_bucket_monitor;
@@ -41,6 +41,10 @@ use aws_smithy_types::body::SdkBody;
use byteorder::ByteOrder;
use futures::future::join_all;
use futures::stream::StreamExt;
use headers::{
AMZ_OBJECT_LOCK_LEGAL_HOLD, AMZ_OBJECT_LOCK_MODE, AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, AMZ_SERVER_SIDE_ENCRYPTION,
AMZ_STORAGE_CLASS, AMZ_TAG_COUNT, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LANGUAGE, CONTENT_TYPE,
};
use http::HeaderMap;
use http_body::Frame;
use http_body_util::StreamBody;
@@ -51,6 +55,7 @@ use rustfs_filemeta::{
ReplicationType, ReplicationWorkerOperation, ResyncDecision, ResyncTargetDecision, VersionPurgeStatusType,
get_replication_state, parse_replicate_decision, replication_statuses_map, target_reset_header, version_purge_statuses_map,
};
use rustfs_s3_common::EventName;
use rustfs_utils::http::{
AMZ_BUCKET_REPLICATION_STATUS, AMZ_OBJECT_TAGGING, AMZ_TAGGING_DIRECTIVE, CONTENT_ENCODING, HeaderExt as _,
RESERVED_METADATA_PREFIX, RESERVED_METADATA_PREFIX_LOWER, RUSTFS_REPLICATION_ACTUAL_OBJECT_SIZE,
@@ -73,7 +78,7 @@ use tokio::task::JoinSet;
use tokio::time::Duration as TokioDuration;
use tokio_util::io::ReaderStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{error, info, instrument, warn};
const REPLICATION_DIR: &str = ".replication";
const RESYNC_FILE_NAME: &str = "resync.bin";
@@ -294,6 +299,7 @@ impl ReplicationResyncer {
// TODO: Metrics
}
#[instrument(skip(cancellation_token, storage))]
pub async fn resync_bucket<S: StorageAPI>(
self: Arc<Self>,
cancellation_token: CancellationToken,
@@ -569,7 +575,7 @@ impl ReplicationResyncer {
return;
}
let worker_idx = sip_hash(&roi.name, RESYNC_WORKER_COUNT, &DEFAULT_SIP_HASH_KEY) as usize;
let worker_idx = sip_hash(&roi.name, RESYNC_WORKER_COUNT, &DEFAULT_SIP_HASH_KEY);
if let Err(err) = worker_txs[worker_idx].send(roi).await {
error!("Failed to send object info to worker: {}", err);
@@ -590,6 +596,10 @@ impl ReplicationResyncer {
}
}
/// Whether healing should route this object through `check_replicate_delete`:
/// true for delete markers, or for objects whose replication status is set
/// to anything other than `Failed`.
fn heal_should_use_check_replicate_delete(oi: &ObjectInfo) -> bool {
    if oi.delete_marker {
        return true;
    }
    !oi.replication_status.is_empty() && oi.replication_status != ReplicationStatusType::Failed
}
pub async fn get_heal_replicate_object_info(oi: &ObjectInfo, rcfg: &ReplicationConfig) -> ReplicateObjectInfo {
let mut oi = oi.clone();
let mut user_defined = oi.user_defined.clone();
@@ -617,7 +627,7 @@ pub async fn get_heal_replicate_object_info(oi: &ObjectInfo, rcfg: &ReplicationC
}
}
let dsc = if oi.delete_marker || !oi.replication_status.is_empty() {
let dsc = if heal_should_use_check_replicate_delete(&oi) {
check_replicate_delete(
oi.bucket.as_str(),
&ObjectToDelete {
@@ -1082,7 +1092,7 @@ pub async fn check_replicate_delete(
}
/// Check if the user-defined metadata contains SSEC encryption headers
fn is_ssec_encrypted(user_defined: &std::collections::HashMap<String, String>) -> bool {
fn is_ssec_encrypted(user_defined: &HashMap<String, String>) -> bool {
user_defined.contains_key(SSEC_ALGORITHM_HEADER)
|| user_defined.contains_key(SSEC_KEY_HEADER)
|| user_defined.contains_key(SSEC_KEY_MD5_HEADER)
@@ -1219,7 +1229,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Ok(None) => {
warn!("No replication config found for bucket: {}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1238,7 +1248,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Err(err) => {
warn!("replication config for bucket: {} error: {}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1271,7 +1281,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.target_arn, err
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1299,7 +1309,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1324,7 +1334,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1365,7 +1375,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(&bucket, &tgt_entry.arn).await else {
warn!("failed to get target for bucket:{:?}, arn:{:?}", &bucket, &tgt_entry.arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1396,7 +1406,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Err(e) => {
error!("replicate_delete task failed: {}", e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1433,9 +1443,13 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
)
};
for tgt in rinfos.targets.iter() {
if tgt.replication_status != tgt.prev_replication_status {
// TODO: update global replication status
if let Some(stats) = GLOBAL_REPLICATION_STATS.get() {
for tgt in rinfos.targets.iter() {
if tgt.replication_status != tgt.prev_replication_status {
stats
.update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone())
.await;
}
}
}
@@ -1449,9 +1463,9 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
}
let event_name = if replication_status == ReplicationStatusType::Completed {
EventName::ObjectReplicationComplete.as_ref().to_string()
EventName::ObjectReplicationComplete.to_string()
} else {
EventName::ObjectReplicationFailed.as_ref().to_string()
EventName::ObjectReplicationFailed.to_string()
};
match storage
@@ -1504,7 +1518,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
Ok(None) => {
warn!("replicate force-delete: no replication config for bucket:{}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1520,7 +1534,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
Err(err) => {
warn!("replicate force-delete: replication config error bucket:{} error:{}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1546,7 +1560,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1569,7 +1583,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1599,7 +1613,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(bucket, &arn).await else {
warn!("replicate force-delete: failed to get target client bucket:{} arn:{}", bucket, arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1620,7 +1634,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
error!("replicate force-delete: target offline bucket:{} arn:{}", bucket, tgt_client.arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationFailed.as_ref().to_string(),
event_name: EventName::ObjectReplicationFailed.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1656,7 +1670,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationFailed.as_ref().to_string(),
event_name: EventName::ObjectReplicationFailed.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1789,7 +1803,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Ok(None) => {
warn!("No replication config found for bucket: {}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1801,7 +1815,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Err(err) => {
error!("Failed to get replication config for bucket {}: {}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1827,7 +1841,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(&bucket, &arn).await else {
warn!("failed to get target for bucket:{:?}, arn:{:?}", &bucket, &arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1861,7 +1875,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Err(e) => {
error!("replicate_object task failed: {}", e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1891,13 +1905,21 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
object_info = u;
}
// TODO: update stats
if let Some(stats) = GLOBAL_REPLICATION_STATS.get() {
for tgt in &rinfos.targets {
if tgt.replication_status != tgt.prev_replication_status {
stats
.update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone())
.await;
}
}
}
}
let event_name = if replication_status == ReplicationStatusType::Completed {
EventName::ObjectReplicationComplete.as_ref().to_string()
EventName::ObjectReplicationComplete.to_string()
} else {
EventName::ObjectReplicationFailed.as_ref().to_string()
EventName::ObjectReplicationFailed.to_string()
};
send_event(EventArgs {
@@ -1909,9 +1931,17 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
..Default::default()
});
if rinfos.replication_status() != ReplicationStatusType::Completed {
// TODO: update stats
// pool
if rinfos.replication_status() != ReplicationStatusType::Completed
&& roi.replication_status_internal == rinfos.replication_status_internal()
&& let Some(stats) = GLOBAL_REPLICATION_STATS.get()
{
for tgt in &rinfos.targets {
if tgt.replication_status != tgt.prev_replication_status {
stats
.update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone())
.await;
}
}
}
}
@@ -1952,7 +1982,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
warn!("target is offline: {}", tgt_client.to_url());
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1983,7 +2013,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to get object reader for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2005,7 +2035,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
Err(e) => {
warn!("failed to get actual size for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2019,7 +2049,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if tgt_client.bucket.is_empty() {
warn!("target bucket is empty: {}", tgt_client.bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2076,7 +2106,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
bucket, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2149,7 +2179,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
warn!("target is offline: {}", tgt_client.to_url());
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2179,7 +2209,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if !is_err_object_not_found(&e) || is_err_version_not_found(&e) {
warn!("failed to get object reader for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2210,7 +2240,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
Err(e) => {
warn!("failed to get actual size for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2226,7 +2256,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if tgt_client.bucket.is_empty() {
warn!("target bucket is empty: {}", tgt_client.bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2257,9 +2287,8 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if replication_action == ReplicationAction::None {
if self.op_type == ReplicationType::ExistingObject
&& object_info.mod_time
> oi.last_modified.map(|dt| {
time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH)
})
> oi.last_modified
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
&& object_info.version_id.is_none()
{
warn!(
@@ -2269,7 +2298,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
tgt_client.to_url()
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info.clone(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2309,7 +2338,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to head object for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2325,7 +2354,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to head object for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2355,7 +2384,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
bucket, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2426,19 +2455,19 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
// Standard headers that needs to be extracted from User metadata.
static STANDARD_HEADERS: &[&str] = &[
headers::CONTENT_TYPE,
headers::CACHE_CONTROL,
headers::CONTENT_ENCODING,
headers::CONTENT_LANGUAGE,
headers::CONTENT_DISPOSITION,
headers::AMZ_STORAGE_CLASS,
headers::AMZ_OBJECT_TAGGING,
headers::AMZ_BUCKET_REPLICATION_STATUS,
headers::AMZ_OBJECT_LOCK_MODE,
headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
headers::AMZ_OBJECT_LOCK_LEGAL_HOLD,
headers::AMZ_TAG_COUNT,
headers::AMZ_SERVER_SIDE_ENCRYPTION,
CONTENT_TYPE,
CACHE_CONTROL,
CONTENT_ENCODING,
CONTENT_LANGUAGE,
CONTENT_DISPOSITION,
AMZ_STORAGE_CLASS,
AMZ_OBJECT_TAGGING,
AMZ_BUCKET_REPLICATION_STATUS,
AMZ_OBJECT_LOCK_MODE,
AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
AMZ_OBJECT_LOCK_LEGAL_HOLD,
AMZ_TAG_COUNT,
AMZ_SERVER_SIDE_ENCRYPTION,
];
fn calc_put_object_header_size(put_opts: &PutObjectOptions) -> usize {
@@ -2572,7 +2601,7 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
meta.insert(REPLICATION_SSEC_CHECKSUM_HEADER.to_string(), encoded);
} else {
// Get checksum metadata for non-SSE-C objects
let (cs_meta, is_mp) = object_info.decrypt_checksums(0, &http::HeaderMap::new())?;
let (cs_meta, is_mp) = object_info.decrypt_checksums(0, &HeaderMap::new())?;
is_multipart = is_mp;
// Set object checksum metadata
@@ -2644,24 +2673,24 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
// Use case-insensitive lookup for headers
let lk_map = object_info.user_defined.clone();
if let Some(lang) = lk_map.lookup(headers::CONTENT_LANGUAGE) {
if let Some(lang) = lk_map.lookup(CONTENT_LANGUAGE) {
put_op.content_language = lang.to_string();
}
if let Some(cd) = lk_map.lookup(headers::CONTENT_DISPOSITION) {
if let Some(cd) = lk_map.lookup(CONTENT_DISPOSITION) {
put_op.content_disposition = cd.to_string();
}
if let Some(v) = lk_map.lookup(headers::CACHE_CONTROL) {
if let Some(v) = lk_map.lookup(CACHE_CONTROL) {
put_op.cache_control = v.to_string();
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_MODE) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_MODE) {
let mode = v.to_string().to_uppercase();
put_op.mode = Some(aws_sdk_s3::types::ObjectLockRetentionMode::from(mode.as_str()));
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
put_op.retain_until_date =
OffsetDateTime::parse(v, &Rfc3339).map_err(|e| Error::other(format!("Failed to parse retain until date: {}", e)))?;
// set retention timestamp in opts
@@ -2675,7 +2704,7 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
};
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_LEGAL_HOLD) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_LEGAL_HOLD) {
let hold = v.to_uppercase();
put_op.legalhold = Some(ObjectLockLegalHoldStatus::from(hold.as_str()));
// set legalhold timestamp in opts
@@ -2860,7 +2889,7 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
&& oi1.mod_time
> oi2
.last_modified
.map(|dt| time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH))
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
&& oi1.version_id.is_none()
{
return ReplicationAction::None;
@@ -2879,7 +2908,7 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
|| oi1.mod_time
!= oi2
.last_modified
.map(|dt| time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH))
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
{
return ReplicationAction::All;
}
@@ -2965,6 +2994,7 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
#[test]
fn test_part_range_spec_from_actual_size() {
@@ -2979,4 +3009,71 @@ mod tests {
assert!(part_range_spec_from_actual_size(0, 0).is_err());
assert!(part_range_spec_from_actual_size(0, -1).is_err());
}
#[test]
fn test_heal_should_use_check_replicate_delete_failed_non_delete_marker() {
let oi = ObjectInfo {
bucket: "b".to_string(),
name: "obj".to_string(),
delete_marker: false,
replication_status: ReplicationStatusType::Failed,
..Default::default()
};
assert!(
!heal_should_use_check_replicate_delete(&oi),
"Failed non-delete-marker object must use must_replicate path so it can be re-queued for heal"
);
}
#[test]
fn test_heal_should_use_check_replicate_delete_pending_uses_delete_path() {
let oi = ObjectInfo {
bucket: "b".to_string(),
name: "obj".to_string(),
delete_marker: false,
replication_status: ReplicationStatusType::Pending,
..Default::default()
};
assert!(
heal_should_use_check_replicate_delete(&oi),
"Pending (non-Failed) status with non-empty replication uses check_replicate_delete path"
);
}
#[test]
fn test_heal_should_use_check_replicate_delete_delete_marker() {
let oi = ObjectInfo {
bucket: "b".to_string(),
name: "obj".to_string(),
delete_marker: true,
replication_status: ReplicationStatusType::Failed,
..Default::default()
};
assert!(
heal_should_use_check_replicate_delete(&oi),
"Delete marker always uses check_replicate_delete path"
);
}
#[tokio::test]
async fn test_get_heal_replicate_object_info_failed_object_returns_heal_roi() {
let oi = ObjectInfo {
bucket: "test-bucket".to_string(),
name: "key".to_string(),
delete_marker: false,
replication_status: ReplicationStatusType::Failed,
version_id: Some(Uuid::nil()),
mod_time: Some(OffsetDateTime::now_utc()),
..Default::default()
};
let rcfg = ReplicationConfig::new(None, None);
let roi = get_heal_replicate_object_info(&oi, &rcfg).await;
assert_eq!(roi.replication_status, ReplicationStatusType::Failed);
assert_eq!(roi.op_type, ReplicationType::Heal);
assert!(
roi.dsc.replicate_any() || roi.dsc.targets_map.is_empty(),
"With no replication config, dsc may be empty; with config, replicate_any() would be true and queueing would occur"
);
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use crate::error::Error;
use crate::global::get_global_bucket_monitor;
use rustfs_filemeta::{ReplicatedTargetInfo, ReplicationStatusType, ReplicationType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -585,6 +586,8 @@ pub struct BucketReplicationStat {
pub latency: LatencyStats,
pub xfer_rate_lrg: XferStats,
pub xfer_rate_sml: XferStats,
pub bandwidth_limit_bytes_per_sec: i64,
pub current_bandwidth_bytes_per_sec: f64,
}
impl BucketReplicationStat {
@@ -1019,6 +1022,9 @@ impl ReplicationStats {
latency: stat.latency.merge(&old_stat.latency),
xfer_rate_lrg: lrg,
xfer_rate_sml: sml,
bandwidth_limit_bytes_per_sec: stat.bandwidth_limit_bytes_per_sec,
current_bandwidth_bytes_per_sec: stat.current_bandwidth_bytes_per_sec
+ old_stat.current_bandwidth_bytes_per_sec,
};
tot_replicated_size += stat.replicated_size;
@@ -1069,24 +1075,43 @@ impl ReplicationStats {
// In actual implementation, statistics would be obtained from cluster
// This is simplified to get from local cache
let cache = self.cache.read().await;
if let Some(stats) = cache.get(bucket) {
BucketStats {
uptime: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64,
replication_stats: stats.clone_stats(),
queue_stats: Default::default(),
proxy_stats: ProxyMetric::default(),
}
let mut replication_stats = if let Some(stats) = cache.get(bucket) {
stats.clone_stats()
} else {
BucketStats {
uptime: 0,
replication_stats: BucketReplicationStats::new(),
queue_stats: Default::default(),
proxy_stats: ProxyMetric::default(),
BucketReplicationStats::new()
};
let uptime = if cache.contains_key(bucket) {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
} else {
0
};
drop(cache);
if let Some(monitor) = get_global_bucket_monitor() {
let bw_report = monitor.get_report(|name| name == bucket);
for (opts, bw) in bw_report.bucket_stats {
let stat = replication_stats
.stats
.entry(opts.replication_arn)
.or_insert_with(|| BucketReplicationStat {
xfer_rate_lrg: XferStats::new(),
xfer_rate_sml: XferStats::new(),
..Default::default()
});
stat.bandwidth_limit_bytes_per_sec = bw.limit_bytes_per_sec;
stat.current_bandwidth_bytes_per_sec = bw.current_bandwidth_bytes_per_sec;
}
}
BucketStats {
uptime,
replication_stats,
queue_stats: Default::default(),
proxy_stats: ProxyMetric::default(),
}
}
/// Increase queue statistics

View File

@@ -139,7 +139,11 @@ pub enum BucketLookupType {
fn load_root_store_from_tls_path() -> Option<rustls::RootCertStore> {
// Load the root certificate bundle from the path specified by the
// RUSTFS_TLS_PATH environment variable.
let tp = std::env::var("RUSTFS_TLS_PATH").ok()?;
let tp = rustfs_utils::get_env_str(rustfs_config::ENV_RUSTFS_TLS_PATH, rustfs_config::DEFAULT_RUSTFS_TLS_PATH);
// If no TLS path is configured, do not fall back to a CA bundle in the current directory.
if tp.is_empty() {
return None;
}
let ca = std::path::Path::new(&tp).join(rustfs_config::RUSTFS_CA_CERT);
if !ca.exists() {
return None;
@@ -155,19 +159,33 @@ fn load_root_store_from_tls_path() -> Option<rustls::RootCertStore> {
Some(store)
}
impl TransitionClient {
pub async fn new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
let clnt = Self::private_new(endpoint, opts, tier_type).await?;
Ok(clnt)
fn panic_payload_to_message(payload: Box<dyn std::any::Any + Send>) -> String {
if let Some(message) = payload.downcast_ref::<String>() {
return message.clone();
}
async fn private_new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
let endpoint_url = get_endpoint_url(endpoint, opts.secure)?;
if let Some(message) = payload.downcast_ref::<&'static str>() {
return (*message).to_string();
}
let scheme = endpoint_url.scheme();
let client;
let tls = if let Some(store) = load_root_store_from_tls_path() {
"unknown panic payload".to_string()
}
fn with_rustls_init_guard<T, F>(build: F) -> Result<T, std::io::Error>
where
F: FnOnce() -> Result<T, std::io::Error>,
{
std::panic::catch_unwind(std::panic::AssertUnwindSafe(build)).unwrap_or_else(|payload| {
let panic_message = panic_payload_to_message(payload);
Err(std::io::Error::other(format!(
"failed to initialize rustls crypto provider: {panic_message}. Ensure exactly one rustls crypto provider feature is enabled (aws-lc-rs or ring), or install one with CryptoProvider::install_default()"
)))
})
}
fn build_tls_config() -> Result<rustls::ClientConfig, std::io::Error> {
with_rustls_init_guard(|| {
let config = if let Some(store) = load_root_store_from_tls_path() {
rustls::ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth()
@@ -175,20 +193,47 @@ impl TransitionClient {
rustls::ClientConfig::builder().with_native_roots()?.with_no_client_auth()
};
Ok(config)
})
}
impl TransitionClient {
pub async fn new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
let client = Self::private_new(endpoint, opts, tier_type).await?;
Ok(client)
}
async fn private_new(endpoint: &str, opts: Options, tier_type: &str) -> Result<TransitionClient, std::io::Error> {
if rustls::crypto::CryptoProvider::get_default().is_none() {
// No default provider is set yet; try to install aws-lc-rs.
// `install_default` can only fail if another thread races us and installs a provider
// between our check and this call, which is still safe to ignore.
if rustls::crypto::aws_lc_rs::default_provider().install_default().is_err() {
debug!("rustls crypto provider was installed concurrently, skipping aws-lc-rs install");
}
} else {
debug!("rustls crypto provider already installed, skipping aws-lc-rs install");
}
let endpoint_url = get_endpoint_url(endpoint, opts.secure)?;
let tls = build_tls_config()?;
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(tls)
.https_or_http()
.enable_http1()
.enable_http2()
.build();
client = Client::builder(TokioExecutor::new()).build(https);
let http_client = Client::builder(TokioExecutor::new()).build(https);
let mut clnt = TransitionClient {
let mut client = TransitionClient {
endpoint_url,
creds_provider: Arc::new(Mutex::new(opts.creds)),
override_signer_type: SignatureType::SignatureDefault,
secure: opts.secure,
http_client: client,
http_client,
bucket_loc_cache: Arc::new(Mutex::new(BucketLocationCache::new())),
is_trace_enabled: Arc::new(Mutex::new(false)),
trace_errors_only: Arc::new(Mutex::new(false)),
@@ -206,23 +251,23 @@ impl TransitionClient {
};
{
let mut md5_hasher = clnt.md5_hasher.lock().unwrap();
let mut md5_hasher = client.md5_hasher.lock().unwrap();
if md5_hasher.is_none() {
*md5_hasher = Some(HashAlgorithm::Md5);
}
}
if clnt.sha256_hasher.is_none() {
clnt.sha256_hasher = Some(HashAlgorithm::SHA256);
if client.sha256_hasher.is_none() {
client.sha256_hasher = Some(HashAlgorithm::SHA256);
}
clnt.trailing_header_support = opts.trailing_headers && clnt.override_signer_type == SignatureType::SignatureV4;
client.trailing_header_support = opts.trailing_headers && client.override_signer_type == SignatureType::SignatureV4;
clnt.max_retries = MAX_RETRY;
client.max_retries = MAX_RETRY;
if opts.max_retries > 0 {
clnt.max_retries = opts.max_retries;
client.max_retries = opts.max_retries;
}
Ok(clnt)
Ok(client)
}
fn endpoint_url(&self) -> Url {
@@ -1278,3 +1323,57 @@ pub struct CreateBucketConfiguration {
#[serde(rename = "LocationConstraint")]
pub location_constraint: String,
}
#[cfg(test)]
mod tests {
use super::{build_tls_config, load_root_store_from_tls_path, with_rustls_init_guard};
#[test]
fn rustls_guard_converts_panics_to_io_errors() {
let err = with_rustls_init_guard(|| -> Result<(), std::io::Error> { panic!("missing provider") })
.expect_err("panic should be converted into an io::Error");
assert!(
err.to_string().contains("missing provider"),
"expected panic message to be preserved, got: {err}"
);
}
#[test]
fn build_tls_config_returns_result_without_panicking() {
let outcome = std::panic::catch_unwind(build_tls_config);
assert!(outcome.is_ok(), "TLS config creation should not panic");
}
/// When RUSTFS_TLS_PATH is not set, `load_root_store_from_tls_path` must return `None`
/// (i.e. it must not silently look for a CA bundle in the current working directory).
#[test]
fn tls_path_unset_returns_none() {
let result = temp_env::with_var_unset(rustfs_config::ENV_RUSTFS_TLS_PATH, || load_root_store_from_tls_path());
assert!(result.is_none(), "expected None when RUSTFS_TLS_PATH is unset, but got a root store");
}
/// When RUSTFS_TLS_PATH is set to an empty string, `load_root_store_from_tls_path` must
/// return `None` to avoid accidentally trusting a CA bundle in the current directory.
#[test]
fn tls_path_empty_returns_none() {
let result = temp_env::with_var(rustfs_config::ENV_RUSTFS_TLS_PATH, Some(""), || load_root_store_from_tls_path());
assert!(result.is_none(), "expected None when RUSTFS_TLS_PATH is empty, but got a root store");
}
/// Installing the rustls crypto provider when one is already set must not panic or return
/// an error that surfaces to callers (the race-safe `get_default` check guards the install).
#[test]
fn provider_install_is_idempotent() {
// Install once (may already be set by another test in this binary — that's fine).
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
// A second install attempt on an already-set provider must not panic.
let outcome = std::panic::catch_unwind(|| {
if rustls::crypto::CryptoProvider::get_default().is_none() {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
}
// If a default is already present, the branch above is simply skipped.
});
assert!(outcome.is_ok(), "provider install guard must not panic when a provider is already set");
}
}

View File

@@ -22,7 +22,7 @@ use rustfs_utils::path::SLASH_SEPARATOR;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::LazyLock;
use tracing::{error, warn};
use tracing::{error, instrument, warn};
pub const CONFIG_PREFIX: &str = "config";
const CONFIG_FILE: &str = "config.json";
@@ -36,6 +36,8 @@ static SUB_SYSTEMS_DYNAMIC: LazyLock<HashSet<String>> = LazyLock::new(|| {
h.insert(STORAGE_CLASS_SUB_SYS.to_owned());
h
});
#[instrument(skip(api))]
pub async fn read_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<Vec<u8>> {
let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?;
Ok(data)
@@ -68,6 +70,7 @@ pub async fn read_config_with_metadata<S: StorageAPI>(
Ok((data, rd.object_info))
}
#[instrument(skip(api, data))]
pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>) -> Result<()> {
save_config_with_opts(
api,
@@ -81,6 +84,7 @@ pub async fn save_config<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>)
.await
}
#[instrument(skip(api))]
pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()> {
match api
.delete_object(

View File

@@ -15,12 +15,8 @@
pub mod local_snapshot;
use crate::{
bucket::metadata_sys::get_replication_config,
config::com::read_config,
disk::DiskAPI,
error::Error,
store::ECStore,
store_api::{BucketOperations, ListOperations},
bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, error::Error, store::ECStore,
store_api::ListOperations,
};
pub use local_snapshot::{
DATA_USAGE_DIR, DATA_USAGE_STATE_DIR, LOCAL_USAGE_SNAPSHOT_VERSION, LocalUsageSnapshot, LocalUsageSnapshotMeta,
@@ -38,7 +34,7 @@ use std::{
};
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, instrument};
// Data usage storage constants
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
@@ -81,6 +77,7 @@ lazy_static::lazy_static! {
}
/// Store data usage info to backend storage
#[instrument(skip(store))]
pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
// Prevent older data from overwriting newer persisted stats
if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await
@@ -107,37 +104,30 @@ pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store:
}
/// Load data usage info from backend storage
#[instrument(skip(store))]
pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
let buf: Vec<u8> = match read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
Ok(data) => data,
Err(e) => {
error!("Failed to read data usage info from backend: {}", e);
if e == Error::ConfigNotFound {
info!("Data usage config not found, building basic statistics");
return build_basic_data_usage_info(store).await;
match read_config(store.clone(), format!("{}.bkp", DATA_USAGE_OBJ_NAME_PATH.as_str()).as_str()).await {
Ok(data) => data,
Err(e) => {
if e == Error::ConfigNotFound {
return Ok(DataUsageInfo::default());
}
error!("Failed to read data usage info from backend: {}", e);
return Err(Error::other(e));
}
}
return Err(Error::other(e));
}
};
let mut data_usage_info: DataUsageInfo =
serde_json::from_slice(&buf).map_err(|e| Error::other(format!("Failed to deserialize data usage info: {e}")))?;
info!("Loaded data usage info from backend with {} buckets", data_usage_info.buckets_count);
// Validate data and supplement if empty
if data_usage_info.buckets_count == 0 || data_usage_info.buckets_usage.is_empty() {
warn!("Loaded data is empty, supplementing with basic statistics");
if let Ok(basic_info) = build_basic_data_usage_info(store.clone()).await {
data_usage_info.buckets_count = basic_info.buckets_count;
data_usage_info.buckets_usage = basic_info.buckets_usage;
data_usage_info.bucket_sizes = basic_info.bucket_sizes;
data_usage_info.objects_total_count = basic_info.objects_total_count;
data_usage_info.objects_total_size = basic_info.objects_total_size;
data_usage_info.last_update = basic_info.last_update;
}
}
// Handle backward compatibility
if data_usage_info.buckets_usage.is_empty() {
data_usage_info.buckets_usage = data_usage_info
@@ -502,57 +492,6 @@ pub async fn sync_memory_cache_with_backend() -> Result<(), Error> {
Ok(())
}
/// Build basic data usage info with real object counts
///
/// Lists all buckets from the store, skips system buckets (names starting
/// with '.'), and aggregates per-bucket usage via `compute_bucket_usage`
/// into a fresh `DataUsageInfo`. Failures to list buckets or to compute a
/// single bucket's usage are logged and tolerated: the function still
/// returns `Ok` with whatever totals it could gather.
pub async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
let mut data_usage_info = DataUsageInfo::default();
// Get bucket list
match store.list_bucket(&crate::store_api::BucketOptions::default()).await {
Ok(buckets) => {
// NOTE(review): buckets_count includes system ('.') buckets even though
// their usage is skipped below — confirm this asymmetry is intended.
data_usage_info.buckets_count = buckets.len() as u64;
data_usage_info.last_update = Some(SystemTime::now());
// Running totals accumulated across all non-system buckets.
let mut total_objects = 0u64;
let mut total_versions = 0u64;
let mut total_size = 0u64;
let mut total_delete_markers = 0u64;
for bucket_info in buckets {
if bucket_info.name.starts_with('.') {
continue; // Skip system buckets
}
match compute_bucket_usage(store.clone(), &bucket_info.name).await {
Ok(bucket_usage) => {
// saturating_add guards the totals against overflow on very large stores.
total_objects = total_objects.saturating_add(bucket_usage.objects_count);
total_versions = total_versions.saturating_add(bucket_usage.versions_count);
total_size = total_size.saturating_add(bucket_usage.size);
total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count);
data_usage_info
.buckets_usage
.insert(bucket_info.name.clone(), bucket_usage.clone());
data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_usage.size);
}
Err(e) => {
// Best-effort: one failing bucket does not abort the whole scan.
warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
}
}
}
data_usage_info.objects_total_count = total_objects;
data_usage_info.versions_total_count = total_versions;
data_usage_info.objects_total_size = total_size;
data_usage_info.delete_markers_total_count = total_delete_markers;
}
Err(e) => {
// Best-effort: return default (empty) stats rather than an error when
// the bucket listing itself fails.
warn!("Failed to list buckets for basic data usage info: {}", e);
}
}
Ok(data_usage_info)
}
/// Create a data usage cache entry from size summary
pub fn create_cache_entry_from_summary(summary: &SizeSummary) -> DataUsageEntry {
let mut entry = DataUsageEntry::default();
@@ -696,6 +635,7 @@ pub async fn load_data_usage_cache(store: &crate::set_disk::SetDisks, name: &str
Ok(d)
}
#[instrument(skip(cache))]
pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate::error::Result<()> {
use crate::config::com::save_config;
use crate::disk::BUCKET_META_PREFIX;

View File

@@ -1317,39 +1317,34 @@ fn normalize_path_components(path: impl AsRef<Path>) -> PathBuf {
#[async_trait::async_trait]
impl DiskAPI for LocalDisk {
#[tracing::instrument(skip(self))]
fn to_string(&self) -> String {
self.root.to_string_lossy().to_string()
}
#[tracing::instrument(skip(self))]
fn is_local(&self) -> bool {
true
}
#[tracing::instrument(skip(self))]
fn host_name(&self) -> String {
self.endpoint.host_port()
}
#[tracing::instrument(skip(self))]
async fn is_online(&self) -> bool {
true
}
#[tracing::instrument(skip(self))]
fn endpoint(&self) -> Endpoint {
self.endpoint.clone()
}
#[tracing::instrument(skip(self))]
async fn close(&self) -> Result<()> {
Ok(())
}
#[tracing::instrument(skip(self))]
fn path(&self) -> PathBuf {
self.root.clone()
}
#[tracing::instrument(skip(self))]
fn get_disk_location(&self) -> DiskLocation {
DiskLocation {
pool_idx: {
@@ -1437,7 +1432,6 @@ impl DiskAPI for LocalDisk {
Ok(Some(disk_id))
}
#[tracing::instrument(skip(self))]
async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
// No setup is required locally
Ok(())
@@ -2601,6 +2595,7 @@ impl DiskAPI for LocalDisk {
ScanGuard(Arc::clone(&self.scanning))
}
#[tracing::instrument(skip(self))]
async fn read_metadata(&self, volume: &str, path: &str) -> Result<Bytes> {
// Try to use cached file content reading for better performance, with safe fallback
let file_path = self.get_object_path(volume, path)?;
@@ -2617,6 +2612,7 @@ impl DiskAPI for LocalDisk {
}
}
#[tracing::instrument]
async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
let drive_path = drive_path.to_string_lossy().to_string();
check_path_length(&drive_path)?;

View File

@@ -60,7 +60,6 @@ pub enum Disk {
#[async_trait::async_trait]
impl DiskAPI for Disk {
#[tracing::instrument(skip(self))]
fn to_string(&self) -> String {
match self {
Disk::Local(local_disk) => local_disk.to_string(),
@@ -68,7 +67,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn is_online(&self) -> bool {
match self {
Disk::Local(local_disk) => local_disk.is_online().await,
@@ -76,7 +74,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
fn is_local(&self) -> bool {
match self {
Disk::Local(local_disk) => local_disk.is_local(),
@@ -84,7 +81,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
fn host_name(&self) -> String {
match self {
Disk::Local(local_disk) => local_disk.host_name(),
@@ -92,7 +88,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
fn endpoint(&self) -> Endpoint {
match self {
Disk::Local(local_disk) => local_disk.endpoint(),
@@ -100,7 +95,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn close(&self) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.close().await,
@@ -108,7 +102,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
match self {
Disk::Local(local_disk) => local_disk.get_disk_id().await,
@@ -116,7 +109,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.set_disk_id(id).await,
@@ -124,7 +116,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
fn path(&self) -> PathBuf {
match self {
Disk::Local(local_disk) => local_disk.path(),
@@ -132,7 +123,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
fn get_disk_location(&self) -> DiskLocation {
match self {
Disk::Local(local_disk) => local_disk.get_disk_location(),
@@ -164,7 +154,6 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument(skip(self))]
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
match self {
Disk::Local(local_disk) => local_disk.stat_volume(volume).await,

View File

@@ -28,10 +28,7 @@ pin_project! {
shard_size: usize,
buf: Vec<u8>,
hash_buf: Vec<u8>,
// hash_read: usize,
// data_buf: Vec<u8>,
// data_read: usize,
// hash_checked: bool,
skip_verify: bool,
id: Uuid,
}
}
@@ -41,7 +38,7 @@ where
R: AsyncRead + Unpin + Send + Sync,
{
/// Create a new BitrotReader.
pub fn new(inner: R, shard_size: usize, algo: HashAlgorithm) -> Self {
pub fn new(inner: R, shard_size: usize, algo: HashAlgorithm, skip_verify: bool) -> Self {
let hash_size = algo.size();
Self {
inner,
@@ -49,10 +46,7 @@ where
shard_size,
buf: Vec::new(),
hash_buf: vec![0u8; hash_size],
// hash_read: 0,
// data_buf: Vec::new(),
// data_read: 0,
// hash_checked: false,
skip_verify,
id: Uuid::new_v4(),
}
}
@@ -90,7 +84,7 @@ where
data_len += n;
}
if hash_size > 0 {
if hash_size > 0 && !self.skip_verify {
let actual_hash = self.hash_algo.hash_encode(&out[..data_len]);
if actual_hash.as_ref() != self.hash_buf.as_slice() {
error!("bitrot reader hash mismatch, id={} data_len={}, out_len={}", self.id, data_len, out.len());
@@ -388,7 +382,7 @@ mod tests {
// Read
let reader = bitrot_writer.into_inner();
let reader = Cursor::new(reader.into_inner());
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256, false);
let mut out = Vec::new();
let mut n = 0;
while n < data_size {
@@ -420,7 +414,7 @@ mod tests {
let pos = written.len() - 1;
written[pos] ^= 0xFF;
let reader = Cursor::new(written);
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256);
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::HighwayHash256, false);
let count = data_size.div_ceil(shard_size);
@@ -464,7 +458,7 @@ mod tests {
let reader = bitrot_writer.into_inner();
let reader = Cursor::new(reader.into_inner());
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None);
let mut bitrot_reader = BitrotReader::new(reader, shard_size, HashAlgorithm::None, false);
let mut out = Vec::new();
let mut n = 0;
while n < data_size {

View File

@@ -454,6 +454,6 @@ mod tests {
}
let reader_cursor = Cursor::new(buf);
BitrotReader::new(reader_cursor, shard_size, hash_algo.clone())
BitrotReader::new(reader_cursor, shard_size, hash_algo.clone(), false)
}
}

View File

@@ -321,7 +321,7 @@ impl Erasure {
///
/// # Returns
/// A vector of encoded shards as `Bytes`.
#[tracing::instrument(level = "info", skip_all, fields(data_len=data.len()))]
#[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))]
pub fn encode_data(&self, data: &[u8]) -> io::Result<Vec<Bytes>> {
// let shard_size = self.shard_size();
// let total_size = shard_size * self.total_shard_count();

View File

@@ -13,6 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Defines the EventName enum which represents the various S3 event types that can trigger notifications.
//! This enum includes both specific event types (e.g., ObjectCreated:Put) and aggregate types (e.g., ObjectCreated:*). Each variant has methods to expand into its constituent event types and to compute a bitmask for efficient filtering.
//! The EventName enum is used in the event notification system to determine which events should trigger notifications based on the configured rules.
//!
//! @Deprecated: This module is currently not fully implemented and serves as a placeholder for future development of the event notification system. The EventName enum and its associated methods are defined, but the actual logic for handling events and sending notifications is not yet implemented.
#[derive(Default, Clone)]
pub enum EventName {
ObjectAccessedGet,

View File

@@ -15,7 +15,7 @@
#![allow(unused_variables)]
use crate::bucket::metadata::BucketMetadata;
use crate::event::name::EventName;
// use crate::event::name::EventName;
use crate::event::targetlist::TargetList;
use crate::store::ECStore;
use crate::store_api::ObjectInfo;

View File

@@ -48,7 +48,7 @@ use crate::{
UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk,
},
error::{StorageError, to_object_err},
event::name::EventName,
// event::name::EventName,
event_notification::{EventArgs, send_event},
global::{GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id, is_dist_erasure},
store_api::{
@@ -79,6 +79,7 @@ use rustfs_lock::local_lock::LocalLock;
use rustfs_lock::{FastLockGuard, NamespaceLock, NamespaceLockGuard, NamespaceLockWrapper, ObjectKey};
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader};
use rustfs_s3_common::EventName;
use rustfs_utils::http::RUSTFS_BUCKET_REPLICATION_SSEC_CHECKSUM;
use rustfs_utils::http::headers::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX, RESERVED_METADATA_PREFIX_LOWER};
@@ -493,6 +494,7 @@ impl ObjectIO for SetDisks {
let object = object.to_owned();
let set_index = self.set_index;
let pool_index = self.pool_index;
let skip_verify = opts.skip_verify_bitrot;
// Move the read-lock guard into the task so it lives for the duration of the read
// let _guard_to_hold = _read_lock_guard; // moved into closure below
tokio::spawn(async move {
@@ -509,6 +511,7 @@ impl ObjectIO for SetDisks {
&disks,
set_index,
pool_index,
skip_verify,
)
.await
{
@@ -519,7 +522,7 @@ impl ObjectIO for SetDisks {
Ok(reader)
}
#[tracing::instrument(level = "debug", skip(self, data,))]
#[tracing::instrument(skip(self, data,))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks = self.get_disks_internal().await;
@@ -1653,6 +1656,7 @@ impl ObjectOperations for SetDisks {
let cloned_fi = fi.clone();
let set_index = self.set_index;
let pool_index = self.pool_index;
let skip_verify = opts.skip_verify_bitrot;
tokio::spawn(async move {
if let Err(e) = Self::get_object_with_fileinfo(
&cloned_bucket,
@@ -1665,6 +1669,7 @@ impl ObjectOperations for SetDisks {
&online_disks,
set_index,
pool_index,
skip_verify,
)
.await
{
@@ -1682,17 +1687,17 @@ impl ObjectOperations for SetDisks {
if let Err(err) = rv {
return Err(StorageError::Io(err));
}
let rv = rv.unwrap();
let rv = rv?;
fi.transition_status = TRANSITION_COMPLETE.to_string();
fi.transitioned_objname = dest_obj;
fi.transition_tier = opts.transition.tier.clone();
fi.transition_version_id = if rv.is_empty() { None } else { Some(Uuid::parse_str(&rv)?) };
let mut event_name = EventName::ObjectTransitionComplete.as_ref();
let mut event_name = EventName::ObjectTransitionComplete.as_str();
let disks = self.get_disks(0, 0).await?;
if let Err(err) = self.delete_object_version(bucket, object, &fi, false).await {
event_name = EventName::ObjectTransitionFailed.as_ref();
event_name = EventName::ObjectTransitionFailed.as_str();
}
for disk in disks.iter() {

View File

@@ -375,6 +375,7 @@ impl SetDisks {
till_offset,
erasure.shard_size(),
checksum_algo.clone(),
false,
)
.await
{
@@ -680,6 +681,7 @@ impl SetDisks {
Ok((result, None))
}
#[tracing::instrument(skip(self))]
pub(super) async fn heal_object_dir(
&self,
bucket: &str,

View File

@@ -568,6 +568,7 @@ impl SetDisks {
disks: &[Option<DiskStore>],
set_index: usize,
pool_index: usize,
skip_verify_bitrot: bool,
) -> Result<()>
where
W: AsyncWrite + Send + Sync + Unpin + 'static,
@@ -659,6 +660,7 @@ impl SetDisks {
till_offset,
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
skip_verify_bitrot,
)
.await
{

View File

@@ -381,7 +381,6 @@ impl BucketOperations for Sets {
#[async_trait::async_trait]
impl ObjectOperations for Sets {
#[tracing::instrument(skip(self))]
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).get_object_info(bucket, object, opts).await
}
@@ -826,7 +825,6 @@ impl HealOperations for Sets {
#[async_trait::async_trait]
impl StorageAPI for Sets {
#[tracing::instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<NamespaceLockWrapper> {
self.disk_set[0].new_ns_lock(bucket, object).await
}

View File

@@ -281,7 +281,6 @@ impl BucketOperations for ECStore {
#[async_trait::async_trait]
impl ObjectOperations for ECStore {
#[instrument(skip(self))]
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.handle_get_object_info(bucket, object, opts).await
}
@@ -561,7 +560,6 @@ impl HealOperations for ECStore {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
#[instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<NamespaceLockWrapper> {
self.handle_new_ns_lock(bucket, object).await
}

View File

@@ -70,6 +70,7 @@ pub struct ObjectOptions {
pub eval_metadata: Option<HashMap<String, String>>,
pub want_checksum: Option<Checksum>,
pub skip_verify_bitrot: bool,
}
impl ObjectOptions {

View File

@@ -72,12 +72,10 @@ impl WarmBackendRustFS {
};
let scheme = u.scheme();
let default_port = if scheme == "https" { 443 } else { 80 };
let client = TransitionClient::new(
&format!("{}:{}", u.host_str().expect("err"), u.port().unwrap_or(default_port)),
opts,
"rustfs",
)
.await?;
let host = u
.host_str()
.ok_or_else(|| std::io::Error::other("endpoint URL must include a host"))?;
let client = TransitionClient::new(&format!("{host}:{}", u.port().unwrap_or(default_port)), opts, "rustfs").await?;
let client = Arc::new(client);
let core = TransitionCore(Arc::clone(&client));
@@ -158,3 +156,35 @@ fn optimal_part_size(object_size: i64) -> Result<i64, std::io::Error> {
}
Ok(part_size)
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use std::panic::AssertUnwindSafe;
use super::*;
// Build a minimal TierRustFS config pointing at `endpoint` with dummy
// credentials; only the fields the constructor reads are populated, the
// rest come from Default.
fn rustfs_tier(endpoint: &str) -> TierRustFS {
TierRustFS {
endpoint: endpoint.to_string(),
access_key: "access".to_string(),
secret_key: "secret".to_string(),
bucket: "bucket".to_string(),
..Default::default()
}
}
// Regression test: an endpoint URL without a host must make
// WarmBackendRustFS::new return an error, not panic. catch_unwind turns a
// would-be panic into an Err on the outer Result so both failure modes are
// distinguishable.
#[tokio::test]
async fn new_returns_error_when_endpoint_has_no_host() {
let conf = rustfs_tier("rustfs://");
let outcome = AssertUnwindSafe(WarmBackendRustFS::new(&conf, "tier")).catch_unwind().await;
let result = outcome.expect("initialization should return an error instead of panicking");
let err = match result {
Ok(_) => panic!("endpoint without host must be rejected"),
Err(err) => err,
};
// The validation error is expected to mention the missing host.
assert!(err.to_string().contains("host"), "expected host validation error, got: {err}");
}
}

View File

@@ -31,7 +31,7 @@ use tokio::{
time::interval,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
/// Priority queue wrapper for heal requests
/// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items
@@ -509,7 +509,7 @@ impl HealManager {
}
if endpoints.is_empty() {
info!("start_auto_disk_scanner: No endpoints need healing");
debug!("start_auto_disk_scanner: No endpoints need healing");
continue;
}

View File

@@ -73,7 +73,7 @@ pub async fn init_heal_manager(
.map_err(|_| Error::Config("Heal manager already initialized".to_string()))?;
// Initialize heal channel
let channel_receiver = rustfs_common::heal_channel::init_heal_channel();
let channel_receiver = rustfs_common::heal_channel::init_heal_channel().map_err(|err| Error::Config(err.to_string()))?;
// Create channel processor
let channel_processor = HealChannelProcessor::new(heal_manager.clone());

27
crates/iam/AGENTS.md Normal file
View File

@@ -0,0 +1,27 @@
# IAM Crate Instructions
Applies to `crates/iam/`.
## Security Boundaries
- Treat IAM changes as security-sensitive by default.
- Never log secrets, tokens, private claims, or credential material.
- Keep deny/allow evaluation behavior consistent with existing tests unless explicitly changing policy semantics.
## Contract Stability
- Preserve compatibility of user/group/policy attachment behavior and token claim handling.
- When changing IAM interfaces, verify impacted call sites in:
- `rustfs/src/auth.rs`
- `rustfs/src/admin/`
- `crates/policy/`
## Error Handling
- Return explicit errors; do not use panic-driven control flow outside tests.
- Keep error messages actionable but avoid leaking sensitive context.
## Suggested Validation
- `cargo test -p rustfs-iam`
- Full gate before commit: `make pre-commit`

View File

@@ -1728,6 +1728,15 @@ where
}
let u = m[name].clone();
match user_type {
UserType::Sts => {
Cache::add_or_update(&self.cache.sts_accounts, name, &u, OffsetDateTime::now_utc());
}
UserType::Reg | UserType::Svc => {
Cache::add_or_update(&self.cache.users, name, &u, OffsetDateTime::now_utc());
}
UserType::None => {}
}
match user_type {
UserType::Sts => {
@@ -1752,7 +1761,7 @@ where
return Ok(());
}
Cache::add_or_update(&self.cache.sts_policies, name, &m[name], OffsetDateTime::now_utc());
Cache::add_or_update(&self.cache.user_policies, name, &m[name], OffsetDateTime::now_utc());
}
UserType::Svc => {

View File

@@ -430,7 +430,7 @@ impl Store for ObjectStore {
/// # Returns
///
/// * `Result<()>` - `Ok(())` on success, or an `Error` if all attempts fail.
#[tracing::instrument(level = "debug", skip(self, item, path))]
#[tracing::instrument(skip(self, item, path))]
async fn save_iam_config<Item: Serialize + Send>(&self, item: Item, path: impl AsRef<str> + Send) -> Result<()> {
let mut data = serde_json::to_vec(&item)?;
data = Self::encrypt_data(&data)?;

View File

@@ -1083,7 +1083,16 @@ mod tests {
Err(Error::InvalidArgument)
}
async fn load_user(&self, _name: &str, _user_type: UserType, _m: &mut HashMap<String, UserIdentity>) -> Result<()> {
async fn load_user(&self, name: &str, user_type: UserType, m: &mut HashMap<String, UserIdentity>) -> Result<()> {
if user_type == UserType::Reg && name == "notify-user" {
let user = UserIdentity::from(Credentials {
access_key: name.to_string(),
secret_key: "notify-user-secret".to_string(),
status: ACCOUNT_ON.to_string(),
..Default::default()
});
m.insert(name.to_string(), user);
}
Ok(())
}
@@ -1148,11 +1157,14 @@ mod tests {
async fn load_mapped_policy(
&self,
_name: &str,
_user_type: UserType,
_is_group: bool,
_m: &mut HashMap<String, MappedPolicy>,
name: &str,
user_type: UserType,
is_group: bool,
m: &mut HashMap<String, MappedPolicy>,
) -> Result<()> {
if user_type == UserType::Reg && !is_group && name == "notify-user" {
m.insert(name.to_string(), MappedPolicy::new("readwrite"));
}
Ok(())
}
@@ -1247,4 +1259,29 @@ mod tests {
"STS temp credentials with no groups in args should still be allowed via parent user's group policy (readwrite)"
);
}
/// Regression test for cross-node IAM notifications:
/// `load_user` must populate user cache, and regular-user mapped policy must be written to
/// `user_policies` (not `sts_policies`), otherwise list-users and bucket-scoped user listing
/// may miss users on follower nodes.
#[tokio::test]
async fn test_load_user_notification_populates_user_and_policy_caches() {
// The mock store serves "notify-user" from load_user / load_mapped_policy.
let store = StsGroupsFallbackMockStore;
let cache_manager = IamCache::new(store).await;
let iam_sys = IamSys::new(cache_manager);
// Simulate the notification path for a regular (Reg) user.
iam_sys.load_user("notify-user", UserType::Reg).await.unwrap();
let users = iam_sys.list_users().await.unwrap();
assert!(
users.contains_key("notify-user"),
"regular user loaded via notification must appear in list_users cache view"
);
// Bucket-scoped listing depends on the user_policies cache being populated.
let bucket_users = iam_sys.list_bucket_users("notification-regression-bucket").await.unwrap();
assert!(
bucket_users.contains_key("notify-user"),
"regular user mapped policy must be written to user_policies for bucket user listing"
);
}
}

27
crates/kms/AGENTS.md Normal file
View File

@@ -0,0 +1,27 @@
# KMS Crate Instructions
Applies to `crates/kms/`.
## Change Coordination
When changing key-management behavior, verify compatibility with:
- `rustfs/src/storage/ecfs.rs`
- `rustfs/src/admin/handlers/kms.rs`
- `rustfs/src/admin/handlers/kms_dynamic.rs`
- `rustfs/src/admin/handlers/kms_keys.rs`
- `rustfs/src/admin/handlers/kms_management.rs`
## Security
- Never log plaintext keys, key material, or sensitive request payloads.
- Prefer explicit error propagation over panic paths.
## Testing
For local KMS end-to-end tests, keep proxy bypass settings:
```bash
NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= \
cargo test --package e2e_test test_local_kms_end_to_end -- --nocapture --test-threads=1
```

View File

@@ -16,7 +16,7 @@ use anyhow::Result;
use rmcp::{
ErrorData, RoleServer, ServerHandler,
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability},
model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo},
service::{NotificationContext, RequestContext},
tool, tool_handler, tool_router,
};
@@ -604,21 +604,10 @@ impl RustfsMcpServer {
#[tool_handler(router = self.tool_router)]
impl ServerHandler for RustfsMcpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ServerCapabilities {
tools: Some(ToolsCapability {
list_changed: Some(false),
}),
..Default::default()
},
instructions: Some("RustFS MCP Server providing S3 operations through Model Context Protocol".into()),
server_info: Implementation {
name: "rustfs-mcp-server".into(),
version: env!("CARGO_PKG_VERSION").into(),
..Default::default()
},
}
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
.with_instructions("RustFS MCP Server providing S3 operations through Model Context Protocol")
.with_server_info(Implementation::new("rustfs-mcp-server", env!("CARGO_PKG_VERSION")))
.with_protocol_version(ProtocolVersion::LATEST)
}
async fn ping(&self, _ctx: RequestContext<RoleServer>) -> Result<(), ErrorData> {

View File

@@ -0,0 +1,119 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Bucket replication bandwidth metrics collector.
use crate::MetricType;
use crate::format::PrometheusMetric;
use std::borrow::Cow;
/// Bucket replication bandwidth stats for one replication target.
#[derive(Debug, Clone, Default)]
pub struct BucketReplicationBandwidthStats {
/// Source bucket name.
pub bucket: String,
/// ARN identifying the replication target these stats belong to.
pub target_arn: String,
/// Configured bandwidth limit, in bytes per second.
pub limit_bytes_per_sec: i64,
/// Currently observed bandwidth, in bytes per second.
pub current_bandwidth_bytes_per_sec: f64,
}
// Prometheus label keys attached to every bandwidth metric.
const BUCKET_LABEL: &str = "bucket";
const TARGET_ARN_LABEL: &str = "targetArn";
// Metric names for the configured limit and the observed bandwidth gauges.
const METRIC_BANDWIDTH_LIMIT: &str = "rustfs_bucket_replication_bandwidth_limit_bytes_per_second";
const METRIC_BANDWIDTH_CURRENT: &str = "rustfs_bucket_replication_bandwidth_current_bytes_per_second";
// Help strings exported alongside the corresponding metrics.
const HELP_BANDWIDTH_LIMIT: &str = "Configured bandwidth limit for replication in bytes per second";
const HELP_BANDWIDTH_CURRENT: &str = "Current replication bandwidth in bytes per second (EWMA)";
/// Collect bucket replication bandwidth metrics for Prometheus/OpenTelemetry export.
///
/// For every stat this emits a pair of gauges — the configured limit and the
/// currently observed bandwidth — each labeled with the source bucket and the
/// replication target ARN. An empty input yields an empty vector.
#[must_use]
#[inline]
pub fn collect_bucket_replication_bandwidth_metrics(stats: &[BucketReplicationBandwidthStats]) -> Vec<PrometheusMetric> {
    if stats.is_empty() {
        return Vec::new();
    }
    // Each stat expands into a (limit, current) gauge pair; flat_map keeps
    // the output order identical to the input order.
    stats
        .iter()
        .flat_map(|stat| {
            let bucket: Cow<'static, str> = Cow::Owned(stat.bucket.clone());
            let arn: Cow<'static, str> = Cow::Owned(stat.target_arn.clone());
            let limit_gauge = PrometheusMetric::new(
                METRIC_BANDWIDTH_LIMIT,
                MetricType::Gauge,
                HELP_BANDWIDTH_LIMIT,
                stat.limit_bytes_per_sec as f64,
            )
            .with_label(BUCKET_LABEL, bucket.clone())
            .with_label(TARGET_ARN_LABEL, arn.clone());
            let current_gauge = PrometheusMetric::new(
                METRIC_BANDWIDTH_CURRENT,
                MetricType::Gauge,
                HELP_BANDWIDTH_CURRENT,
                stat.current_bandwidth_bytes_per_sec,
            )
            .with_label(BUCKET_LABEL, bucket)
            .with_label(TARGET_ARN_LABEL, arn);
            [limit_gauge, current_gauge]
        })
        .collect()
}
#[cfg(test)]
mod tests {
use super::*;
// One input stat must yield exactly two gauges (limit + current) that carry
// the input's value and its targetArn label unchanged.
#[test]
fn test_collect_bucket_replication_bandwidth_metrics() {
let stats = vec![BucketReplicationBandwidthStats {
bucket: "b1".to_string(),
target_arn: "arn:rustfs:replication:us-east-1:1:test-2".to_string(),
limit_bytes_per_sec: 1_048_576,
current_bandwidth_bytes_per_sec: 204_800.0,
}];
let metrics = collect_bucket_replication_bandwidth_metrics(&stats);
assert_eq!(metrics.len(), 2);
// Limit gauge: value and targetArn label round-trip from the input stat.
let limit_metric = metrics.iter().find(|m| m.name == METRIC_BANDWIDTH_LIMIT);
assert!(limit_metric.is_some());
assert_eq!(limit_metric.map(|m| m.value), Some(1_048_576.0));
assert!(
limit_metric
.and_then(|m| {
m.labels
.iter()
.find(|(k, _)| *k == TARGET_ARN_LABEL)
.map(|(_, v)| v.as_ref() == "arn:rustfs:replication:us-east-1:1:test-2")
})
.unwrap_or(false)
);
// Current-bandwidth gauge mirrors the input value exactly.
let current_metric = metrics.iter().find(|m| m.name == METRIC_BANDWIDTH_CURRENT);
assert!(current_metric.is_some());
assert_eq!(current_metric.map(|m| m.value), Some(204_800.0));
}
// Empty input must produce no metrics at all (early-return path).
#[test]
fn test_collect_bucket_replication_bandwidth_metrics_empty() {
let stats: Vec<BucketReplicationBandwidthStats> = Vec::new();
let metrics = collect_bucket_replication_bandwidth_metrics(&stats);
assert!(metrics.is_empty());
}
}

View File

@@ -13,17 +13,19 @@
// limitations under the License.
use crate::collectors::{
BucketStats, ClusterStats, DiskStats, ResourceStats, collect_bucket_metrics, collect_cluster_metrics, collect_node_metrics,
collect_resource_metrics,
BucketReplicationBandwidthStats, BucketStats, ClusterStats, DiskStats, ResourceStats, collect_bucket_metrics,
collect_bucket_replication_bandwidth_metrics, collect_cluster_metrics, collect_node_metrics, collect_resource_metrics,
};
use crate::constants::{
DEFAULT_BUCKET_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL, DEFAULT_NODE_METRICS_INTERVAL,
DEFAULT_RESOURCE_METRICS_INTERVAL, ENV_BUCKET_METRICS_INTERVAL, ENV_CLUSTER_METRICS_INTERVAL, ENV_DEFAULT_METRICS_INTERVAL,
DEFAULT_BUCKET_METRICS_INTERVAL, DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL,
DEFAULT_NODE_METRICS_INTERVAL, DEFAULT_RESOURCE_METRICS_INTERVAL, ENV_BUCKET_METRICS_INTERVAL,
ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, ENV_CLUSTER_METRICS_INTERVAL, ENV_DEFAULT_METRICS_INTERVAL,
ENV_NODE_METRICS_INTERVAL, ENV_RESOURCE_METRICS_INTERVAL,
};
use crate::format::report_metrics;
use rustfs_ecstore::bucket::metadata_sys::get_quota_config;
use rustfs_ecstore::data_usage::load_data_usage_from_backend;
use rustfs_ecstore::global::get_global_bucket_monitor;
use rustfs_ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
use rustfs_ecstore::store_api::{BucketOperations, BucketOptions};
use rustfs_ecstore::{StorageAPI, new_object_layer_fn};
@@ -32,7 +34,7 @@ use std::sync::OnceLock;
use std::time::{Duration, Instant};
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{instrument, warn};
/// Process start time for calculating uptime.
static PROCESS_START: OnceLock<Instant> = OnceLock::new();
@@ -44,6 +46,7 @@ fn get_process_start() -> &'static Instant {
}
/// Collect cluster statistics from the storage layer.
#[instrument]
async fn collect_cluster_stats() -> ClusterStats {
let Some(store) = new_object_layer_fn() else {
return ClusterStats::default();
@@ -146,6 +149,25 @@ async fn collect_bucket_stats() -> Vec<BucketStats> {
stats
}
/// Collect bucket replication bandwidth stats from the global monitor.
///
/// Returns an empty vector when no global bucket monitor is registered.
/// Otherwise snapshots the monitor's report (the `|_| true` filter keeps
/// every bucket) and flattens each (options, details) pair into a
/// `BucketReplicationBandwidthStats` record.
fn collect_bucket_replication_bandwidth_stats() -> Vec<BucketReplicationBandwidthStats> {
let Some(monitor) = get_global_bucket_monitor() else {
return Vec::new();
};
monitor
.get_report(|_| true)
.bucket_stats
.into_iter()
.map(|(opts, details)| BucketReplicationBandwidthStats {
bucket: opts.name,
target_arn: opts.replication_arn,
limit_bytes_per_sec: details.limit_bytes_per_sec,
current_bandwidth_bytes_per_sec: details.current_bandwidth_bytes_per_sec,
})
.collect()
}
/// Collect disk statistics from the storage layer.
async fn collect_disk_stats() -> Vec<DiskStats> {
let Some(store) = new_object_layer_fn() else {
@@ -234,6 +256,10 @@ pub fn init_metrics_collectors(token: CancellationToken) {
let cluster_interval = get_interval(ENV_CLUSTER_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL);
let bucket_interval = get_interval(ENV_BUCKET_METRICS_INTERVAL, DEFAULT_BUCKET_METRICS_INTERVAL);
let bucket_replication_bandwidth_interval = get_interval(
ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL,
DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL,
);
let node_interval = get_interval(ENV_NODE_METRICS_INTERVAL, DEFAULT_NODE_METRICS_INTERVAL);
let resource_interval = get_interval(ENV_RESOURCE_METRICS_INTERVAL, DEFAULT_RESOURCE_METRICS_INTERVAL);
@@ -294,6 +320,25 @@ pub fn init_metrics_collectors(token: CancellationToken) {
}
});
// Spawn task for bucket replication bandwidth metrics
// Each collector gets its own clone of the shutdown token so cancelling the
// parent token stops every spawned loop.
let token_clone = token.clone();
tokio::spawn(async move {
    // Ticks at the interval resolved earlier from
    // ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL (default 30s).
    let mut interval = tokio::time::interval(bucket_replication_bandwidth_interval);
    loop {
        tokio::select! {
            // On each tick: snapshot bandwidth stats, convert to metrics, report.
            _ = interval.tick() => {
                let stats = collect_bucket_replication_bandwidth_stats();
                let metrics = collect_bucket_replication_bandwidth_metrics(&stats);
                report_metrics(&metrics);
            }
            // Graceful shutdown: exit the loop (and the task) once cancelled.
            _ = token_clone.cancelled() => {
                warn!("Metrics collection for bucket replication bandwidth stats cancelled.");
                return;
            }
        }
    }
});
// Spawn task for resource metrics
let token_clone = token.clone();
tokio::spawn(async move {

View File

@@ -19,6 +19,7 @@
//!
//! - [`cluster`]: Cluster-wide capacity and object statistics
//! - [`bucket`]: Per-bucket usage and quota metrics
//! - [`bucket_replication`]: Per-target replication bandwidth metrics
//! - [`node`]: Per-node disk capacity and health metrics
//! - [`resource`]: System resource metrics (CPU, memory, uptime)
//!
@@ -61,12 +62,14 @@
//! ```
mod bucket;
mod bucket_replication;
mod cluster;
pub(crate) mod global;
mod node;
mod resource;
pub use bucket::{BucketStats, collect_bucket_metrics};
pub use bucket_replication::{BucketReplicationBandwidthStats, collect_bucket_replication_bandwidth_metrics};
pub use cluster::{ClusterStats, collect_cluster_metrics};
pub use global::init_metrics_collectors;
pub use node::{DiskStats, collect_node_metrics};

View File

@@ -40,3 +40,8 @@ pub const DEFAULT_NODE_METRICS_INTERVAL: Duration = Duration::from_secs(60);
pub const ENV_RESOURCE_METRICS_INTERVAL: &str = "RUSTFS_METRICS_RESOURCE_INTERVAL_SEC";
/// Default interval for collecting system resource metrics (CPU, memory).
pub const DEFAULT_RESOURCE_METRICS_INTERVAL: Duration = Duration::from_secs(15);
/// Environment variable key for replication bandwidth metrics interval (seconds).
pub const ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL: &str = "RUSTFS_METRICS_BUCKET_REPLICATION_BANDWIDTH_INTERVAL_SEC";
/// Default interval for collecting replication bandwidth metrics.
/// Fallback passed to `get_interval` when the env var above does not override it.
pub const DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL: Duration = Duration::from_secs(30);

View File

@@ -190,6 +190,24 @@ pub static BUCKET_REPL_TOTAL_FAILED_COUNT_MD: LazyLock<MetricDescriptor> = LazyL
)
});
/// Gauge descriptor: configured replication bandwidth limit in bytes/sec,
/// labeled by bucket and target ARN, under the bucket-replication subsystem.
pub static BUCKET_REPL_BANDWIDTH_LIMIT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {
new_gauge_md(
MetricName::BandwidthLimitBytesPerSecond,
"Configured bandwidth limit for replication in bytes per second",
&[BUCKET_L, TARGET_ARN_L],
subsystems::BUCKET_REPLICATION,
)
});
/// Gauge descriptor: observed replication bandwidth in bytes/sec (EWMA per
/// the help text), labeled by bucket and target ARN.
pub static BUCKET_REPL_BANDWIDTH_CURRENT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {
new_gauge_md(
MetricName::BandwidthCurrentBytesPerSecond,
"Current replication bandwidth in bytes per second (EWMA)",
&[BUCKET_L, TARGET_ARN_L],
subsystems::BUCKET_REPLICATION,
)
});
// TODO - add a metric for the number of DELETE requests proxied to replication target
pub static BUCKET_REPL_PROXIED_DELETE_TAGGING_REQUESTS_FAILURES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| {
new_counter_md(

View File

@@ -263,6 +263,8 @@ pub enum MetricName {
ReplicationMaxQueuedCount,
ReplicationMaxDataTransferRate,
ReplicationRecentBacklogCount,
BandwidthLimitBytesPerSecond,
BandwidthCurrentBytesPerSecond,
// Scanner-related metrics
ScannerBucketScansFinished,
@@ -580,6 +582,8 @@ impl MetricName {
Self::ReplicationMaxQueuedCount => "max_queued_count".to_string(),
Self::ReplicationMaxDataTransferRate => "max_data_transfer_rate".to_string(),
Self::ReplicationRecentBacklogCount => "recent_backlog_count".to_string(),
Self::BandwidthLimitBytesPerSecond => "bandwidth_limit_bytes_per_second".to_string(),
Self::BandwidthCurrentBytesPerSecond => "bandwidth_current_bytes_per_second".to_string(),
// Scanner-related metrics
Self::ScannerBucketScansFinished => "bucket_scans_finished".to_string(),

View File

@@ -28,6 +28,7 @@ documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-s3-common = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true }
arc-swap = { workspace = true }

View File

@@ -24,7 +24,7 @@ use rustfs_config::{
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -24,7 +24,7 @@ use rustfs_config::{
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -14,7 +14,7 @@
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use serde::{Deserialize, Serialize};
use url::form_urlencoded;

View File

@@ -14,7 +14,8 @@
use crate::{BucketNotificationConfig, Event, EventArgs, LifecycleError, NotificationError, NotificationSystem};
use rustfs_ecstore::config::Config;
use rustfs_targets::{EventName, arn::TargetID};
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::{Arc, OnceLock};
use tracing::error;

Some files were not shown because too many files have changed in this diff. Show More