Compare commits

...

48 Commits

Author SHA1 Message Date
heihutu
76fa86fdc5 feat(server): optimize http transport and socket configuration for S3… (#1537)
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-17 02:53:24 +08:00
LeonWang0735
2ab6f8c029 fix:correctly handle compress object when put object (#1534) 2026-01-16 23:11:48 +08:00
weisd
0927f937a7 fix: Fix BitrotWriter encode writer implementation (#1531) 2026-01-16 17:11:54 +08:00
Audric
548a39ffe7 fix: return error instead of silently ignoring invalid ARNs in notification config (#1528)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 16:12:55 +08:00
LeonWang0735
ed4329d50c fix:correctly handle copy object (#1512)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-01-16 10:07:48 +08:00
LeonWang0735
18b22eedd9 Fix:correctly handle versioning obj (#1521) 2026-01-16 08:12:05 +08:00
GatewayJ
55e4cdec5d feat: add Cors (#1496)
Signed-off-by: GatewayJ <835269233@qq.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-15 20:03:26 +08:00
houseme
dceb7aac8a upgrade s3s from 0.13.0-alpha.1 to 0.13.0-alpha.2 (#1518) 2026-01-15 17:18:54 +08:00
GatewayJ
e3a7eb2d3d fix: standart policy format (#1508) 2026-01-15 15:33:22 +08:00
majinghe
1e683f12ef fix: change health check statement to fix unhealthy issue for docker … (#1515) 2026-01-15 11:29:45 +08:00
houseme
6a63fba5c2 chore(deps): bump crc-fast, chrono, aws-smithy-types, ssh-key (#1513) 2026-01-15 10:51:14 +08:00
houseme
df502f2ac6 chore(deps): bump multiple dependencies (#1510) 2026-01-15 00:57:04 +08:00
安正超
cb53ee13cd fix: handle copy_source_if_match in copy_object for S3 compatibility (#1408)
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-14 21:09:13 +08:00
Arthur Darcet
6928221b56 In the PVC definition, skip the storageClassName attr if null/empty (#1498)
Signed-off-by: Arthur Darcet <arthur.darcet@mistral.ai>
Co-authored-by: majinghe <42570491+majinghe@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-14 20:18:00 +08:00
houseme
2d58eea702 fix: exclude matching key from ListObjects results when using marker/startAfter (#1506) 2026-01-14 19:21:51 +08:00
houseme
109ca7a100 perf(utils): optimize User-Agent generation and platform detection (#1504) 2026-01-14 18:08:02 +08:00
Jasper Weyne
15e6d4dbd0 feat: add support for existing gateways in helm chart (#1469)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-01-14 17:54:37 +08:00
Jan S
68c5c0b834 Use POSIX statvfs, since statfs is not designed to be portable (#1495) 2026-01-14 16:03:32 +08:00
houseme
27480f7625 Refactor Event Admin Handlers and Parallelize Target Status Probes (#1501) 2026-01-14 14:18:02 +08:00
houseme
f795299d53 Optimization and collation of dependencies introduction processing (#1493) 2026-01-13 15:02:54 +08:00
houseme
650fae71fb Remove the rustfs/console/config.json route (#1487) 2026-01-13 10:15:41 +08:00
houseme
dc76e4472e Fix object tagging functionality issues #1415 (#1485) 2026-01-13 01:11:50 +08:00
houseme
b5140f0098 build(deps): bump tracing-opentelemetry and flate2 version (#1484)
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-01-12 23:53:31 +08:00
LeonWang0735
5f2e594480 fix:handle null version ID in delete and return version_id in get_object (#1479)
Signed-off-by: houseme <housemecn@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-12 22:02:09 +08:00
houseme
bec51bb783 fix: return 404 for HEAD requests on non-existent objects in TLS (#1480) 2026-01-12 19:30:59 +08:00
houseme
1fad8167af dependency name ignore for object_store (#1481) 2026-01-12 19:13:37 +08:00
weisd
f0da8ce216 fix: avoid unwrap() panic in delete_prefix parsing (#1476)
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-12 13:26:01 +08:00
houseme
f9d3a908f0 Refactor:replace jsonwebtoken feature from rust_crypto to aws_lc_rs (#1474)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-01-12 12:25:02 +08:00
yxrxy
29d86036b1 feat: implement bucket quota system (#1461)
Signed-off-by: yxrxy <1532529704@qq.com>
Co-authored-by: loverustfs <hello@rustfs.com>
2026-01-12 11:42:07 +08:00
weisd
78b13f3ff2 fix: add delete prefix option support (#1471) 2026-01-12 11:19:09 +08:00
houseme
760cb1d734 Fix Windows Path Separator Handling in rustfs_utils (#1464)
Co-authored-by: reatang <tangtang1251@qq.com>
2026-01-11 19:53:51 +08:00
houseme
6b2eebee1d fix: Remove secret and signature from the log (#1466) 2026-01-11 17:45:16 +08:00
houseme
ddaa9e35ea fix(http): Fix console bucket management functionality failure caused by RUSTFS_SERVER_DOMAINS (#1467) 2026-01-11 16:47:51 +08:00
loverustfs
703d961168 fix: honor bucket policy for authenticated users (#1460)
Co-authored-by: GatewayJ <835269233@qq.com>
2026-01-10 20:01:28 +08:00
loverustfs
e614e530cf Modify ahead images url 2026-01-10 16:12:40 +08:00
loverustfs
00119548d2 Ahead 2026-01-10 16:11:11 +08:00
GatewayJ
d532c7c972 feat: object-list access (#1457)
Signed-off-by: loverustfs <github@rustfs.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: loverustfs <github@rustfs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-01-10 10:11:08 +08:00
houseme
04f441361e replace winapi to windows crate (#1455) 2026-01-10 02:15:08 +08:00
mkrueger92
9e162b6e9e Default to helm chart version for docker image and not latest (#1385)
Signed-off-by: mkrueger92 <7305571+mkrueger92@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-08 21:16:00 +08:00
majinghe
900f7724b8 add gateway api support due to ingress nginx retirement (#1432)
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-08 20:57:55 +08:00
majinghe
4f5653e656 add upgrade strategy for standalone mode (#1431) 2026-01-08 20:44:16 +08:00
houseme
a95e549430 Fix/fix improve for audit (#1418) 2026-01-07 18:05:52 +08:00
weisd
00f3275603 rm online check (#1416) 2026-01-07 13:42:03 +08:00
weisd
359c9d2d26 Enhance Object Version Management and Replication Status Handling (#1413) 2026-01-07 10:44:35 +08:00
weisd
3ce99939a3 fix: improve memory ordering for disk health tracker (#1412) 2026-01-06 23:59:08 +08:00
Jan S
02f809312b Fix windows missing default backlog (#1405)
Co-authored-by: houseme <housemecn@gmail.com>
2026-01-06 23:41:12 +08:00
GatewayJ
356dc7e0c2 feat: Add permission verification for account creation (#1401)
Co-authored-by: loverustfs <hello@rustfs.com>
2026-01-06 21:47:18 +08:00
安正超
e4ad86ada6 test(s3): add 9 delimiter list tests to implemented tests (#1410) 2026-01-06 21:13:39 +08:00
133 changed files with 6263 additions and 2232 deletions

View File

@@ -26,6 +26,9 @@ updates:
day: "monday"
timezone: "Asia/Shanghai"
time: "08:00"
ignore:
- dependency-name: "object_store"
versions: [ "0.13.x" ]
groups:
s3s:
update-types:
@@ -36,4 +39,4 @@ updates:
- "s3s-*"
dependencies:
patterns:
- "*"
- "*"

View File

@@ -159,8 +159,8 @@ jobs:
uses: taiki-e/cache-cargo-install-action@v2
with:
tool: s3s-e2e
git: https://github.com/Nugine/s3s.git
rev: 9e41304ed549b89cfb03ede98e9c0d2ac7522051
git: https://github.com/s3s-project/s3s.git
rev: 4a04a670cf41274d9be9ab65dc36f4aa3f92fbad
- name: Build debug binary
run: |

View File

@@ -44,7 +44,6 @@ jobs:
set -x
old_version=$(grep "^appVersion:" helm/rustfs/Chart.yaml | awk '{print $2}')
sed -i "s/$old_version/$new_version/g" helm/rustfs/Chart.yaml
sed -i "/^image:/,/^[^ ]/ s/tag:.*/tag: "$new_version"/" helm/rustfs/values.yaml
- name: Set up Helm
uses: azure/setup-helm@v4.3.0

1234
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -50,7 +50,7 @@ resolver = "2"
edition = "2024"
license = "Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.88"
rust-version = "1.90"
version = "0.0.5"
homepage = "https://rustfs.com"
description = "RustFS is a high-performance distributed object storage software built using Rust, one of the most popular languages worldwide. "
@@ -96,7 +96,7 @@ rustfs-zip = { path = "./crates/zip", version = "0.0.5" }
# Async Runtime and Networking
async-channel = "2.5.0"
async-compression = { version = "0.4.19" }
async-compression = { version = "0.4.37" }
async-recursion = "1.1.1"
async-trait = "0.1.89"
axum = "0.8.8"
@@ -121,7 +121,7 @@ tokio-util = { version = "0.7.18", features = ["io", "compat"] }
tonic = { version = "0.14.2", features = ["gzip"] }
tonic-prost = { version = "0.14.2" }
tonic-prost-build = { version = "0.14.2" }
tower = { version = "0.5.2", features = ["timeout"] }
tower = { version = "0.5.3", features = ["timeout"] }
tower-http = { version = "0.6.8", features = ["cors"] }
# Serialization and Data Formats
@@ -130,27 +130,27 @@ bytesize = "2.3.1"
byteorder = "1.5.0"
flatbuffers = "25.12.19"
form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.4"
rmcp = { version = "0.12.0" }
prost = "0.14.3"
quick-xml = "0.39.0"
rmcp = { version = "0.13.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.148", features = ["raw_value"] }
serde_json = { version = "1.0.149", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.2.0"
# Cryptography and Security
aes-gcm = { version = "0.11.0-rc.2", features = ["rand_core"] }
argon2 = { version = "0.6.0-rc.5" }
blake3 = { version = "1.8.2", features = ["rayon", "mmap"] }
blake3 = { version = "1.8.3", features = ["rayon", "mmap"] }
chacha20poly1305 = { version = "0.11.0-rc.2" }
crc-fast = "1.6.0"
crc-fast = "1.9.0"
hmac = { version = "0.13.0-rc.3" }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
pbkdf2 = "0.13.0-rc.5"
rsa = { version = "0.10.0-rc.11" }
rustls = { version = "0.23.35" }
jsonwebtoken = { version = "10.2.0", features = ["aws_lc_rs"] }
pbkdf2 = "0.13.0-rc.7"
rsa = { version = "0.10.0-rc.12" }
rustls = { version = "0.23.36", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.2"
sha1 = "0.11.0-rc.3"
@@ -159,9 +159,9 @@ subtle = "2.6"
zeroize = { version = "1.8.2", features = ["derive"] }
# Time and Date
chrono = { version = "0.4.42", features = ["serde"] }
chrono = { version = "0.4.43", features = ["serde"] }
humantime = "2.3.0"
time = { version = "0.3.44", features = ["std", "parsing", "formatting", "macros", "serde"] }
time = { version = "0.3.45", features = ["std", "parsing", "formatting", "macros", "serde"] }
# Utilities and Tools
anyhow = "1.0.100"
@@ -171,8 +171,8 @@ atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.12" }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-types = { version = "1.3.5" }
aws-sdk-s3 = { version = "1.120.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-types = { version = "1.3.6" }
base64 = "0.22.1"
base64-simd = "0.8.0"
brotli = "8.0.2"
@@ -182,24 +182,25 @@ const-str = { version = "1.0.0", features = ["std", "proc"] }
convert_case = "0.10.0"
criterion = { version = "0.8", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "51.0.0"
datafusion = "52.0.0"
derive_builder = "0.20.2"
dunce = "1.0.5"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.5"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
flate2 = "1.1.8"
flexi_logger = { version = "0.31.8", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
glob = "0.3.3"
google-cloud-storage = "1.5.0"
google-cloud-auth = "1.3.0"
google-cloud-storage = "1.6.0"
google-cloud-auth = "1.4.0"
hashbrown = { version = "0.16.1", features = ["serde", "rayon"] }
heed = { version = "0.22.0" }
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
ipnetwork = { version = "0.21.1", features = ["serde"] }
lazy_static = "1.5.0"
libc = "0.2.179"
libc = "0.2.180"
libsystemd = "0.7.2"
local-ip-address = "0.6.8"
local-ip-address = "0.6.9"
lz4 = "1.28.1"
matchit = "0.9.1"
md-5 = "0.11.0-rc.3"
@@ -222,9 +223,9 @@ rayon = "1.11.0"
reed-solomon-simd = { version = "3.1.0" }
regex = { version = "1.12.2" }
rumqttc = { version = "0.25.1" }
rust-embed = { version = "8.9.0" }
rust-embed = { version = "8.11.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.13.0-alpha", features = ["minio"], git = "https://github.com/s3s-project/s3s.git", branch = "main" }
s3s = { version = "0.13.0-alpha.2", features = ["minio"] }
serial_test = "3.3.1"
shadow-rs = { version = "1.5.0", default-features = false }
siphasher = "1.0.1"
@@ -242,18 +243,18 @@ thiserror = "2.0.17"
tracing = { version = "0.1.44" }
tracing-appender = "0.2.4"
tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.7"
url = "2.5.8"
urlencoding = "2.1.3"
uuid = { version = "1.19.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.6.1", features = ["serde"] }
winapi = { version = "0.3.9" }
windows = { version = "0.62.2" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "7.0.0"
zip = "7.1.0"
zstd = "0.13.3"
# Observability and Metrics
@@ -269,8 +270,8 @@ opentelemetry-stdout = { version = "0.31.0" }
libunftp = "0.21.0"
russh = { version = "0.56.0", features = ["aws-lc-rs", "rsa"], default-features = false }
russh-sftp = "2.1.1"
ssh-key = { version = "0.7.0-rc.4", features = ["std", "rsa", "ed25519"] }
suppaftp = { version = "7.0.7", features = ["tokio", "tokio-rustls", "rustls"] }
ssh-key = { version = "0.7.0-rc.6", features = ["std", "rsa", "ed25519"] }
suppaftp = { version = "7.1.0", features = ["tokio", "tokio-rustls", "rustls"] }
rcgen = "0.14.6"
# Performance Analysis and Memory Profiling

View File

@@ -1,4 +1,4 @@
FROM alpine:3.22 AS build
FROM alpine:3.23 AS build
ARG TARGETARCH
ARG RELEASE=latest
@@ -40,7 +40,7 @@ RUN set -eux; \
rm -rf rustfs.zip /build/.tmp || true
FROM alpine:3.22
FROM alpine:3.23
ARG RELEASE=latest
ARG BUILD_DATE

View File

@@ -16,7 +16,7 @@ ARG BUILDPLATFORM
# -----------------------------
# Build stage
# -----------------------------
FROM rust:1.88-bookworm AS builder
FROM rust:1.91-trixie AS builder
# Re-declare args after FROM
ARG TARGETPLATFORM

View File

@@ -83,6 +83,13 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
| **Edge & IoT** | **Strong Edge Support**<br>Ideal for secure, innovative edge devices. | **Weak Edge Support**<br>Often too heavy for edge gateways. |
| **Risk Profile** | **Enterprise Risk Mitigation**<br>Clear IP rights and safe for commercial use. | **Legal Risks**<br>Intellectual property ambiguity and usage restrictions. |
## Staying ahead
Star RustFS on GitHub and be instantly notified of new releases.
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## Quickstart
To get started with RustFS, follow these steps:

View File

@@ -86,6 +86,15 @@ RustFS 是一个基于 Rust 构建的高性能分布式对象存储系统。Rust
| **成本** | **稳定且免费**<br>免费社区支持,稳定的商业定价。 | **高昂成本**<br>1PiB 的成本可能高达 250,000 美元。 |
| **风险控制** | **企业级风险规避**<br>清晰的知识产权,商业使用安全无忧。 | **法律风险**<br>知识产权归属模糊及使用限制风险。 |
## 保持领先
在 GitHub 上为 RustFS 点赞,即可第一时间收到新版本发布通知。
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## 快速开始
请按照以下步骤快速上手 RustFS

View File

@@ -306,7 +306,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
versions_count = versions_count.saturating_add(1);
if latest_file_info.is_none()
&& let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false)
&& let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false, false)
{
latest_file_info = Some(info);
}

View File

@@ -21,6 +21,7 @@ use futures::stream::FuturesUnordered;
use hashbrown::{HashMap, HashSet};
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::arn::TargetID;
use rustfs_targets::{Target, TargetError, target::ChannelTargetType};
use std::str::FromStr;
use std::sync::Arc;
@@ -392,4 +393,80 @@ impl AuditRegistry {
Ok(())
}
/// Creates a unique key for a target based on its type and ID
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `String` - The unique key for the target.
pub fn create_key(&self, target_type: &str, target_id: &str) -> String {
let key = TargetID::new(target_id.to_string(), target_type.to_string());
info!(target_type = %target_type, "Create key for {}", key);
key.to_string()
}
/// Enables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn enable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} enabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Disables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn disable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} disabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Upserts a target into the registry
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
/// * `target` - The target instance to be upserted.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn upsert_target(
&mut self,
target_type: &str,
target_id: &str,
target: Box<dyn Target<AuditEntry> + Send + Sync>,
) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
self.targets.insert(key, target);
Ok(())
}
}

View File

@@ -274,9 +274,9 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_ids = registry.list_targets();
let target_keys = registry.list_targets();
if target_ids.is_empty() {
if target_keys.is_empty() {
warn!("No audit targets configured for dispatch");
return Ok(());
}
@@ -284,22 +284,22 @@ impl AuditSystem {
// Dispatch to all targets concurrently
let mut tasks = Vec::new();
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
let entry_clone = Arc::clone(&entry);
let target_id_clone = target_id.clone();
let target_key_clone = target_key.clone();
// Create EntityTarget for the audit log entry
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
event_name: entry.event, // Default, should be derived from entry
data: (*entry_clone).clone(),
};
let task = async move {
let result = target.save(Arc::new(entity_target)).await;
(target_id_clone, result)
(target_key_clone, result)
};
tasks.push(task);
@@ -312,14 +312,14 @@ impl AuditSystem {
let mut errors = Vec::new();
let mut success_count = 0;
for (target_id, result) in results {
for (target_key, result) in results {
match result {
Ok(_) => {
success_count += 1;
observability::record_target_success();
}
Err(e) => {
error!(target_id = %target_id, error = %e, "Failed to dispatch audit log to target");
error!(target_id = %target_key, error = %e, "Failed to dispatch audit log to target");
errors.push(e);
observability::record_target_failure();
}
@@ -360,18 +360,18 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_ids = registry.list_targets();
let target_keys = registry.list_targets();
if target_ids.is_empty() {
if target_keys.is_empty() {
warn!("No audit targets configured for batch dispatch");
return Ok(());
}
let mut tasks = Vec::new();
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
let entries_clone: Vec<_> = entries.iter().map(Arc::clone).collect();
let target_id_clone = target_id.clone();
let target_key_clone = target_key.clone();
let task = async move {
let mut success_count = 0;
@@ -380,7 +380,7 @@ impl AuditSystem {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: rustfs_targets::EventName::ObjectCreatedPut,
event_name: entry.event,
data: (*entry).clone(),
};
match target.save(Arc::new(entity_target)).await {
@@ -388,7 +388,7 @@ impl AuditSystem {
Err(e) => errors.push(e),
}
}
(target_id_clone, success_count, errors)
(target_key_clone, success_count, errors)
};
tasks.push(task);
}
@@ -418,6 +418,7 @@ impl AuditSystem {
}
/// Starts the audit stream processing for a target with batching and retry logic
///
/// # Arguments
/// * `store` - The store from which to read audit entries
/// * `target` - The target to which audit entries will be sent
@@ -501,7 +502,7 @@ impl AuditSystem {
/// Enables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to enable
/// * `target_id` - The ID of the target to enable, TargetID to string
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -520,7 +521,7 @@ impl AuditSystem {
/// Disables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to disable
/// * `target_id` - The ID of the target to disable, TargetID to string
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -539,7 +540,7 @@ impl AuditSystem {
/// Removes a target from the system
///
/// # Arguments
/// * `target_id` - The ID of the target to remove
/// * `target_id` - The ID of the target to remove, TargetID to string
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -559,7 +560,7 @@ impl AuditSystem {
/// Updates or inserts a target
///
/// # Arguments
/// * `target_id` - The ID of the target to upsert
/// * `target_id` - The ID of the target to upsert, TargetID to string
/// * `target` - The target instance to insert or update
///
/// # Returns
@@ -596,7 +597,7 @@ impl AuditSystem {
/// Gets information about a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to retrieve
/// * `target_id` - The ID of the target to retrieve, TargetID to string
///
/// # Returns
/// * `Option<String>` - Target ID if found

View File

@@ -96,6 +96,11 @@ pub enum Metric {
ApplyNonCurrent,
HealAbandonedVersion,
// Quota metrics:
QuotaCheck,
QuotaViolation,
QuotaSync,
// START Trace metrics:
StartTrace,
ScanObject, // Scan object. All operations included.
@@ -131,6 +136,9 @@ impl Metric {
Self::CleanAbandoned => "clean_abandoned",
Self::ApplyNonCurrent => "apply_non_current",
Self::HealAbandonedVersion => "heal_abandoned_version",
Self::QuotaCheck => "quota_check",
Self::QuotaViolation => "quota_violation",
Self::QuotaSync => "quota_sync",
Self::StartTrace => "start_trace",
Self::ScanObject => "scan_object",
Self::HealAbandonedObject => "heal_abandoned_object",
@@ -163,15 +171,18 @@ impl Metric {
10 => Some(Self::CleanAbandoned),
11 => Some(Self::ApplyNonCurrent),
12 => Some(Self::HealAbandonedVersion),
13 => Some(Self::StartTrace),
14 => Some(Self::ScanObject),
15 => Some(Self::HealAbandonedObject),
16 => Some(Self::LastRealtime),
17 => Some(Self::ScanFolder),
18 => Some(Self::ScanCycle),
19 => Some(Self::ScanBucketDrive),
20 => Some(Self::CompactFolder),
21 => Some(Self::Last),
13 => Some(Self::QuotaCheck),
14 => Some(Self::QuotaViolation),
15 => Some(Self::QuotaSync),
16 => Some(Self::StartTrace),
17 => Some(Self::ScanObject),
18 => Some(Self::HealAbandonedObject),
19 => Some(Self::LastRealtime),
20 => Some(Self::ScanFolder),
21 => Some(Self::ScanCycle),
22 => Some(Self::ScanBucketDrive),
23 => Some(Self::CompactFolder),
24 => Some(Self::Last),
_ => None,
}
}

View File

@@ -168,7 +168,7 @@ pub const DEFAULT_OBS_LOG_STDOUT_ENABLED: bool = false;
pub const KI_B: usize = 1024;
/// Constant representing 1 Mebibyte (1024 * 1024 bytes)
/// Default value: 1048576
pub const MI_B: usize = 1024 * 1024;
pub const MI_B: usize = 1024 * KI_B;
#[cfg(test)]
mod tests {

View File

@@ -21,6 +21,7 @@ pub(crate) mod heal;
pub(crate) mod object;
pub(crate) mod profiler;
pub(crate) mod protocols;
pub(crate) mod quota;
pub(crate) mod runtime;
pub(crate) mod targets;
pub(crate) mod tls;

View File

@@ -0,0 +1,26 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub const QUOTA_CONFIG_FILE: &str = "quota.json";
pub const QUOTA_TYPE_HARD: &str = "HARD";
pub const QUOTA_EXCEEDED_ERROR_CODE: &str = "XRustfsQuotaExceeded";
pub const QUOTA_INVALID_CONFIG_ERROR_CODE: &str = "InvalidArgument";
pub const QUOTA_NOT_FOUND_ERROR_CODE: &str = "NoSuchBucket";
pub const QUOTA_INTERNAL_ERROR_CODE: &str = "InternalError";
pub const QUOTA_API_PATH: &str = "/rustfs/admin/v3/quota/{bucket}";
pub const QUOTA_INVALID_TYPE_ERROR_MSG: &str = "Only HARD quota type is supported";
pub const QUOTA_METADATA_SYSTEM_ERROR_MSG: &str = "Bucket metadata system not initialized";

View File

@@ -33,6 +33,8 @@ pub use constants::profiler::*;
#[cfg(feature = "constants")]
pub use constants::protocols::*;
#[cfg(feature = "constants")]
pub use constants::quota::*;
#[cfg(feature = "constants")]
pub use constants::runtime::*;
#[cfg(feature = "constants")]
pub use constants::targets::*;

View File

@@ -0,0 +1,155 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Regression test for Issue #1423
//! Verifies that Bucket Policies are honored for Authenticated Users.
use crate::common::{RustFSTestEnvironment, init_logging};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::{Client, Config};
use serial_test::serial;
use tracing::info;
async fn create_user(
env: &RustFSTestEnvironment,
username: &str,
password: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let create_user_body = serde_json::json!({
"secretKey": password,
"status": "enabled"
})
.to_string();
let create_user_url = format!("{}/rustfs/admin/v3/add-user?accessKey={}", env.url, username);
crate::common::awscurl_put(&create_user_url, &create_user_body, &env.access_key, &env.secret_key).await?;
Ok(())
}
fn create_user_client(env: &RustFSTestEnvironment, access_key: &str, secret_key: &str) -> Client {
let credentials = Credentials::new(access_key, secret_key, None, None, "test-user");
let config = Config::builder()
.credentials_provider(credentials)
.region(Region::new("us-east-1"))
.endpoint_url(&env.url)
.force_path_style(true)
.behavior_version_latest()
.build();
Client::from_conf(config)
}
#[tokio::test]
#[serial]
async fn test_bucket_policy_authenticated_user() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
info!("Starting test_bucket_policy_authenticated_user...");
let mut env = RustFSTestEnvironment::new().await?;
env.start_rustfs_server(vec![]).await?;
let admin_client = env.create_s3_client();
let bucket_name = "bucket-policy-auth-test";
let object_key = "test-object.txt";
let user_access = "testuser";
let user_secret = "testpassword";
// 1. Create Bucket (Admin)
admin_client.create_bucket().bucket(bucket_name).send().await?;
// 2. Create User (Admin API)
create_user(&env, user_access, user_secret).await?;
// 3. Create User Client
let user_client = create_user_client(&env, user_access, user_secret);
// 4. Verify Access Denied initially (No Policy)
let result = user_client.list_objects_v2().bucket(bucket_name).send().await;
if result.is_ok() {
return Err("Should be Access Denied initially".into());
}
// 5. Apply Bucket Policy Allowed User
let policy_json = serde_json::json!({
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowTestUser",
"Effect": "Allow",
"Principal": {
"AWS": [user_access]
},
"Action": [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
format!("arn:aws:s3:::{}", bucket_name),
format!("arn:aws:s3:::{}/*", bucket_name)
]
}
]
})
.to_string();
admin_client
.put_bucket_policy()
.bucket(bucket_name)
.policy(&policy_json)
.send()
.await?;
// 6. Verify Access Allowed (With Bucket Policy)
info!("Verifying PutObject...");
user_client
.put_object()
.bucket(bucket_name)
.key(object_key)
.body(aws_sdk_s3::primitives::ByteStream::from_static(b"hello world"))
.send()
.await
.map_err(|e| format!("PutObject failed: {}", e))?;
info!("Verifying ListObjects...");
let list_res = user_client
.list_objects_v2()
.bucket(bucket_name)
.send()
.await
.map_err(|e| format!("ListObjects failed: {}", e))?;
assert_eq!(list_res.contents().len(), 1);
info!("Verifying GetObject...");
user_client
.get_object()
.bucket(bucket_name)
.key(object_key)
.send()
.await
.map_err(|e| format!("GetObject failed: {}", e))?;
info!("Verifying DeleteObject...");
user_client
.delete_object()
.bucket(bucket_name)
.key(object_key)
.send()
.await
.map_err(|e| format!("DeleteObject failed: {}", e))?;
info!("Test Passed!");
Ok(())
}

View File

@@ -176,12 +176,14 @@ impl RustFSTestEnvironment {
/// Kill any existing RustFS processes
pub async fn cleanup_existing_processes(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Cleaning up any existing RustFS processes");
let output = Command::new("pkill").args(["-f", "rustfs"]).output();
let binary_path = rustfs_binary_path();
let binary_name = binary_path.to_string_lossy();
let output = Command::new("pkill").args(["-f", &binary_name]).output();
if let Ok(output) = output
&& output.status.success()
{
info!("Killed existing RustFS processes");
info!("Killed existing RustFS processes: {}", binary_name);
sleep(Duration::from_millis(1000)).await;
}
Ok(())
@@ -363,3 +365,12 @@ pub async fn awscurl_put(
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
execute_awscurl(url, "PUT", Some(body), access_key, secret_key).await
}
/// Helper function for DELETE requests
pub async fn awscurl_delete(
url: &str,
access_key: &str,
secret_key: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
execute_awscurl(url, "DELETE", None, access_key, secret_key).await
}

View File

@@ -29,6 +29,13 @@ mod data_usage_test;
#[cfg(test)]
mod kms;
// Quota tests
#[cfg(test)]
mod quota_test;
#[cfg(test)]
mod bucket_policy_check_test;
// Special characters in path test modules
#[cfg(test)]
mod special_chars_test;

View File

@@ -0,0 +1,798 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::common::{RustFSTestEnvironment, awscurl_delete, awscurl_get, awscurl_post, awscurl_put, init_logging};
use aws_sdk_s3::Client;
use serial_test::serial;
use tracing::{debug, info};
/// Test environment setup for quota tests
pub struct QuotaTestEnv {
pub env: RustFSTestEnvironment,
pub client: Client,
pub bucket_name: String,
}
impl QuotaTestEnv {
pub async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let bucket_name = format!("quota-test-{}", uuid::Uuid::new_v4());
let mut env = RustFSTestEnvironment::new().await?;
env.start_rustfs_server(vec![]).await?;
let client = env.create_s3_client();
Ok(Self {
env,
client,
bucket_name,
})
}
pub async fn create_bucket(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.env.create_test_bucket(&self.bucket_name).await?;
Ok(())
}
pub async fn cleanup_bucket(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let objects = self.client.list_objects_v2().bucket(&self.bucket_name).send().await?;
for object in objects.contents() {
self.client
.delete_object()
.bucket(&self.bucket_name)
.key(object.key().unwrap_or_default())
.send()
.await?;
}
self.env.delete_test_bucket(&self.bucket_name).await?;
Ok(())
}
pub async fn set_bucket_quota(&self, quota_bytes: u64) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name);
let quota_config = serde_json::json!({
"quota": quota_bytes,
"quota_type": "HARD"
});
let response = awscurl_put(&url, &quota_config.to_string(), &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to set quota: {}", response).into())
} else {
Ok(())
}
}
pub async fn get_bucket_quota(&self) -> Result<Option<u64>, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name);
let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to get quota: {}", response).into())
} else {
let quota_info: serde_json::Value = serde_json::from_str(&response)?;
Ok(quota_info.get("quota").and_then(|v| v.as_u64()))
}
}
pub async fn clear_bucket_quota(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name);
let response = awscurl_delete(&url, &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to clear quota: {}", response).into())
} else {
Ok(())
}
}
pub async fn get_bucket_quota_stats(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota-stats/{}", self.env.url, self.bucket_name);
let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to get quota stats: {}", response).into())
} else {
Ok(serde_json::from_str(&response)?)
}
}
pub async fn check_bucket_quota(
&self,
operation_type: &str,
operation_size: u64,
) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota-check/{}", self.env.url, self.bucket_name);
let check_request = serde_json::json!({
"operation_type": operation_type,
"operation_size": operation_size
});
let response = awscurl_post(&url, &check_request.to_string(), &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to check quota: {}", response).into())
} else {
Ok(serde_json::from_str(&response)?)
}
}
pub async fn upload_object(&self, key: &str, size_bytes: usize) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let data = vec![0u8; size_bytes];
self.client
.put_object()
.bucket(&self.bucket_name)
.key(key)
.body(aws_sdk_s3::primitives::ByteStream::from(data))
.send()
.await?;
Ok(())
}
pub async fn object_exists(&self, key: &str) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
match self.client.head_object().bucket(&self.bucket_name).key(key).send().await {
Ok(_) => Ok(true),
Err(e) => {
// Check for any 404-related errors and return false instead of propagating
let error_str = e.to_string();
if error_str.contains("404") || error_str.contains("Not Found") || error_str.contains("NotFound") {
Ok(false)
} else {
// Also check the error code directly
if let Some(service_err) = e.as_service_error()
&& service_err.is_not_found()
{
return Ok(false);
}
Err(e.into())
}
}
}
}
pub async fn get_bucket_usage(&self) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let stats = self.get_bucket_quota_stats().await?;
Ok(stats.get("current_usage").and_then(|v| v.as_u64()).unwrap_or(0))
}
pub async fn set_bucket_quota_for(
&self,
bucket: &str,
quota_bytes: u64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, bucket);
let quota_config = serde_json::json!({
"quota": quota_bytes,
"quota_type": "HARD"
});
let response = awscurl_put(&url, &quota_config.to_string(), &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to set quota: {}", response).into())
} else {
Ok(())
}
}
/// Get bucket quota statistics for specific bucket
pub async fn get_bucket_quota_stats_for(
&self,
bucket: &str,
) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
debug!("Getting quota stats for bucket: {}", bucket);
let url = format!("{}/rustfs/admin/v3/quota-stats/{}", self.env.url, bucket);
let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?;
if response.contains("error") {
Err(format!("Failed to get quota stats: {}", response).into())
} else {
let stats: serde_json::Value = serde_json::from_str(&response)?;
Ok(stats)
}
}
/// Upload an object to specific bucket
pub async fn upload_object_to_bucket(
&self,
bucket: &str,
key: &str,
size_bytes: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
debug!("Uploading object {} with size {} bytes to bucket {}", key, size_bytes, bucket);
let data = vec![0u8; size_bytes];
self.client
.put_object()
.bucket(bucket)
.key(key)
.body(aws_sdk_s3::primitives::ByteStream::from(data))
.send()
.await?;
info!("Successfully uploaded object: {} ({} bytes) to bucket: {}", key, size_bytes, bucket);
Ok(())
}
}
#[cfg(test)]
mod integration_tests {
use super::*;
#[tokio::test]
#[serial]
async fn test_quota_basic_operations() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
// Create test bucket
env.create_bucket().await?;
// Set quota of 1MB
env.set_bucket_quota(1024 * 1024).await?;
// Verify quota is set
let quota = env.get_bucket_quota().await?;
assert_eq!(quota, Some(1024 * 1024));
// Upload a 512KB object (should succeed)
env.upload_object("test1.txt", 512 * 1024).await?;
assert!(env.object_exists("test1.txt").await?);
// Upload another 512KB object (should succeed, total 1MB)
env.upload_object("test2.txt", 512 * 1024).await?;
assert!(env.object_exists("test2.txt").await?);
// Try to upload 1KB more (should fail due to quota)
let upload_result = env.upload_object("test3.txt", 1024).await;
assert!(upload_result.is_err());
assert!(!env.object_exists("test3.txt").await?);
// Clean up
env.clear_bucket_quota().await?;
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_update_and_clear() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set initial quota
env.set_bucket_quota(512 * 1024).await?;
assert_eq!(env.get_bucket_quota().await?, Some(512 * 1024));
// Update quota to larger size
env.set_bucket_quota(2 * 1024 * 1024).await?;
assert_eq!(env.get_bucket_quota().await?, Some(2 * 1024 * 1024));
// Upload 1MB object (should succeed with new quota)
env.upload_object("large_file.txt", 1024 * 1024).await?;
assert!(env.object_exists("large_file.txt").await?);
// Clear quota
env.clear_bucket_quota().await?;
assert_eq!(env.get_bucket_quota().await?, None);
// Upload another large object (should succeed with no quota)
env.upload_object("unlimited_file.txt", 5 * 1024 * 1024).await?;
assert!(env.object_exists("unlimited_file.txt").await?);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_delete_operations() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 1MB
env.set_bucket_quota(1024 * 1024).await?;
// Fill up to quota limit
env.upload_object("file1.txt", 512 * 1024).await?;
env.upload_object("file2.txt", 512 * 1024).await?;
// Delete one file
env.client
.delete_object()
.bucket(&env.bucket_name)
.key("file1.txt")
.send()
.await?;
assert!(!env.object_exists("file1.txt").await?);
// Now we should be able to upload again (quota freed up)
env.upload_object("file3.txt", 256 * 1024).await?;
assert!(env.object_exists("file3.txt").await?);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_usage_tracking() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota
env.set_bucket_quota(2 * 1024 * 1024).await?;
// Upload some files
env.upload_object("file1.txt", 512 * 1024).await?;
env.upload_object("file2.txt", 256 * 1024).await?;
// Check usage
let usage = env.get_bucket_usage().await?;
assert_eq!(usage, (512 + 256) * 1024);
// Delete a file
env.client
.delete_object()
.bucket(&env.bucket_name)
.key("file1.txt")
.send()
.await?;
// Check updated usage
let updated_usage = env.get_bucket_usage().await?;
assert_eq!(updated_usage, 256 * 1024);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_statistics() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 2MB
env.set_bucket_quota(2 * 1024 * 1024).await?;
// Upload files to use 1.5MB
env.upload_object("file1.txt", 1024 * 1024).await?;
env.upload_object("file2.txt", 512 * 1024).await?;
// Get detailed quota statistics
let stats = env.get_bucket_quota_stats().await?;
assert_eq!(stats.get("bucket").unwrap().as_str().unwrap(), env.bucket_name);
assert_eq!(stats.get("quota_limit").unwrap().as_u64().unwrap(), 2 * 1024 * 1024);
assert_eq!(stats.get("current_usage").unwrap().as_u64().unwrap(), (1024 + 512) * 1024);
assert_eq!(stats.get("remaining_quota").unwrap().as_u64().unwrap(), 512 * 1024);
let usage_percentage = stats.get("usage_percentage").unwrap().as_f64().unwrap();
assert!((usage_percentage - 75.0).abs() < 0.1);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_check_api() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 1MB
env.set_bucket_quota(1024 * 1024).await?;
// Upload 512KB file
env.upload_object("existing_file.txt", 512 * 1024).await?;
// Check if we can upload another 512KB (should succeed, exactly fill quota)
let check_result = env.check_bucket_quota("PUT", 512 * 1024).await?;
assert!(check_result.get("allowed").unwrap().as_bool().unwrap());
assert_eq!(check_result.get("remaining_quota").unwrap().as_u64().unwrap(), 0);
// Note: we haven't actually uploaded the second file yet, so current_usage is still 512KB
// Check if we can upload 1KB (should succeed - we haven't used the full quota yet)
let check_result = env.check_bucket_quota("PUT", 1024).await?;
assert!(check_result.get("allowed").unwrap().as_bool().unwrap());
assert_eq!(check_result.get("remaining_quota").unwrap().as_u64().unwrap(), 512 * 1024 - 1024);
// Check if we can upload 600KB (should fail - would exceed quota)
let check_result = env.check_bucket_quota("PUT", 600 * 1024).await?;
assert!(!check_result.get("allowed").unwrap().as_bool().unwrap());
// Check delete operation (should always be allowed)
let check_result = env.check_bucket_quota("DELETE", 512 * 1024).await?;
assert!(check_result.get("allowed").unwrap().as_bool().unwrap());
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_multiple_buckets() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
// Create two buckets in the same environment
let bucket1 = format!("quota-test-{}-1", uuid::Uuid::new_v4());
let bucket2 = format!("quota-test-{}-2", uuid::Uuid::new_v4());
env.env.create_test_bucket(&bucket1).await?;
env.env.create_test_bucket(&bucket2).await?;
// Set different quotas for each bucket
env.set_bucket_quota_for(&bucket1, 1024 * 1024).await?; // 1MB
env.set_bucket_quota_for(&bucket2, 2 * 1024 * 1024).await?; // 2MB
// Fill first bucket to quota
env.upload_object_to_bucket(&bucket1, "big_file.txt", 1024 * 1024).await?;
// Should still be able to upload to second bucket
env.upload_object_to_bucket(&bucket2, "big_file.txt", 1024 * 1024).await?;
env.upload_object_to_bucket(&bucket2, "another_file.txt", 512 * 1024).await?;
// Verify statistics are independent
let stats1 = env.get_bucket_quota_stats_for(&bucket1).await?;
let stats2 = env.get_bucket_quota_stats_for(&bucket2).await?;
assert_eq!(stats1.get("current_usage").unwrap().as_u64().unwrap(), 1024 * 1024);
assert_eq!(stats2.get("current_usage").unwrap().as_u64().unwrap(), (1024 + 512) * 1024);
// Clean up
env.env.delete_test_bucket(&bucket1).await?;
env.env.delete_test_bucket(&bucket2).await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_error_handling() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Test invalid quota type
let url = format!("{}/rustfs/admin/v3/quota/{}", env.env.url, env.bucket_name);
let invalid_config = serde_json::json!({
"quota": 1024,
"quota_type": "SOFT" // Invalid type
});
let response = awscurl_put(&url, &invalid_config.to_string(), &env.env.access_key, &env.env.secret_key).await;
assert!(response.is_err());
let error_msg = response.unwrap_err().to_string();
assert!(error_msg.contains("InvalidArgument"));
// Test operations on non-existent bucket
let url = format!("{}/rustfs/admin/v3/quota/non-existent-bucket", env.env.url);
let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await;
assert!(response.is_err());
let error_msg = response.unwrap_err().to_string();
assert!(error_msg.contains("NoSuchBucket"));
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_http_endpoints() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Test 1: GET quota for bucket without quota config
let url = format!("{}/rustfs/admin/v3/quota/{}", env.env.url, env.bucket_name);
let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("quota") && response.contains("null"));
// Test 2: PUT quota - valid config
let quota_config = serde_json::json!({
"quota": 1048576,
"quota_type": "HARD"
});
let response = awscurl_put(&url, &quota_config.to_string(), &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("success") || !response.contains("error"));
// Test 3: GET quota after setting
let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("1048576"));
// Test 4: GET quota stats
let stats_url = format!("{}/rustfs/admin/v3/quota-stats/{}", env.env.url, env.bucket_name);
let response = awscurl_get(&stats_url, &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("quota_limit") && response.contains("current_usage"));
// Test 5: POST quota check
let check_url = format!("{}/rustfs/admin/v3/quota-check/{}", env.env.url, env.bucket_name);
let check_request = serde_json::json!({
"operation_type": "PUT",
"operation_size": 1024
});
let response = awscurl_post(&check_url, &check_request.to_string(), &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("allowed"));
// Test 6: DELETE quota
let response = awscurl_delete(&url, &env.env.access_key, &env.env.secret_key).await?;
assert!(!response.contains("error"));
// Test 7: GET quota after deletion
let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?;
assert!(response.contains("quota") && response.contains("null"));
// Test 8: Invalid quota type
let invalid_config = serde_json::json!({
"quota": 1024,
"quota_type": "SOFT"
});
let response = awscurl_put(&url, &invalid_config.to_string(), &env.env.access_key, &env.env.secret_key).await;
assert!(response.is_err());
let error_msg = response.unwrap_err().to_string();
assert!(error_msg.contains("InvalidArgument"));
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_copy_operations() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 2MB
env.set_bucket_quota(2 * 1024 * 1024).await?;
// Upload initial file
env.upload_object("original.txt", 1024 * 1024).await?;
// Copy file - should succeed (1MB each, total 2MB)
env.client
.copy_object()
.bucket(&env.bucket_name)
.key("copy1.txt")
.copy_source(format!("{}/{}", env.bucket_name, "original.txt"))
.send()
.await?;
assert!(env.object_exists("copy1.txt").await?);
// Try to copy again - should fail (1.5MB each, total 3MB > 2MB quota)
let copy_result = env
.client
.copy_object()
.bucket(&env.bucket_name)
.key("copy2.txt")
.copy_source(format!("{}/{}", env.bucket_name, "original.txt"))
.send()
.await;
assert!(copy_result.is_err());
assert!(!env.object_exists("copy2.txt").await?);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_batch_delete() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 2MB
env.set_bucket_quota(2 * 1024 * 1024).await?;
// Upload files to fill quota
env.upload_object("file1.txt", 1024 * 1024).await?;
env.upload_object("file2.txt", 1024 * 1024).await?;
// Verify quota is full
let upload_result = env.upload_object("file3.txt", 1024).await;
assert!(upload_result.is_err());
// Delete multiple objects using batch delete
let objects = vec![
aws_sdk_s3::types::ObjectIdentifier::builder()
.key("file1.txt")
.build()
.unwrap(),
aws_sdk_s3::types::ObjectIdentifier::builder()
.key("file2.txt")
.build()
.unwrap(),
];
let delete_result = env
.client
.delete_objects()
.bucket(&env.bucket_name)
.delete(
aws_sdk_s3::types::Delete::builder()
.set_objects(Some(objects))
.quiet(true)
.build()
.unwrap(),
)
.send()
.await?;
assert_eq!(delete_result.deleted().len(), 2);
// Now should be able to upload again (quota freed up)
env.upload_object("file3.txt", 256 * 1024).await?;
assert!(env.object_exists("file3.txt").await?);
env.cleanup_bucket().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_quota_multipart_upload() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let env = QuotaTestEnv::new().await?;
env.create_bucket().await?;
// Set quota of 10MB
env.set_bucket_quota(10 * 1024 * 1024).await?;
let key = "multipart_test.txt";
let part_size = 5 * 1024 * 1024; // 5MB minimum per part (S3 requirement)
// Test 1: Multipart upload within quota (single 5MB part)
let create_result = env
.client
.create_multipart_upload()
.bucket(&env.bucket_name)
.key(key)
.send()
.await?;
let upload_id = create_result.upload_id().unwrap();
// Upload single 5MB part (S3 allows single part with any size ≥ 5MB for the only part)
let part_data = vec![1u8; part_size];
let part_result = env
.client
.upload_part()
.bucket(&env.bucket_name)
.key(key)
.upload_id(upload_id)
.part_number(1)
.body(aws_sdk_s3::primitives::ByteStream::from(part_data))
.send()
.await?;
let uploaded_parts = vec![
aws_sdk_s3::types::CompletedPart::builder()
.part_number(1)
.e_tag(part_result.e_tag().unwrap())
.build(),
];
env.client
.complete_multipart_upload()
.bucket(&env.bucket_name)
.key(key)
.upload_id(upload_id)
.multipart_upload(
aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(uploaded_parts))
.build(),
)
.send()
.await?;
assert!(env.object_exists(key).await?);
// Test 2: Multipart upload exceeds quota (should fail)
// Upload 6MB filler (total now: 5MB + 6MB = 11MB > 10MB quota)
let upload_filler = env.upload_object("filler.txt", 6 * 1024 * 1024).await;
// This should fail due to quota
assert!(upload_filler.is_err());
// Verify filler doesn't exist
assert!(!env.object_exists("filler.txt").await?);
// Now try a multipart upload that exceeds quota
// Current usage: 5MB (from Test 1), quota: 10MB
// Trying to upload 6MB via multipart → should fail
let create_result2 = env
.client
.create_multipart_upload()
.bucket(&env.bucket_name)
.key("over_quota.txt")
.send()
.await?;
let upload_id2 = create_result2.upload_id().unwrap();
let mut uploaded_parts2 = vec![];
for part_num in 1..=2 {
let part_data = vec![part_num as u8; part_size];
let part_result = env
.client
.upload_part()
.bucket(&env.bucket_name)
.key("over_quota.txt")
.upload_id(upload_id2)
.part_number(part_num)
.body(aws_sdk_s3::primitives::ByteStream::from(part_data))
.send()
.await?;
uploaded_parts2.push(
aws_sdk_s3::types::CompletedPart::builder()
.part_number(part_num)
.e_tag(part_result.e_tag().unwrap())
.build(),
);
}
let complete_result = env
.client
.complete_multipart_upload()
.bucket(&env.bucket_name)
.key("over_quota.txt")
.upload_id(upload_id2)
.multipart_upload(
aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(uploaded_parts2))
.build(),
)
.send()
.await;
assert!(complete_result.is_err());
assert!(!env.object_exists("over_quota.txt").await?);
env.cleanup_bucket().await?;
Ok(())
}
}

View File

@@ -48,6 +48,7 @@ async-trait.workspace = true
bytes.workspace = true
byteorder = { workspace = true }
chrono.workspace = true
dunce.workspace = true
glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
@@ -109,7 +110,6 @@ google-cloud-auth = { workspace = true }
aws-config = { workspace = true }
faster-hex = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
criterion = { workspace = true, features = ["html_reports"] }

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::disk::error::DiskError;
use crate::disk::{self, DiskAPI as _, DiskStore};
use crate::disk::{self, DiskAPI as _, DiskStore, error::DiskError};
use crate::erasure_coding::{BitrotReader, BitrotWriterWrapper, CustomWriter};
use rustfs_utils::HashAlgorithm;
use std::io::Cursor;

View File

@@ -13,6 +13,14 @@
// limitations under the License.
use crate::bucket::metadata::BucketMetadata;
use crate::bucket::metadata_sys::get_bucket_targets_config;
use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::replication::ObjectOpts;
use crate::bucket::replication::ReplicationConfigurationExt;
use crate::bucket::target::ARN;
use crate::bucket::target::BucketTargetType;
use crate::bucket::target::{self, BucketTarget, BucketTargets, Credentials};
use crate::bucket::versioning_sys::BucketVersioningSys;
use aws_credential_types::Credentials as SdkCredentials;
use aws_sdk_s3::config::Region as SdkRegion;
use aws_sdk_s3::error::SdkError;
@@ -52,15 +60,6 @@ use tracing::warn;
use url::Url;
use uuid::Uuid;
use crate::bucket::metadata_sys::get_bucket_targets_config;
use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::replication::ObjectOpts;
use crate::bucket::replication::ReplicationConfigurationExt;
use crate::bucket::target::ARN;
use crate::bucket::target::BucketTargetType;
use crate::bucket::target::{self, BucketTarget, BucketTargets, Credentials};
use crate::bucket::versioning_sys::BucketVersioningSys;
const DEFAULT_HEALTH_CHECK_DURATION: Duration = Duration::from_secs(5);
const DEFAULT_HEALTH_CHECK_RELOAD_DURATION: Duration = Duration::from_secs(30 * 60);

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::lifecycle;
use crate::bucket::lifecycle::lifecycle;
#[derive(Debug, Clone, Default)]
pub enum LcEventSrc {

View File

@@ -18,6 +18,7 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::bucket::lifecycle::rule::TransitionOps;
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
@@ -30,8 +31,6 @@ use time::macros::{datetime, offset};
use time::{self, Duration, OffsetDateTime};
use tracing::info;
use crate::bucket::lifecycle::rule::TransitionOps;
pub const TRANSITION_COMPLETE: &str = "complete";
pub const TRANSITION_PENDING: &str = "pending";

View File

@@ -18,15 +18,13 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use rustfs_common::data_usage::TierStats;
use sha2::Sha256;
use std::collections::HashMap;
use std::ops::Sub;
use time::OffsetDateTime;
use tracing::{error, warn};
use rustfs_common::data_usage::TierStats;
pub type DailyAllTierStats = HashMap<String, LastDayTierStats>;
#[derive(Clone)]

View File

@@ -18,15 +18,14 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::bucket::lifecycle::bucket_lifecycle_ops::{ExpiryOp, GLOBAL_ExpiryState, TransitionedObject};
use crate::bucket::lifecycle::lifecycle::{self, ObjectOpts};
use crate::global::GLOBAL_TierConfigMgr;
use sha2::{Digest, Sha256};
use std::any::Any;
use std::io::Write;
use xxhash_rust::xxh64;
use super::bucket_lifecycle_ops::{ExpiryOp, GLOBAL_ExpiryState, TransitionedObject};
use super::lifecycle::{self, ObjectOpts};
use crate::global::GLOBAL_TierConfigMgr;
static XXHASH_SEED: u64 = 0;
#[derive(Default)]

View File

@@ -12,20 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{quota::BucketQuota, target::BucketTargets};
use super::object_lock::ObjectLockApi;
use super::versioning::VersioningApi;
use super::{quota::BucketQuota, target::BucketTargets};
use crate::bucket::utils::deserialize;
use crate::config::com::{read_config, save_config};
use crate::disk::BUCKET_META_PREFIX;
use crate::error::{Error, Result};
use crate::new_object_layer_fn;
use crate::store::ECStore;
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use rmp_serde::Serializer as rmpSerializer;
use rustfs_policy::policy::BucketPolicy;
use s3s::dto::{
BucketLifecycleConfiguration, NotificationConfiguration, ObjectLockConfiguration, ReplicationConfiguration,
ServerSideEncryptionConfiguration, Tagging, VersioningConfiguration,
BucketLifecycleConfiguration, CORSConfiguration, NotificationConfiguration, ObjectLockConfiguration,
ReplicationConfiguration, ServerSideEncryptionConfiguration, Tagging, VersioningConfiguration,
};
use serde::Serializer;
use serde::{Deserialize, Serialize};
@@ -34,9 +35,6 @@ use std::sync::Arc;
use time::OffsetDateTime;
use tracing::error;
use crate::disk::BUCKET_META_PREFIX;
use crate::store::ECStore;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
@@ -51,6 +49,7 @@ pub const OBJECT_LOCK_CONFIG: &str = "object-lock.xml";
pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
pub const BUCKET_CORS_CONFIG: &str = "cors.xml";
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase", default)]
@@ -69,6 +68,7 @@ pub struct BucketMetadata {
pub replication_config_xml: Vec<u8>,
pub bucket_targets_config_json: Vec<u8>,
pub bucket_targets_config_meta_json: Vec<u8>,
pub cors_config_xml: Vec<u8>,
pub policy_config_updated_at: OffsetDateTime,
pub object_lock_config_updated_at: OffsetDateTime,
@@ -81,6 +81,7 @@ pub struct BucketMetadata {
pub notification_config_updated_at: OffsetDateTime,
pub bucket_targets_config_updated_at: OffsetDateTime,
pub bucket_targets_config_meta_updated_at: OffsetDateTime,
pub cors_config_updated_at: OffsetDateTime,
#[serde(skip)]
pub new_field_updated_at: OffsetDateTime,
@@ -107,6 +108,8 @@ pub struct BucketMetadata {
pub bucket_target_config: Option<BucketTargets>,
#[serde(skip)]
pub bucket_target_config_meta: Option<HashMap<String, String>>,
#[serde(skip)]
pub cors_config: Option<CORSConfiguration>,
}
impl Default for BucketMetadata {
@@ -126,6 +129,7 @@ impl Default for BucketMetadata {
replication_config_xml: Default::default(),
bucket_targets_config_json: Default::default(),
bucket_targets_config_meta_json: Default::default(),
cors_config_xml: Default::default(),
policy_config_updated_at: OffsetDateTime::UNIX_EPOCH,
object_lock_config_updated_at: OffsetDateTime::UNIX_EPOCH,
encryption_config_updated_at: OffsetDateTime::UNIX_EPOCH,
@@ -137,6 +141,7 @@ impl Default for BucketMetadata {
notification_config_updated_at: OffsetDateTime::UNIX_EPOCH,
bucket_targets_config_updated_at: OffsetDateTime::UNIX_EPOCH,
bucket_targets_config_meta_updated_at: OffsetDateTime::UNIX_EPOCH,
cors_config_updated_at: OffsetDateTime::UNIX_EPOCH,
new_field_updated_at: OffsetDateTime::UNIX_EPOCH,
policy_config: Default::default(),
notification_config: Default::default(),
@@ -149,6 +154,7 @@ impl Default for BucketMetadata {
replication_config: Default::default(),
bucket_target_config: Default::default(),
bucket_target_config_meta: Default::default(),
cors_config: Default::default(),
}
}
}
@@ -297,6 +303,10 @@ impl BucketMetadata {
self.bucket_targets_config_json = data.clone();
self.bucket_targets_config_updated_at = updated;
}
BUCKET_CORS_CONFIG => {
self.cors_config_xml = data;
self.cors_config_updated_at = updated;
}
_ => return Err(Error::other(format!("config file not found : {config_file}"))),
}
@@ -355,7 +365,7 @@ impl BucketMetadata {
self.tagging_config = Some(deserialize::<Tagging>(&self.tagging_config_xml)?);
}
if !self.quota_config_json.is_empty() {
self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?);
self.quota_config = Some(serde_json::from_slice(&self.quota_config_json)?);
}
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(deserialize::<ReplicationConfiguration>(&self.replication_config_xml)?);
@@ -367,6 +377,9 @@ impl BucketMetadata {
} else {
self.bucket_target_config = Some(BucketTargets::default())
}
if !self.cors_config_xml.is_empty() {
self.cors_config = Some(deserialize::<CORSConfiguration>(&self.cors_config_xml)?);
}
Ok(())
}
@@ -487,7 +500,8 @@ mod test {
bm.tagging_config_updated_at = OffsetDateTime::now_utc();
// Add quota configuration
let quota_json = r#"{"quota":1073741824,"quotaType":"hard"}"#; // 1GB quota
let quota_json =
r#"{"quota":1073741824,"quota_type":"Hard","created_at":"2024-01-01T00:00:00Z","updated_at":"2024-01-01T00:00:00Z"}"#; // 1GB quota
bm.quota_config_json = quota_json.as_bytes().to_vec();
bm.quota_config_updated_at = OffsetDateTime::now_utc();

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::metadata::{BucketMetadata, load_bucket_metadata};
use super::quota::BucketQuota;
use super::target::BucketTargets;
use crate::StorageAPI as _;
use crate::bucket::bucket_target_sys::BucketTargetSys;
use crate::bucket::metadata::{BUCKET_LIFECYCLE_CONFIG, load_bucket_metadata_parse};
@@ -20,12 +23,13 @@ use crate::error::{Error, Result, is_err_bucket_not_found};
use crate::global::{GLOBAL_Endpoints, is_dist_erasure, is_erasure, new_object_layer_fn};
use crate::store::ECStore;
use futures::future::join_all;
use lazy_static::lazy_static;
use rustfs_common::heal_channel::HealOpts;
use rustfs_policy::policy::BucketPolicy;
use s3s::dto::ReplicationConfiguration;
use s3s::dto::{
BucketLifecycleConfiguration, NotificationConfiguration, ObjectLockConfiguration, ServerSideEncryptionConfiguration, Tagging,
VersioningConfiguration,
BucketLifecycleConfiguration, CORSConfiguration, NotificationConfiguration, ObjectLockConfiguration,
ServerSideEncryptionConfiguration, Tagging, VersioningConfiguration,
};
use std::collections::HashSet;
use std::sync::OnceLock;
@@ -36,12 +40,6 @@ use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::error;
use super::metadata::{BucketMetadata, load_bucket_metadata};
use super::quota::BucketQuota;
use super::target::BucketTargets;
use lazy_static::lazy_static;
lazy_static! {
pub static ref GLOBAL_BucketMetadataSys: OnceLock<Arc<RwLock<BucketMetadataSys>>> = OnceLock::new();
}
@@ -112,6 +110,13 @@ pub async fn get_bucket_targets_config(bucket: &str) -> Result<BucketTargets> {
bucket_meta_sys.get_bucket_targets_config(bucket).await
}
pub async fn get_cors_config(bucket: &str) -> Result<(CORSConfiguration, OffsetDateTime)> {
let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
bucket_meta_sys.get_cors_config(bucket).await
}
pub async fn get_tagging_config(bucket: &str) -> Result<(Tagging, OffsetDateTime)> {
let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
@@ -502,6 +507,16 @@ impl BucketMetadataSys {
}
}
pub async fn get_cors_config(&self, bucket: &str) -> Result<(CORSConfiguration, OffsetDateTime)> {
let (bm, _) = self.get_config(bucket).await?;
if let Some(config) = &bm.cors_config {
Ok((config.clone(), bm.cors_config_updated_at))
} else {
Err(Error::ConfigNotFound)
}
}
pub async fn created_at(&self, bucket: &str) -> Result<OffsetDateTime> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.created,

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use time::{OffsetDateTime, format_description};
use s3s::dto::{Date, ObjectLockLegalHold, ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode};
use s3s::header::{X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};
use std::collections::HashMap;
use time::{OffsetDateTime, format_description};
const _ERR_MALFORMED_BUCKET_OBJECT_CONFIG: &str = "invalid bucket object lock config";
const _ERR_INVALID_RETENTION_DATE: &str = "date must be provided in ISO 8601 format";

View File

@@ -12,16 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::bucket::metadata_sys::get_object_lock_config;
use crate::bucket::object_lock::objectlock;
use crate::store_api::ObjectInfo;
use s3s::dto::{DefaultRetention, ObjectLockLegalHoldStatus, ObjectLockRetentionMode};
use std::sync::Arc;
use time::OffsetDateTime;
use s3s::dto::{DefaultRetention, ObjectLockLegalHoldStatus, ObjectLockRetentionMode};
use crate::bucket::metadata_sys::get_object_lock_config;
use crate::store_api::ObjectInfo;
use super::objectlock;
pub struct BucketObjectLockSys {}
impl BucketObjectLockSys {

View File

@@ -0,0 +1,195 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{BucketQuota, QuotaCheckResult, QuotaError, QuotaOperation};
use crate::bucket::metadata_sys::{BucketMetadataSys, update};
use crate::data_usage::get_bucket_usage_memory;
use rustfs_common::metrics::Metric;
use rustfs_config::QUOTA_CONFIG_FILE;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, warn};
pub struct QuotaChecker {
metadata_sys: Arc<RwLock<BucketMetadataSys>>,
}
impl QuotaChecker {
pub fn new(metadata_sys: Arc<RwLock<BucketMetadataSys>>) -> Self {
Self { metadata_sys }
}
pub async fn check_quota(
&self,
bucket: &str,
operation: QuotaOperation,
operation_size: u64,
) -> Result<QuotaCheckResult, QuotaError> {
let start_time = Instant::now();
let quota_config = self.get_quota_config(bucket).await?;
// If no quota limit is set, allow operation
let quota_limit = match quota_config.quota {
None => {
let current_usage = self.get_real_time_usage(bucket).await?;
return Ok(QuotaCheckResult {
allowed: true,
current_usage,
quota_limit: None,
operation_size,
remaining: None,
});
}
Some(q) => q,
};
let current_usage = self.get_real_time_usage(bucket).await?;
let expected_usage = match operation {
QuotaOperation::PutObject | QuotaOperation::CopyObject => current_usage + operation_size,
QuotaOperation::DeleteObject => current_usage.saturating_sub(operation_size),
};
let allowed = match operation {
QuotaOperation::PutObject | QuotaOperation::CopyObject => {
quota_config.check_operation_allowed(current_usage, operation_size)
}
QuotaOperation::DeleteObject => true,
};
let remaining = if quota_limit >= expected_usage {
Some(quota_limit - expected_usage)
} else {
Some(0)
};
if !allowed {
warn!(
"Quota exceeded for bucket: {}, current: {}, limit: {}, attempted: {}",
bucket, current_usage, quota_limit, operation_size
);
}
let result = QuotaCheckResult {
allowed,
current_usage,
quota_limit: Some(quota_limit),
operation_size,
remaining,
};
let duration = start_time.elapsed();
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaCheck, duration).await;
if !allowed {
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaViolation, duration).await;
}
Ok(result)
}
pub async fn get_quota_config(&self, bucket: &str) -> Result<BucketQuota, QuotaError> {
let meta = self
.metadata_sys
.read()
.await
.get(bucket)
.await
.map_err(QuotaError::StorageError)?;
if meta.quota_config_json.is_empty() {
debug!("No quota config found for bucket: {}, using default", bucket);
return Ok(BucketQuota::new(None));
}
let quota: BucketQuota = serde_json::from_slice(&meta.quota_config_json).map_err(|e| QuotaError::InvalidConfig {
reason: format!("Failed to parse quota config: {}", e),
})?;
Ok(quota)
}
pub async fn set_quota_config(&mut self, bucket: &str, quota: BucketQuota) -> Result<(), QuotaError> {
let json_data = serde_json::to_vec(&quota).map_err(|e| QuotaError::InvalidConfig {
reason: format!("Failed to serialize quota config: {}", e),
})?;
let start_time = Instant::now();
update(bucket, QUOTA_CONFIG_FILE, json_data)
.await
.map_err(QuotaError::StorageError)?;
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaSync, start_time.elapsed()).await;
Ok(())
}
pub async fn get_quota_stats(&self, bucket: &str) -> Result<(BucketQuota, Option<u64>), QuotaError> {
// If bucket doesn't exist, return ConfigNotFound error
if !self.bucket_exists(bucket).await {
return Err(QuotaError::ConfigNotFound {
bucket: bucket.to_string(),
});
}
let quota = self.get_quota_config(bucket).await?;
let current_usage = self.get_real_time_usage(bucket).await.unwrap_or(0);
Ok((quota, Some(current_usage)))
}
pub async fn bucket_exists(&self, bucket: &str) -> bool {
self.metadata_sys.read().await.get(bucket).await.is_ok()
}
pub async fn get_real_time_usage(&self, bucket: &str) -> Result<u64, QuotaError> {
Ok(get_bucket_usage_memory(bucket).await.unwrap_or(0))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_quota_check_no_limit() {
let result = QuotaCheckResult {
allowed: true,
current_usage: 0,
quota_limit: None,
operation_size: 1024,
remaining: None,
};
assert!(result.allowed);
assert_eq!(result.quota_limit, None);
}
#[tokio::test]
async fn test_quota_check_within_limit() {
let quota = BucketQuota::new(Some(2048)); // 2KB
// Current usage 512, trying to add 1024
let allowed = quota.check_operation_allowed(512, 1024);
assert!(allowed);
}
#[tokio::test]
async fn test_quota_check_exceeds_limit() {
let quota = BucketQuota::new(Some(1024)); // 1KB
// Current usage 512, trying to add 1024
let allowed = quota.check_operation_allowed(512, 1024);
assert!(!allowed);
}
}

View File

@@ -12,36 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod checker;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use rustfs_config::{
QUOTA_API_PATH, QUOTA_EXCEEDED_ERROR_CODE, QUOTA_INTERNAL_ERROR_CODE, QUOTA_INVALID_CONFIG_ERROR_CODE,
QUOTA_NOT_FOUND_ERROR_CODE,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use time::OffsetDateTime;
// Define the QuotaType enum
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum QuotaType {
/// Hard quota: reject immediately when exceeded
#[default]
Hard,
}
// Define the BucketQuota structure
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq)]
pub struct BucketQuota {
quota: Option<u64>, // Use Option to represent optional fields
size: u64,
rate: u64,
requests: u64,
quota_type: Option<QuotaType>,
pub quota: Option<u64>,
pub quota_type: QuotaType,
/// Timestamp when this quota configuration was set (for audit purposes)
pub created_at: Option<OffsetDateTime>,
}
impl BucketQuota {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
@@ -49,4 +50,107 @@ impl BucketQuota {
let t: BucketQuota = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn new(quota: Option<u64>) -> Self {
let now = OffsetDateTime::now_utc();
Self {
quota,
quota_type: QuotaType::Hard,
created_at: Some(now),
}
}
pub fn get_quota_limit(&self) -> Option<u64> {
self.quota
}
pub fn check_operation_allowed(&self, current_usage: u64, operation_size: u64) -> bool {
if let Some(quota_limit) = self.quota {
current_usage.saturating_add(operation_size) <= quota_limit
} else {
true // No quota limit
}
}
pub fn get_remaining_quota(&self, current_usage: u64) -> Option<u64> {
self.quota.map(|limit| limit.saturating_sub(current_usage))
}
}
#[derive(Debug)]
pub struct QuotaCheckResult {
pub allowed: bool,
pub current_usage: u64,
/// quota_limit: None means unlimited
pub quota_limit: Option<u64>,
pub operation_size: u64,
pub remaining: Option<u64>,
}
#[derive(Debug)]
pub enum QuotaOperation {
PutObject,
CopyObject,
DeleteObject,
}
#[derive(Debug, Error)]
pub enum QuotaError {
#[error("Bucket quota exceeded: current={current}, limit={limit}, operation={operation}")]
QuotaExceeded { current: u64, limit: u64, operation: u64 },
#[error("Quota configuration not found for bucket: {bucket}")]
ConfigNotFound { bucket: String },
#[error("Invalid quota configuration: {reason}")]
InvalidConfig { reason: String },
#[error("Storage error: {0}")]
StorageError(#[from] crate::error::StorageError),
}
#[derive(Debug, Serialize)]
pub struct QuotaErrorResponse {
#[serde(rename = "Code")]
pub code: String,
#[serde(rename = "Message")]
pub message: String,
#[serde(rename = "Resource")]
pub resource: String,
#[serde(rename = "RequestId")]
pub request_id: String,
#[serde(rename = "HostId")]
pub host_id: String,
}
impl QuotaErrorResponse {
pub fn new(quota_error: &QuotaError, request_id: &str, host_id: &str) -> Self {
match quota_error {
QuotaError::QuotaExceeded { .. } => Self {
code: QUOTA_EXCEEDED_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::ConfigNotFound { .. } => Self {
code: QUOTA_NOT_FOUND_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::InvalidConfig { .. } => Self {
code: QUOTA_INVALID_CONFIG_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::StorageError(_) => Self {
code: QUOTA_INTERNAL_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::ReplicationRuleExt as _;
use crate::bucket::replication::ReplicationRuleExt as _;
use crate::bucket::tagging::decode_tags_to_map;
use rustfs_filemeta::ReplicationType;
use s3s::dto::DeleteMarkerReplicationStatus;

View File

@@ -1,22 +1,30 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::StorageAPI;
use crate::bucket::replication::ResyncOpts;
use crate::bucket::replication::ResyncStatusType;
use crate::bucket::replication::replicate_delete;
use crate::bucket::replication::replicate_object;
use crate::disk::BUCKET_META_PREFIX;
use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use crate::bucket::replication::replication_resyncer::{
BucketReplicationResyncStatus, DeletedObjectReplicationInfo, ReplicationResyncer,
};
use crate::bucket::replication::replication_state::ReplicationStats;
use crate::config::com::read_config;
use crate::disk::BUCKET_META_PREFIX;
use crate::error::Error as EcstoreError;
use crate::store_api::ObjectInfo;
use lazy_static::lazy_static;
use rustfs_filemeta::MrfReplicateEntry;
use rustfs_filemeta::ReplicateDecision;
@@ -29,6 +37,10 @@ use rustfs_filemeta::ResyncDecision;
use rustfs_filemeta::replication_statuses_map;
use rustfs_filemeta::version_purge_statuses_map;
use rustfs_utils::http::RESERVED_METADATA_PREFIX_LOWER;
use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::sync::Mutex;

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::bucket::bucket_target_sys::{
AdvancedPutOptions, BucketTargetSys, PutObjectOptions, PutObjectPartOptions, RemoveObjectOptions, TargetClient,
};
@@ -16,7 +30,6 @@ use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::store_api::{DeletedObject, ObjectInfo, ObjectOptions, ObjectToDelete, WalkOptions};
use crate::{StorageAPI, new_object_layer_fn};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use aws_sdk_s3::primitives::ByteStream;
@@ -24,7 +37,6 @@ use aws_sdk_s3::types::{CompletedPart, ObjectLockLegalHoldStatus};
use byteorder::ByteOrder;
use futures::future::join_all;
use http::HeaderMap;
use regex::Regex;
use rustfs_filemeta::{
MrfReplicateEntry, REPLICATE_EXISTING, REPLICATE_EXISTING_DELETE, REPLICATION_RESET, ReplicateDecision, ReplicateObjectInfo,

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Error;
use rustfs_filemeta::{ReplicatedTargetInfo, ReplicationStatusType, ReplicationType};
use serde::{Deserialize, Serialize};

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::bucket::replication::ObjectOpts;
use s3s::dto::ReplicaModificationsStatus;
use s3s::dto::ReplicationRule;
use super::ObjectOpts;
pub trait ReplicationRuleExt {
fn prefix(&self) -> &str;
fn metadata_replicate(&self, obj: &ObjectOpts) -> bool;

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use s3s::dto::Tag;
use std::collections::HashMap;
use url::form_urlencoded;
pub fn decode_tags(tags: &str) -> Vec<Tag> {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::BucketTargetType;
use crate::bucket::target::BucketTargetType;
use std::fmt::Display;
use std::str::FromStr;

View File

@@ -14,16 +14,15 @@
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result, StorageError};
use rustfs_utils::path::SLASH_SEPARATOR;
use regex::Regex;
use rustfs_utils::path::SLASH_SEPARATOR_STR;
use s3s::xml;
use tracing::instrument;
pub fn is_meta_bucketname(name: &str) -> bool {
name.starts_with(RUSTFS_META_BUCKET)
}
use regex::Regex;
use tracing::instrument;
lazy_static::lazy_static! {
static ref VALID_BUCKET_NAME: Regex = Regex::new(r"^[A-Za-z0-9][A-Za-z0-9\.\-\_\:]{1,61}[A-Za-z0-9]$").unwrap();
static ref VALID_BUCKET_NAME_STRICT: Regex = Regex::new(r"^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$").unwrap();
@@ -194,7 +193,7 @@ pub fn is_valid_object_name(object: &str) -> bool {
return false;
}
if object.ends_with(SLASH_SEPARATOR) {
if object.ends_with(SLASH_SEPARATOR_STR) {
return false;
}
@@ -206,7 +205,7 @@ pub fn check_object_name_for_length_and_slash(bucket: &str, object: &str) -> Res
return Err(StorageError::ObjectNameTooLong(bucket.to_owned(), object.to_owned()));
}
if object.starts_with(SLASH_SEPARATOR) {
if object.starts_with(SLASH_SEPARATOR_STR) {
return Err(StorageError::ObjectNamePrefixAsSlash(bucket.to_owned(), object.to_owned()));
}

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
use rustfs_utils::string::match_simple;
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
pub trait VersioningApi {
fn enabled(&self) -> bool;

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
pub mod metacache_set;

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_utils::string::has_pattern;
use rustfs_utils::string::has_string_suffix_in_slice;
use rustfs_utils::string::{has_pattern, has_string_suffix_in_slice};
use std::env;
use tracing::error;

View File

@@ -18,7 +18,7 @@ use crate::error::{Error, Result};
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
use http::HeaderMap;
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_utils::path::SLASH_SEPARATOR;
use rustfs_utils::path::SLASH_SEPARATOR_STR;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::LazyLock;
@@ -29,7 +29,7 @@ const CONFIG_FILE: &str = "config.json";
pub const STORAGE_CLASS_SUB_SYS: &str = "storage_class";
static CONFIG_BUCKET: LazyLock<String> = LazyLock::new(|| format!("{RUSTFS_META_BUCKET}{SLASH_SEPARATOR}{CONFIG_PREFIX}"));
static CONFIG_BUCKET: LazyLock<String> = LazyLock::new(|| format!("{RUSTFS_META_BUCKET}{SLASH_SEPARATOR_STR}{CONFIG_PREFIX}"));
static SUB_SYSTEMS_DYNAMIC: LazyLock<HashSet<String>> = LazyLock::new(|| {
let mut h = HashSet::new();
@@ -129,7 +129,7 @@ async fn new_and_save_server_config<S: StorageAPI>(api: Arc<S>) -> Result<Config
}
fn get_config_file() -> String {
format!("{CONFIG_PREFIX}{SLASH_SEPARATOR}{CONFIG_FILE}")
format!("{CONFIG_PREFIX}{SLASH_SEPARATOR_STR}{CONFIG_FILE}")
}
/// Handle the situation where the configuration file does not exist, create and save a new configuration

View File

@@ -12,52 +12,66 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::{HashMap, hash_map::Entry},
sync::Arc,
time::SystemTime,
};
pub mod local_snapshot;
use crate::{
bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, error::Error, store::ECStore,
store_api::StorageAPI,
};
pub use local_snapshot::{
DATA_USAGE_DIR, DATA_USAGE_STATE_DIR, LOCAL_USAGE_SNAPSHOT_VERSION, LocalUsageSnapshot, LocalUsageSnapshotMeta,
data_usage_dir, data_usage_state_dir, ensure_data_usage_layout, read_snapshot as read_local_snapshot, snapshot_file_name,
snapshot_object_path, snapshot_path, write_snapshot as write_local_snapshot,
};
use crate::{
bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, store::ECStore, store_api::StorageAPI,
};
use rustfs_common::data_usage::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
};
use rustfs_utils::path::SLASH_SEPARATOR;
use rustfs_utils::path::SLASH_SEPARATOR_STR;
use std::{
collections::{HashMap, hash_map::Entry},
sync::{Arc, OnceLock},
time::{Duration, SystemTime},
};
use tokio::fs;
use tracing::{error, info, warn};
use crate::error::Error;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
// Data usage storage constants
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR_STR;
const DATA_USAGE_OBJ_NAME: &str = ".usage.json";
const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin";
pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin";
const DATA_USAGE_CACHE_TTL_SECS: u64 = 30;
type UsageMemoryCache = Arc<RwLock<HashMap<String, (u64, SystemTime)>>>;
type CacheUpdating = Arc<RwLock<bool>>;
static USAGE_MEMORY_CACHE: OnceLock<UsageMemoryCache> = OnceLock::new();
static USAGE_CACHE_UPDATING: OnceLock<CacheUpdating> = OnceLock::new();
fn memory_cache() -> &'static UsageMemoryCache {
USAGE_MEMORY_CACHE.get_or_init(|| Arc::new(RwLock::new(HashMap::new())))
}
fn cache_updating() -> &'static CacheUpdating {
USAGE_CACHE_UPDATING.get_or_init(|| Arc::new(RwLock::new(false)))
}
// Data usage storage paths
lazy_static::lazy_static! {
pub static ref DATA_USAGE_BUCKET: String = format!("{}{}{}",
crate::disk::RUSTFS_META_BUCKET,
SLASH_SEPARATOR,
SLASH_SEPARATOR_STR,
crate::disk::BUCKET_META_PREFIX
);
pub static ref DATA_USAGE_OBJ_NAME_PATH: String = format!("{}{}{}",
crate::disk::BUCKET_META_PREFIX,
SLASH_SEPARATOR,
SLASH_SEPARATOR_STR,
DATA_USAGE_OBJ_NAME
);
pub static ref DATA_USAGE_BLOOM_NAME_PATH: String = format!("{}{}{}",
crate::disk::BUCKET_META_PREFIX,
SLASH_SEPARATOR,
SLASH_SEPARATOR_STR,
DATA_USAGE_BLOOM_NAME
);
}
@@ -94,8 +108,8 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
Ok(data) => data,
Err(e) => {
error!("Failed to read data usage info from backend: {}", e);
if e == crate::error::Error::ConfigNotFound {
warn!("Data usage config not found, building basic statistics");
if e == Error::ConfigNotFound {
info!("Data usage config not found, building basic statistics");
return build_basic_data_usage_info(store).await;
}
return Err(Error::other(e));
@@ -128,7 +142,7 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
.map(|(bucket, &size)| {
(
bucket.clone(),
rustfs_common::data_usage::BucketUsageInfo {
BucketUsageInfo {
size,
..Default::default()
},
@@ -245,7 +259,7 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
// If a snapshot is corrupted or unreadable, skip it but keep processing others
if let Err(err) = &snapshot_result {
warn!(
info!(
"Failed to read data usage snapshot for disk {} (pool {}, set {}, disk {}): {}",
disk_id, pool_idx, set_disks.set_index, disk_index, err
);
@@ -254,7 +268,7 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
if let Err(remove_err) = fs::remove_file(&snapshot_file).await
&& remove_err.kind() != std::io::ErrorKind::NotFound
{
warn!("Failed to remove corrupted snapshot {:?}: {}", snapshot_file, remove_err);
info!("Failed to remove corrupted snapshot {:?}: {}", snapshot_file, remove_err);
}
}
@@ -341,7 +355,7 @@ pub async fn compute_bucket_usage(store: Arc<ECStore>, bucket_name: &str) -> Res
continuation = result.next_continuation_token.clone();
if continuation.is_none() {
warn!(
info!(
"Bucket {} listing marked truncated but no continuation token returned; stopping early",
bucket_name
);
@@ -364,8 +378,120 @@ pub async fn compute_bucket_usage(store: Arc<ECStore>, bucket_name: &str) -> Res
Ok(usage)
}
/// Fast in-memory increment for immediate quota consistency
pub async fn increment_bucket_usage_memory(bucket: &str, size_increment: u64) {
let mut cache = memory_cache().write().await;
let current = cache.entry(bucket.to_string()).or_insert_with(|| (0, SystemTime::now()));
current.0 += size_increment;
current.1 = SystemTime::now();
}
/// Fast in-memory decrement for immediate quota consistency
pub async fn decrement_bucket_usage_memory(bucket: &str, size_decrement: u64) {
let mut cache = memory_cache().write().await;
if let Some(current) = cache.get_mut(bucket) {
current.0 = current.0.saturating_sub(size_decrement);
current.1 = SystemTime::now();
}
}
/// Get bucket usage from in-memory cache
pub async fn get_bucket_usage_memory(bucket: &str) -> Option<u64> {
update_usage_cache_if_needed().await;
let cache = memory_cache().read().await;
cache.get(bucket).map(|(usage, _)| *usage)
}
async fn update_usage_cache_if_needed() {
let ttl = Duration::from_secs(DATA_USAGE_CACHE_TTL_SECS);
let double_ttl = ttl * 2;
let now = SystemTime::now();
let cache = memory_cache().read().await;
let earliest_timestamp = cache.values().map(|(_, ts)| *ts).min();
drop(cache);
let age = match earliest_timestamp {
Some(ts) => now.duration_since(ts).unwrap_or_default(),
None => double_ttl,
};
if age < ttl {
return;
}
let mut updating = cache_updating().write().await;
if age < double_ttl {
if *updating {
return;
}
*updating = true;
drop(updating);
let cache_clone = (*memory_cache()).clone();
let updating_clone = (*cache_updating()).clone();
tokio::spawn(async move {
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get()
&& let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await
{
let mut cache = cache_clone.write().await;
for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now()));
}
}
let mut updating = updating_clone.write().await;
*updating = false;
});
return;
}
for retry in 0..10 {
if !*updating {
break;
}
drop(updating);
let delay = Duration::from_millis(1 << retry);
tokio::time::sleep(delay).await;
updating = cache_updating().write().await;
}
*updating = true;
drop(updating);
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get()
&& let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await
{
let mut cache = memory_cache().write().await;
for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now()));
}
}
let mut updating = cache_updating().write().await;
*updating = false;
}
/// Sync memory cache with backend data (called by scanner)
pub async fn sync_memory_cache_with_backend() -> Result<(), Error> {
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get() {
match load_data_usage_from_backend(store.clone()).await {
Ok(data_usage_info) => {
let mut cache = memory_cache().write().await;
for (bucket, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket.clone(), (bucket_usage.size, SystemTime::now()));
}
}
Err(e) => {
debug!("Failed to sync memory cache with backend: {}", e);
}
}
}
Ok(())
}
/// Build basic data usage info with real object counts
async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
pub async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
let mut data_usage_info = DataUsageInfo::default();
// Get bucket list
@@ -437,7 +563,7 @@ pub fn cache_to_data_usage_info(cache: &DataUsageCache, path: &str, buckets: &[c
None => continue,
};
let flat = cache.flatten(&e);
let mut bui = rustfs_common::data_usage::BucketUsageInfo {
let mut bui = BucketUsageInfo {
size: flat.size as u64,
versions_count: flat.versions as u64,
objects_count: flat.objects as u64,
@@ -515,7 +641,7 @@ pub async fn load_data_usage_cache(store: &crate::set_disk::SetDisks, name: &str
break;
}
Err(err) => match err {
crate::error::Error::FileNotFound | crate::error::Error::VolumeNotFound => {
Error::FileNotFound | Error::VolumeNotFound => {
match store
.get_object_reader(
RUSTFS_META_BUCKET,
@@ -536,7 +662,7 @@ pub async fn load_data_usage_cache(store: &crate::set_disk::SetDisks, name: &str
break;
}
Err(_) => match err {
crate::error::Error::FileNotFound | crate::error::Error::VolumeNotFound => {
Error::FileNotFound | Error::VolumeNotFound => {
break;
}
_ => {}
@@ -565,9 +691,9 @@ pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate:
use std::path::Path;
let Some(store) = new_object_layer_fn() else {
return Err(crate::error::Error::other("errServerNotInitialized"));
return Err(Error::other("errServerNotInitialized"));
};
let buf = cache.marshal_msg().map_err(crate::error::Error::other)?;
let buf = cache.marshal_msg().map_err(Error::other)?;
let buf_clone = buf.clone();
let store_clone = store.clone();

View File

@@ -1,13 +1,25 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use tokio::fs;
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::data_usage::BucketUsageInfo;
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use tokio::fs;
/// Directory used to store per-disk usage snapshots under the metadata bucket.
pub const DATA_USAGE_DIR: &str = "datausage";

View File

@@ -95,22 +95,22 @@ impl DiskHealthTracker {
/// Check if disk is faulty
pub fn is_faulty(&self) -> bool {
self.status.load(Ordering::Relaxed) == DISK_HEALTH_FAULTY
self.status.load(Ordering::Acquire) == DISK_HEALTH_FAULTY
}
/// Set disk as faulty
pub fn set_faulty(&self) {
self.status.store(DISK_HEALTH_FAULTY, Ordering::Relaxed);
self.status.store(DISK_HEALTH_FAULTY, Ordering::Release);
}
/// Set disk as OK
pub fn set_ok(&self) {
self.status.store(DISK_HEALTH_OK, Ordering::Relaxed);
self.status.store(DISK_HEALTH_OK, Ordering::Release);
}
pub fn swap_ok_to_faulty(&self) -> bool {
self.status
.compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::Relaxed, Ordering::Relaxed)
.compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
}
@@ -131,7 +131,7 @@ impl DiskHealthTracker {
/// Get last success timestamp
pub fn last_success(&self) -> i64 {
self.last_success.load(Ordering::Relaxed)
self.last_success.load(Ordering::Acquire)
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::error::{Error, Result};
use crate::disk::error::{Error, Result};
use path_absolutize::Absolutize;
use rustfs_utils::{is_local_host, is_socket_addr};
use std::{fmt::Display, path::Path};

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// use crate::quorum::CheckErrorFn;
use std::hash::{Hash, Hasher};
use std::io::{self};
use std::path::PathBuf;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::error::DiskError;
use crate::disk::error::DiskError;
pub fn to_file_error(io_err: std::io::Error) -> std::io::Error {
match io_err.kind() {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::error::Error;
use crate::disk::error::Error;
pub static OBJECT_OP_IGNORED_ERRS: &[Error] = &[
Error::DiskNotFound,

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::error::{Error, Result};
use super::{DiskInfo, error::DiskError};
use crate::disk::error::{Error, Result};
use crate::disk::{DiskInfo, error::DiskError};
use serde::{Deserialize, Serialize};
use serde_json::Error as JsonError;
use uuid::Uuid;

View File

@@ -17,7 +17,6 @@ use std::{
path::Path,
sync::{Arc, OnceLock},
};
use tokio::{
fs::{self, File},
io,

View File

@@ -12,38 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::error::{Error, Result};
use super::os::{is_root_disk, rename_all};
use super::{
BUCKET_META_PREFIX, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics,
FileInfoVersions, RUSTFS_META_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
STORAGE_FORMAT_FILE_BACKUP, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, os,
};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use crate::config::storageclass::DEFAULT_INLINE_BLOCK;
use crate::data_usage::local_snapshot::ensure_data_usage_layout;
use crate::disk::error::FileAccessDeniedWithContext;
use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error};
use crate::disk::fs::{
O_APPEND, O_CREATE, O_RDONLY, O_TRUNC, O_WRONLY, access, lstat, lstat_std, remove, remove_all_std, remove_std, rename,
};
use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{
CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN, CHECK_PART_VOLUME_NOT_FOUND,
FileReader, RUSTFS_META_TMP_DELETED_BUCKET, conv_part_err_to_int,
BUCKET_META_PREFIX, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
CHECK_PART_VOLUME_NOT_FOUND, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics,
FileInfoVersions, FileReader, FileWriter, RUSTFS_META_BUCKET, RUSTFS_META_TMP_DELETED_BUCKET, ReadMultipleReq,
ReadMultipleResp, ReadOptions, RenameDataResp, STORAGE_FORMAT_FILE, STORAGE_FORMAT_FILE_BACKUP, UpdateMetadataOpts,
VolumeInfo, WalkDirOptions, conv_part_err_to_int,
endpoint::Endpoint,
error::{DiskError, Error, FileAccessDeniedWithContext, Result},
error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error},
format::FormatV3,
fs::{O_APPEND, O_CREATE, O_RDONLY, O_TRUNC, O_WRONLY, access, lstat, lstat_std, remove, remove_all_std, remove_std, rename},
os,
os::{check_path_length, is_empty_dir, is_root_disk, rename_all},
};
use crate::disk::{FileWriter, STORAGE_FORMAT_FILE};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use rustfs_utils::path::{
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
path_join, path_join_buf,
};
use tokio::time::interval;
use crate::erasure_coding::bitrot_verify;
use bytes::Bytes;
// use path_absolutize::Absolutize; // Replaced with direct path operations for better performance
use crate::file_cache::{get_global_file_cache, prefetch_metadata_patterns, read_metadata_cached};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use bytes::Bytes;
use parking_lot::RwLock as ParkingLotRwLock;
use rustfs_filemeta::{
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
@@ -51,6 +39,10 @@ use rustfs_filemeta::{
};
use rustfs_utils::HashAlgorithm;
use rustfs_utils::os::get_info;
use rustfs_utils::path::{
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR_STR, clean, decode_dir_object, encode_dir_object,
has_suffix, path_join, path_join_buf,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -66,6 +58,7 @@ use time::OffsetDateTime;
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind};
use tokio::sync::RwLock;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
@@ -128,7 +121,8 @@ impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
debug!("Creating local disk");
// Use optimized path resolution instead of absolutize() for better performance
let root = match std::fs::canonicalize(ep.get_file_path()) {
// Use dunce::canonicalize instead of std::fs::canonicalize to avoid UNC paths on Windows
let root = match dunce::canonicalize(ep.get_file_path()) {
Ok(path) => path,
Err(e) => {
if e.kind() == ErrorKind::NotFound {
@@ -482,23 +476,31 @@ impl LocalDisk {
// Async prefetch related files, don't block current read
if let Some(parent) = file_path.parent() {
prefetch_metadata_patterns(parent, &[super::STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await;
prefetch_metadata_patterns(parent, &[STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await;
}
// Main read logic
let file_dir = self.get_bucket_path(volume)?;
let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;
get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
.await
.map_err(|_e| DiskError::Unexpected)
get_file_info(
&data,
volume,
path,
version_id,
FileInfoOpts {
data: opts.read_data,
include_free_versions: false,
},
)
.map_err(|_e| DiskError::Unexpected)
}
// Batch metadata reading for multiple objects
async fn read_metadata_batch(&self, requests: Vec<(String, String)>) -> Result<Vec<Option<Arc<FileMeta>>>> {
let paths: Vec<PathBuf> = requests
.iter()
.map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, super::STORAGE_FORMAT_FILE)))
.map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, STORAGE_FORMAT_FILE)))
.collect::<Result<Vec<_>>>()?;
let cache = get_global_file_cache();
@@ -535,7 +537,7 @@ impl LocalDisk {
// TODO: async notifications for disk space checks and trash cleanup
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
// if let Some(parent) = trash_path.parent() {
// if !parent.exists() {
// fs::create_dir_all(parent).await?;
@@ -543,7 +545,7 @@ impl LocalDisk {
// }
let err = if recursive {
rename_all(delete_path, trash_path, self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?)
rename_all(delete_path, trash_path, self.get_bucket_path(RUSTFS_META_TMP_DELETED_BUCKET)?)
.await
.err()
} else {
@@ -553,12 +555,12 @@ impl LocalDisk {
.err()
};
if immediate_purge || delete_path.to_string_lossy().ends_with(SLASH_SEPARATOR) {
let trash_path2 = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
if immediate_purge || delete_path.to_string_lossy().ends_with(SLASH_SEPARATOR_STR) {
let trash_path2 = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
let _ = rename_all(
encode_dir_object(delete_path.to_string_lossy().as_ref()),
trash_path2,
self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?,
self.get_bucket_path(RUSTFS_META_TMP_DELETED_BUCKET)?,
)
.await;
}
@@ -842,7 +844,11 @@ impl LocalDisk {
self.write_all_internal(&tmp_file_path, InternalBuf::Ref(buf), sync, &tmp_volume_dir)
.await?;
rename_all(tmp_file_path, file_path, volume_dir).await
rename_all(tmp_file_path, &file_path, volume_dir).await?;
// Invalidate cache after successful write
get_global_file_cache().invalidate(&file_path).await;
Ok(())
}
// write_all_public for trail
@@ -907,7 +913,7 @@ impl LocalDisk {
}
if let Some(parent) = path.as_ref().parent() {
super::os::make_dir_all(parent, skip_parent).await?;
os::make_dir_all(parent, skip_parent).await?;
}
let f = super::fs::open_file(path.as_ref(), mode).await.map_err(to_file_error)?;
@@ -933,7 +939,7 @@ impl LocalDisk {
let meta = file.metadata().await.map_err(to_file_error)?;
let file_size = meta.len() as usize;
bitrot_verify(Box::new(file), file_size, part_size, algo, bytes::Bytes::copy_from_slice(sum), shard_size)
bitrot_verify(Box::new(file), file_size, part_size, algo, Bytes::copy_from_slice(sum), shard_size)
.await
.map_err(to_file_error)?;
@@ -1029,15 +1035,16 @@ impl LocalDisk {
continue;
}
if entry.ends_with(SLASH_SEPARATOR) {
if entry.ends_with(SLASH_SEPARATOR_STR) {
if entry.ends_with(GLOBAL_DIR_SUFFIX_WITH_SLASH) {
let entry = format!("{}{}", entry.as_str().trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH), SLASH_SEPARATOR);
let entry =
format!("{}{}", entry.as_str().trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH), SLASH_SEPARATOR_STR);
dir_objes.insert(entry.clone());
*item = entry;
continue;
}
*item = entry.trim_end_matches(SLASH_SEPARATOR).to_owned();
*item = entry.trim_end_matches(SLASH_SEPARATOR_STR).to_owned();
continue;
}
@@ -1049,7 +1056,7 @@ impl LocalDisk {
.await?;
let entry = entry.strip_suffix(STORAGE_FORMAT_FILE).unwrap_or_default().to_owned();
let name = entry.trim_end_matches(SLASH_SEPARATOR);
let name = entry.trim_end_matches(SLASH_SEPARATOR_STR);
let name = decode_dir_object(format!("{}/{}", &current, &name).as_str());
// if opts.limit > 0
@@ -1132,7 +1139,7 @@ impl LocalDisk {
Ok(res) => {
if is_dir_obj {
meta.name = meta.name.trim_end_matches(GLOBAL_DIR_SUFFIX_WITH_SLASH).to_owned();
meta.name.push_str(SLASH_SEPARATOR);
meta.name.push_str(SLASH_SEPARATOR_STR);
}
meta.metadata = res;
@@ -1150,7 +1157,7 @@ impl LocalDisk {
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
if !is_dir_obj && !is_empty_dir(self.get_object_path(&opts.bucket, &meta.name)?).await {
meta.name.push_str(SLASH_SEPARATOR);
meta.name.push_str(SLASH_SEPARATOR_STR);
dir_stack.push(meta.name);
}
}
@@ -1225,7 +1232,7 @@ async fn read_file_metadata(p: impl AsRef<Path>) -> Result<Metadata> {
fn skip_access_checks(p: impl AsRef<str>) -> bool {
let vols = [
super::RUSTFS_META_TMP_DELETED_BUCKET,
RUSTFS_META_TMP_DELETED_BUCKET,
super::RUSTFS_META_TMP_BUCKET,
super::RUSTFS_META_MULTIPART_BUCKET,
RUSTFS_META_BUCKET,
@@ -1619,8 +1626,8 @@ impl DiskAPI for LocalDisk {
super::fs::access_std(&dst_volume_dir).map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?
}
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR);
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR);
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR_STR);
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR_STR);
if !src_is_dir && dst_is_dir || src_is_dir && !dst_is_dir {
warn!(
@@ -1686,8 +1693,8 @@ impl DiskAPI for LocalDisk {
.map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?;
}
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR);
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR);
let src_is_dir = has_suffix(src_path, SLASH_SEPARATOR_STR);
let dst_is_dir = has_suffix(dst_path, SLASH_SEPARATOR_STR);
if (dst_is_dir || src_is_dir) && (!dst_is_dir || !src_is_dir) {
return Err(Error::from(DiskError::FileAccessDenied));
}
@@ -1838,12 +1845,12 @@ impl DiskAPI for LocalDisk {
}
let volume_dir = self.get_bucket_path(volume)?;
let dir_path_abs = self.get_object_path(volume, dir_path.trim_start_matches(SLASH_SEPARATOR))?;
let dir_path_abs = self.get_object_path(volume, dir_path.trim_start_matches(SLASH_SEPARATOR_STR))?;
let entries = match os::read_dir(&dir_path_abs, count).await {
Ok(res) => res,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound
if e.kind() == ErrorKind::NotFound
&& !skip_access_checks(volume)
&& let Err(e) = access(&volume_dir).await
{
@@ -1874,11 +1881,11 @@ impl DiskAPI for LocalDisk {
let mut objs_returned = 0;
if opts.base_dir.ends_with(SLASH_SEPARATOR) {
if opts.base_dir.ends_with(SLASH_SEPARATOR_STR) {
let fpath = self.get_object_path(
&opts.bucket,
path_join_buf(&[
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR), GLOBAL_DIR_SUFFIX).as_str(),
format!("{}{}", opts.base_dir.trim_end_matches(SLASH_SEPARATOR_STR), GLOBAL_DIR_SUFFIX).as_str(),
STORAGE_FORMAT_FILE,
])
.as_str(),
@@ -2110,7 +2117,7 @@ impl DiskAPI for LocalDisk {
let volume_dir = self.get_bucket_path(volume)?;
if let Err(e) = access(&volume_dir).await {
if e.kind() == std::io::ErrorKind::NotFound {
if e.kind() == ErrorKind::NotFound {
os::make_dir_all(&volume_dir, self.root.as_path()).await?;
return Ok(());
}
@@ -2128,7 +2135,7 @@ impl DiskAPI for LocalDisk {
let entries = os::read_dir(&self.root, -1).await.map_err(to_volume_error)?;
for entry in entries {
if !has_suffix(&entry, SLASH_SEPARATOR) || !Self::is_valid_volname(clean(&entry).as_str()) {
if !has_suffix(&entry, SLASH_SEPARATOR_STR) || !Self::is_valid_volname(clean(&entry).as_str()) {
continue;
}
@@ -2240,20 +2247,93 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn read_version(
&self,
_org_volume: &str,
org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
if !org_volume.is_empty() {
let org_volume_path = self.get_bucket_path(org_volume)?;
if !skip_access_checks(org_volume) {
access(&org_volume_path)
.await
.map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?;
}
}
let file_path = self.get_object_path(volume, path)?;
let file_dir = self.get_bucket_path(volume)?;
let volume_dir = self.get_bucket_path(volume)?;
check_path_length(file_path.to_string_lossy().as_ref())?;
let read_data = opts.read_data;
let (data, _) = self.read_raw(volume, file_dir, file_path, read_data).await?;
let (data, _) = self
.read_raw(volume, volume_dir.clone(), file_path, read_data)
.await
.map_err(|e| {
if e == DiskError::FileNotFound && !version_id.is_empty() {
DiskError::FileVersionNotFound
} else {
e
}
})?;
let fi = get_file_info(&data, volume, path, version_id, FileInfoOpts { data: read_data }).await?;
let mut fi = get_file_info(
&data,
volume,
path,
version_id,
FileInfoOpts {
data: read_data,
include_free_versions: opts.incl_free_versions,
},
)?;
if opts.read_data {
if fi.data.as_ref().is_some_and(|d| !d.is_empty()) || fi.size == 0 {
if fi.inline_data() {
return Ok(fi);
}
if fi.size == 0 || fi.version_id.is_none_or(|v| v.is_nil()) {
fi.set_inline_data();
return Ok(fi);
};
if let Some(part) = fi.parts.first() {
let part_path = format!("part.{}", part.number);
let part_path = path_join_buf(&[
path,
fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
part_path.as_str(),
]);
let part_path = self.get_object_path(volume, part_path.as_str())?;
if lstat(&part_path).await.is_err() {
fi.set_inline_data();
return Ok(fi);
}
}
fi.data = None;
}
let inline = fi.transition_status.is_empty() && fi.data_dir.is_some() && fi.parts.len() == 1;
if inline && fi.shard_file_size(fi.parts[0].actual_size) < DEFAULT_INLINE_BLOCK as i64 {
let part_path = path_join_buf(&[
path,
fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
format!("part.{}", fi.parts[0].number).as_str(),
]);
let part_path = self.get_object_path(volume, part_path.as_str())?;
let data = self.read_all_data(volume, volume_dir, part_path.clone()).await.map_err(|e| {
warn!("read_version read_all_data {:?} failed: {e}", part_path);
e
})?;
fi.data = Some(Bytes::from(data));
}
}
Ok(fi)
}
@@ -2277,7 +2357,7 @@ impl DiskAPI for LocalDisk {
force_del_marker: bool,
opts: DeleteOptions,
) -> Result<()> {
if path.starts_with(SLASH_SEPARATOR) {
if path.starts_with(SLASH_SEPARATOR_STR) {
return self
.delete(
volume,
@@ -2338,7 +2418,7 @@ impl DiskAPI for LocalDisk {
if !meta.versions.is_empty() {
let buf = meta.marshal_msg()?;
return self
.write_all_meta(volume, format!("{path}{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}").as_str(), &buf, true)
.write_all_meta(volume, format!("{path}{SLASH_SEPARATOR_STR}{STORAGE_FORMAT_FILE}").as_str(), &buf, true)
.await;
}
@@ -2348,11 +2428,11 @@ impl DiskAPI for LocalDisk {
{
let src_path = path_join(&[
file_path.as_path(),
Path::new(format!("{old_data_dir}{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE_BACKUP}").as_str()),
Path::new(format!("{old_data_dir}{SLASH_SEPARATOR_STR}{STORAGE_FORMAT_FILE_BACKUP}").as_str()),
]);
let dst_path = path_join(&[
file_path.as_path(),
Path::new(format!("{path}{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}").as_str()),
Path::new(format!("{path}{SLASH_SEPARATOR_STR}{STORAGE_FORMAT_FILE}").as_str()),
]);
return rename_all(src_path, dst_path, file_path).await;
}
@@ -2481,7 +2561,7 @@ async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInf
if root_disk_threshold > 0 {
disk_info.total <= root_disk_threshold
} else {
is_root_disk(&drive_path, SLASH_SEPARATOR).unwrap_or_default()
is_root_disk(&drive_path, SLASH_SEPARATOR_STR).unwrap_or_default()
}
} else {
false
@@ -2499,7 +2579,7 @@ mod test {
// let arr = Vec::new();
let vols = [
super::super::RUSTFS_META_TMP_DELETED_BUCKET,
RUSTFS_META_TMP_DELETED_BUCKET,
super::super::RUSTFS_META_TMP_BUCKET,
super::super::RUSTFS_META_MULTIPART_BUCKET,
RUSTFS_META_BUCKET,
@@ -2527,9 +2607,7 @@ mod test {
let disk = LocalDisk::new(&ep, false).await.unwrap();
let tmpp = disk
.resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET))
.unwrap();
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
println!("ppp :{:?}", &tmpp);
@@ -2557,9 +2635,7 @@ mod test {
let disk = LocalDisk::new(&ep, false).await.unwrap();
let tmpp = disk
.resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET))
.unwrap();
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
println!("ppp :{:?}", &tmpp);

View File

@@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::disk::error::DiskError;
use crate::disk::error::Result;
use crate::disk::error_conv::to_file_error;
use rustfs_utils::path::SLASH_SEPARATOR_STR;
use std::{
io,
path::{Component, Path},
};
use super::error::Result;
use crate::disk::error_conv::to_file_error;
use rustfs_utils::path::SLASH_SEPARATOR;
use tokio::fs;
use tracing::warn;
use super::error::DiskError;
/// Check path length according to OS limits.
pub fn check_path_length(path_name: &str) -> Result<()> {
// Apple OS X path length is limited to 1016
@@ -118,7 +116,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> std::io::Result<Vec
if file_type.is_file() {
volumes.push(name);
} else if file_type.is_dir() {
volumes.push(format!("{name}{SLASH_SEPARATOR}"));
volumes.push(format!("{name}{SLASH_SEPARATOR_STR}"));
}
count -= 1;
if count == 0 {

View File

@@ -12,19 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{error, info, instrument, warn};
use crate::{
disk::endpoint::{Endpoint, EndpointType},
disks_layout::DisksLayout,
global::global_rustfs_port,
};
use std::io::{Error, Result};
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use std::{
collections::{HashMap, HashSet, hash_map::Entry},
io::{Error, Result},
net::IpAddr,
};
use tracing::{error, info, instrument, warn};
/// enum for setup type.
#[derive(PartialEq, Eq, Debug, Clone)]

View File

@@ -108,7 +108,6 @@ pin_project! {
inner: W,
hash_algo: HashAlgorithm,
shard_size: usize,
buf: Vec<u8>,
finished: bool,
}
}
@@ -124,7 +123,6 @@ where
inner,
hash_algo,
shard_size,
buf: Vec::new(),
finished: false,
}
}
@@ -159,19 +157,19 @@ where
if hash_algo.size() > 0 {
let hash = hash_algo.hash_encode(buf);
self.buf.extend_from_slice(hash.as_ref());
if hash.as_ref().is_empty() {
error!("bitrot writer write hash error: hash is empty");
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "hash is empty"));
}
self.inner.write_all(hash.as_ref()).await?;
}
self.buf.extend_from_slice(buf);
self.inner.write_all(buf).await?;
self.inner.write_all(&self.buf).await?;
// self.inner.flush().await?;
self.inner.flush().await?;
let n = buf.len();
self.buf.clear();
Ok(n)
}

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::BitrotReader;
use super::Erasure;
use crate::disk::error::Error;
use crate::disk::error_reduce::reduce_errs;
use crate::erasure_coding::{BitrotReader, Erasure};
use futures::stream::{FuturesUnordered, StreamExt};
use pin_project_lite::pin_project;
use std::io;
@@ -312,11 +311,12 @@ impl Erasure {
#[cfg(test)]
mod tests {
use rustfs_utils::HashAlgorithm;
use crate::{disk::error::DiskError, erasure_coding::BitrotWriter};
use super::*;
use crate::{
disk::error::DiskError,
erasure_coding::{BitrotReader, BitrotWriter},
};
use rustfs_utils::HashAlgorithm;
use std::io::Cursor;
#[tokio::test]

View File

@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::BitrotWriterWrapper;
use super::Erasure;
use crate::disk::error::Error;
use crate::disk::error_reduce::count_errs;
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_write_quorum_errs};
use crate::erasure_coding::BitrotWriterWrapper;
use crate::erasure_coding::Erasure;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;

View File

@@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::BitrotReader;
use super::BitrotWriterWrapper;
use super::decode::ParallelReader;
use crate::disk::error::{Error, Result};
use crate::erasure_coding::BitrotReader;
use crate::erasure_coding::BitrotWriterWrapper;
use crate::erasure_coding::decode::ParallelReader;
use crate::erasure_coding::encode::MultiWriter;
use bytes::Bytes;
use tokio::io::AsyncRead;

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod bitrot;
pub mod decode;
pub mod encode;
pub mod erasure;
pub mod heal;
mod bitrot;
pub use bitrot::*;
pub use erasure::{Erasure, ReedSolomonEncoder, calc_shard_size};

View File

@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use s3s::{S3Error, S3ErrorCode};
use rustfs_utils::path::decode_dir_object;
use crate::bucket::error::BucketMetadataError;
use crate::disk::error::DiskError;
use rustfs_utils::path::decode_dir_object;
use s3s::{S3Error, S3ErrorCode};
pub type Error = StorageError;
pub type Result<T> = core::result::Result<T, Error>;

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::event::targetid::TargetID;
use std::sync::atomic::AtomicI64;
use super::targetid::TargetID;
#[derive(Default)]
pub struct TargetList {
pub current_send_calls: AtomicI64,

View File

@@ -14,15 +14,14 @@
// limitations under the License.
#![allow(unused_variables)]
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::bucket::metadata::BucketMetadata;
use crate::event::name::EventName;
use crate::event::targetlist::TargetList;
use crate::store::ECStore;
use crate::store_api::ObjectInfo;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct EventNotifier {
target_list: TargetList,

View File

@@ -12,12 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
admin_server_info::get_local_server_property,
new_object_layer_fn,
store_api::StorageAPI,
// utils::os::get_drive_stats,
};
use crate::{admin_server_info::get_local_server_property, new_object_layer_fn, store_api::StorageAPI};
use chrono::Utc;
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR, heal_channel::DriveState, metrics::global_metrics};
use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};

View File

@@ -38,7 +38,7 @@ use rustfs_common::defer;
use rustfs_common::heal_channel::HealOpts;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams};
use rustfs_rio::{HashReader, WarpReader};
use rustfs_utils::path::{SLASH_SEPARATOR, encode_dir_object, path_join};
use rustfs_utils::path::{SLASH_SEPARATOR_STR, encode_dir_object, path_join};
use rustfs_workers::workers::Workers;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -451,10 +451,10 @@ fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, S
let trimmed_path = path
.strip_prefix(base_path)
.unwrap_or(path)
.strip_prefix(SLASH_SEPARATOR)
.strip_prefix(SLASH_SEPARATOR_STR)
.unwrap_or(path);
// Find the position of the first '/'
let pos = trimmed_path.find(SLASH_SEPARATOR).unwrap_or(trimmed_path.len());
let pos = trimmed_path.find(SLASH_SEPARATOR_STR).unwrap_or(trimmed_path.len());
// Split into bucket and prefix
let bucket = &trimmed_path[0..pos];
let prefix = &trimmed_path[pos + 1..]; // +1 to skip the '/' character if it exists

View File

@@ -12,16 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::error::Error;
use crate::rpc::{TONIC_RPC_PREFIX, gen_signature_headers};
use http::Method;
use rustfs_common::GLOBAL_CONN_MAP;
use rustfs_protos::{create_new_channel, proto_gen::node_service::node_service_client::NodeServiceClient};
use std::error::Error;
use tonic::{service::interceptor::InterceptedService, transport::Channel};
use tracing::debug;
use crate::rpc::{TONIC_RPC_PREFIX, gen_signature_headers};
/// 3. Subsequent calls will attempt fresh connections
/// 4. If node is still down, connection will fail fast (3s timeout)
pub async fn node_service_time_out_client(

View File

@@ -108,14 +108,19 @@ pub fn verify_rpc_signature(url: &str, method: &Method, headers: &HeaderMap) ->
}
// Generate expected signature
let expected_signature = generate_signature(&secret, url, method, timestamp);
// Compare signatures
if signature != expected_signature {
error!(
"verify_rpc_signature: Invalid signature: secret {}, url {}, method {}, timestamp {}, signature {}, expected_signature {}",
secret, url, method, timestamp, signature, expected_signature
"verify_rpc_signature: Invalid signature: url {}, method {}, timestamp {}, signature {}, expected_signature: {}***{}|{}",
url,
method,
timestamp,
signature,
expected_signature.chars().next().unwrap_or('*'),
expected_signature.chars().last().unwrap_or('*'),
expected_signature.len()
);
return Err(std::io::Error::other("Invalid signature"));

View File

@@ -27,7 +27,6 @@ use rustfs_madmin::{
net::NetInfo,
};
use rustfs_protos::evict_failed_connection;
use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_protos::proto_gen::node_service::{
DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest,
GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest,
@@ -35,6 +34,7 @@ use rustfs_protos::proto_gen::node_service::{
LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest,
LoadTransitionTierConfigRequest, LoadUserRequest, LocalStorageInfoRequest, Mss, ReloadPoolMetaRequest,
ReloadSiteReplicationConfigRequest, ServerInfoRequest, SignalServiceRequest, StartProfilingRequest, StopRebalanceRequest,
node_service_client::NodeServiceClient,
};
use rustfs_utils::XHost;
use serde::{Deserialize, Serialize as _};

View File

@@ -12,15 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
path::PathBuf,
sync::{Arc, atomic::Ordering},
time::Duration,
use crate::{
disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions,
disk_store::{
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE,
get_max_timeout_duration,
},
endpoint::Endpoint,
{
disk_store::DiskHealthTracker,
error::{DiskError, Error, Result},
},
},
rpc::build_auth_headers,
rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client},
};
use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_protos::proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
@@ -28,37 +42,18 @@ use rustfs_protos::proto_gen::node_service::{
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
node_service_client::NodeServiceClient,
};
use rustfs_utils::string::parse_bool_with_default;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::disk::{disk_store::DiskHealthTracker, error::DiskError};
use crate::{
disk::error::{Error, Result},
rpc::build_auth_headers,
};
use crate::{
disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
disk_store::{
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE,
get_max_timeout_duration,
},
endpoint::Endpoint,
},
rpc::client::gen_tonic_signature_interceptor,
};
use crate::{
disk::{FileReader, FileWriter},
rpc::client::{TonicInterceptor, node_service_time_out_client},
};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_rio::{HttpReader, HttpWriter};
use rustfs_utils::string::parse_bool_with_default;
use std::{
path::PathBuf,
sync::{Arc, atomic::Ordering},
time::Duration,
};
use tokio::time;
use tokio::{io::AsyncWrite, net::TcpStream, time::timeout};
use tokio_util::sync::CancellationToken;
use tonic::{Request, service::interceptor::InterceptedService, transport::Channel};
use tracing::{debug, info, warn};
use uuid::Uuid;
#[derive(Debug)]

View File

@@ -14,9 +14,10 @@
use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use async_trait::async_trait;
use rustfs_lock::types::{LockId, LockMetadata, LockPriority};
use rustfs_lock::{LockClient, LockError, LockInfo, LockResponse, LockStats, LockStatus, Result};
use rustfs_lock::{LockRequest, LockType};
use rustfs_lock::{
LockClient, LockError, LockInfo, LockRequest, LockResponse, LockStats, LockStatus, LockType, Result,
types::{LockId, LockMetadata, LockPriority},
};
use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_protos::proto_gen::node_service::{GenerallyLockRequest, PingRequest};
use std::collections::HashMap;

View File

@@ -82,7 +82,7 @@ use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX,
use rustfs_utils::{
HashAlgorithm,
crypto::hex,
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
path::{SLASH_SEPARATOR_STR, encode_dir_object, has_suffix, path_join_buf},
};
use rustfs_workers::workers::Workers;
use s3s::header::X_AMZ_RESTORE;
@@ -1485,20 +1485,8 @@ impl SetDisks {
let object = object.clone();
let version_id = version_id.clone();
tokio::spawn(async move {
if let Some(disk) = disk
&& disk.is_online().await
{
if version_id.is_empty() {
match disk.read_xl(&bucket, &object, read_data).await {
Ok(info) => {
let fi = file_info_from_raw(info, &bucket, &object, read_data).await?;
Ok(fi)
}
Err(err) => Err(err),
}
} else {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
}
if let Some(disk) = disk {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
} else {
Err(DiskError::DiskNotFound)
}
@@ -1626,7 +1614,7 @@ impl SetDisks {
bucket: &str,
object: &str,
read_data: bool,
_incl_free_vers: bool,
incl_free_vers: bool,
) -> (Vec<FileInfo>, Vec<Option<DiskError>>) {
let mut metadata_array = vec![None; fileinfos.len()];
let mut meta_file_infos = vec![FileInfo::default(); fileinfos.len()];
@@ -1676,7 +1664,7 @@ impl SetDisks {
..Default::default()
};
let finfo = match meta.into_fileinfo(bucket, object, "", true, true) {
let finfo = match meta.into_fileinfo(bucket, object, "", true, incl_free_vers, true) {
Ok(res) => res,
Err(err) => {
for item in errs.iter_mut() {
@@ -1703,7 +1691,7 @@ impl SetDisks {
for (idx, meta_op) in metadata_array.iter().enumerate() {
if let Some(meta) = meta_op {
match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, true) {
match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, incl_free_vers, true) {
Ok(res) => meta_file_infos[idx] = res,
Err(err) => errs[idx] = Some(err.into()),
}
@@ -4626,7 +4614,9 @@ impl StorageAPI for SetDisks {
.await
.map_err(|e| to_object_err(e, vec![bucket, object]))?;
Ok(ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended))
let mut obj_info = ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended);
obj_info.size = goi.size;
Ok(obj_info)
}
#[tracing::instrument(skip(self))]
@@ -5336,7 +5326,7 @@ impl StorageAPI for SetDisks {
&upload_id_path,
fi.data_dir.map(|v| v.to_string()).unwrap_or_default().as_str(),
]),
SLASH_SEPARATOR
SLASH_SEPARATOR_STR
);
let mut part_numbers = match Self::list_parts(&online_disks, &part_path, read_quorum).await {
@@ -5474,7 +5464,7 @@ impl StorageAPI for SetDisks {
let mut populated_upload_ids = HashSet::new();
for upload_id in upload_ids.iter() {
let upload_id = upload_id.trim_end_matches(SLASH_SEPARATOR).to_string();
let upload_id = upload_id.trim_end_matches(SLASH_SEPARATOR_STR).to_string();
if populated_upload_ids.contains(&upload_id) {
continue;
}
@@ -6234,7 +6224,7 @@ impl StorageAPI for SetDisks {
None
};
if has_suffix(object, SLASH_SEPARATOR) {
if has_suffix(object, SLASH_SEPARATOR_STR) {
let (result, err) = self.heal_object_dir_locked(bucket, object, opts.dry_run, opts.remove).await?;
return Ok((result, err.map(|e| e.into())));
}

View File

@@ -13,8 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, sync::Arc};
use crate::disk::error_reduce::count_errs;
use crate::error::{Error, Result};
use crate::store_api::{ListPartsInfo, ObjectInfoOrErr, WalkOptions};
@@ -44,17 +42,16 @@ use rustfs_common::{
heal_channel::{DriveState, HealItemType},
};
use rustfs_filemeta::FileInfo;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use tracing::{error, info};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct Sets {

View File

@@ -1825,16 +1825,16 @@ impl StorageAPI for ECStore {
if self.is_suspended(pool.pool_idx).await {
continue;
}
match pool
return match pool
.list_object_parts(bucket, object, upload_id, part_number_marker, max_parts, opts)
.await
{
Ok(res) => return Ok(res),
Ok(res) => Ok(res),
Err(err) => {
if is_err_invalid_upload_id(&err) {
continue;
}
return Err(err);
Err(err)
}
};
}
@@ -2204,7 +2204,7 @@ impl StorageAPI for ECStore {
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
check_del_obj_args(bucket, object)?;
let object = rustfs_utils::path::encode_dir_object(object);
let object = encode_dir_object(object);
if self.single_pool() {
return self.pools[0]
@@ -2324,17 +2324,15 @@ impl StorageAPI for ECStore {
// No pool returned a nil error, return the first non 'not found' error
for (index, err) in errs.iter().enumerate() {
match err {
return match err {
Some(err) => {
if is_err_object_not_found(err) || is_err_version_not_found(err) {
continue;
}
return Ok((ress.remove(index), Some(err.clone())));
Ok((ress.remove(index), Some(err.clone())))
}
None => {
return Ok((ress.remove(index), None));
}
}
None => Ok((ress.remove(index), None)),
};
}
// At this stage, all errors are 'not found'

View File

@@ -28,14 +28,15 @@ use http::{HeaderMap, HeaderValue};
use rustfs_common::heal_channel::HealOpts;
use rustfs_filemeta::{
FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState,
ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
ReplicationStatusType, RestoreStatusOps as _, VersionPurgeStatusType, parse_restore_obj_status, replication_statuses_map,
version_purge_statuses_map,
};
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_RESTORE, AMZ_STORAGE_CLASS};
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -756,7 +757,24 @@ impl ObjectInfo {
.ok()
});
// TODO:ReplicationState
let replication_status_internal = fi
.replication_state_internal
.as_ref()
.and_then(|v| v.replication_status_internal.clone());
let version_purge_status_internal = fi
.replication_state_internal
.as_ref()
.and_then(|v| v.version_purge_status_internal.clone());
let mut replication_status = fi.replication_status();
if replication_status.is_empty()
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS).cloned()
&& status == ReplicationStatusType::Replica.as_str()
{
replication_status = ReplicationStatusType::Replica;
}
let version_purge_status = fi.version_purge_status();
let transitioned_object = TransitionedObject {
name: fi.transitioned_objname.clone(),
@@ -777,10 +795,24 @@ impl ObjectInfo {
};
// Extract storage class from metadata, default to STANDARD if not found
let storage_class = metadata
.get(AMZ_STORAGE_CLASS)
.cloned()
.or_else(|| Some(storageclass::STANDARD.to_string()));
let storage_class = if !fi.transition_tier.is_empty() {
Some(fi.transition_tier.clone())
} else {
fi.metadata
.get(AMZ_STORAGE_CLASS)
.cloned()
.or_else(|| Some(storageclass::STANDARD.to_string()))
};
let mut restore_ongoing = false;
let mut restore_expires = None;
if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned() {
//
if let Ok(restore_status) = parse_restore_obj_status(&restore_status) {
restore_ongoing = restore_status.on_going();
restore_expires = restore_status.expiry();
}
}
// Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo
let parts = fi
@@ -798,6 +830,8 @@ impl ObjectInfo {
})
.collect();
// TODO: part checksums
ObjectInfo {
bucket: bucket.to_string(),
name,
@@ -822,6 +856,12 @@ impl ObjectInfo {
transitioned_object,
checksum: fi.checksum.clone(),
storage_class,
restore_ongoing,
restore_expires,
replication_status_internal,
replication_status,
version_purge_status_internal,
version_purge_status,
..Default::default()
}
}

View File

@@ -27,7 +27,6 @@ use crate::{
};
use futures::future::join_all;
use std::collections::{HashMap, hash_map::Entry};
use tracing::{info, warn};
use uuid::Uuid;

View File

@@ -34,7 +34,7 @@ use rustfs_filemeta::{
MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams,
merge_file_meta_versions,
};
use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
use rustfs_utils::path::{self, SLASH_SEPARATOR_STR, base_dir_from_prefix};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::{self};
@@ -132,7 +132,7 @@ impl ListPathOptions {
return;
}
let s = SLASH_SEPARATOR.chars().next().unwrap_or_default();
let s = SLASH_SEPARATOR_STR.chars().next().unwrap_or_default();
self.filter_prefix = {
let fp = self.prefix.trim_start_matches(&self.base_dir).trim_matches(s);
@@ -346,7 +346,7 @@ impl ECStore {
if let Some(delimiter) = &delimiter {
if obj.is_dir && obj.mod_time.is_none() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
if delimiter != SLASH_SEPARATOR_STR {
for p in prefixes.iter() {
if found {
break;
@@ -410,13 +410,13 @@ impl ECStore {
..Default::default()
};
let mut list_result = match self.list_path(&opts).await {
Ok(res) => res,
Err(err) => MetaCacheEntriesSortedResult {
let mut list_result = self
.list_path(&opts)
.await
.unwrap_or_else(|err| MetaCacheEntriesSortedResult {
err: Some(err.into()),
..Default::default()
},
};
});
if let Some(err) = list_result.err.clone()
&& err != rustfs_filemeta::Error::Unexpected
@@ -470,7 +470,7 @@ impl ECStore {
if let Some(delimiter) = &delimiter {
if obj.is_dir && obj.mod_time.is_none() {
let mut found = false;
if delimiter != SLASH_SEPARATOR {
if delimiter != SLASH_SEPARATOR_STR {
for p in prefixes.iter() {
if found {
break;
@@ -502,7 +502,7 @@ impl ECStore {
// warn!("list_path opt {:?}", &o);
check_list_objs_args(&o.bucket, &o.prefix, &o.marker)?;
// if opts.prefix.ends_with(SLASH_SEPARATOR) {
// if opts.prefix.ends_with(SLASH_SEPARATOR_STR) {
// return Err(Error::msg("eof"));
// }
@@ -520,11 +520,11 @@ impl ECStore {
return Err(Error::Unexpected);
}
if o.prefix.starts_with(SLASH_SEPARATOR) {
if o.prefix.starts_with(SLASH_SEPARATOR_STR) {
return Err(Error::Unexpected);
}
let slash_separator = Some(SLASH_SEPARATOR.to_owned());
let slash_separator = Some(SLASH_SEPARATOR_STR.to_owned());
o.include_directories = o.separator == slash_separator;
@@ -774,8 +774,8 @@ impl ECStore {
let mut filter_prefix = {
prefix
.trim_start_matches(&path)
.trim_start_matches(SLASH_SEPARATOR)
.trim_end_matches(SLASH_SEPARATOR)
.trim_start_matches(SLASH_SEPARATOR_STR)
.trim_end_matches(SLASH_SEPARATOR_STR)
.to_owned()
};
@@ -988,7 +988,7 @@ async fn gather_results(
}
if let Some(marker) = &opts.marker
&& &entry.name < marker
&& &entry.name <= marker
{
continue;
}
@@ -1130,7 +1130,7 @@ async fn merge_entry_channels(
if path::clean(&best_entry.name) == path::clean(&other_entry.name) {
let dir_matches = best_entry.is_dir() && other_entry.is_dir();
let suffix_matches =
best_entry.name.ends_with(SLASH_SEPARATOR) == other_entry.name.ends_with(SLASH_SEPARATOR);
best_entry.name.ends_with(SLASH_SEPARATOR_STR) == other_entry.name.ends_with(SLASH_SEPARATOR_STR);
if dir_matches && suffix_matches {
to_merge.push(other_idx);
@@ -1476,7 +1476,6 @@ mod test {
// use crate::error::Error;
// use crate::metacache::writer::MetacacheReader;
// use crate::set_disk::SetDisks;
// use crate::store::ECStore;
// use crate::store_list_objects::ListPathOptions;
// use crate::store_list_objects::WalkOptions;
// use crate::store_list_objects::WalkVersionsSortOrder;

View File

@@ -51,7 +51,7 @@ use crate::{
store_api::{ObjectOptions, PutObjReader},
};
use rustfs_rio::HashReader;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join};
use rustfs_utils::path::{SLASH_SEPARATOR_STR, path_join};
use s3s::S3ErrorCode;
use super::{
@@ -403,7 +403,7 @@ impl TierConfigMgr {
pub async fn save_tiering_config<S: StorageAPI>(&self, api: Arc<S>) -> std::result::Result<(), std::io::Error> {
let data = self.marshal()?;
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, TIER_CONFIG_FILE);
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR_STR, TIER_CONFIG_FILE);
self.save_config(api, &config_file, data).await
}
@@ -483,7 +483,7 @@ async fn new_and_save_tiering_config<S: StorageAPI>(api: Arc<S>) -> Result<TierC
#[tracing::instrument(level = "debug", name = "load_tier_config", skip(api))]
async fn load_tier_config(api: Arc<ECStore>) -> std::result::Result<TierConfigMgr, std::io::Error> {
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR, TIER_CONFIG_FILE);
let config_file = format!("{}{}{}", CONFIG_PREFIX, SLASH_SEPARATOR_STR, TIER_CONFIG_FILE);
let data = read_config(api.clone(), config_file.as_str()).await;
if let Err(err) = data {
if is_err_config_not_found(&err) {

View File

@@ -30,13 +30,11 @@ use crate::client::{
transition_api::{Options, TransitionClient, TransitionCore},
transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::ErrorResponse;
use crate::error::error_resp_to_object_err;
use crate::tier::{
tier_config::TierS3,
warm_backend::{WarmBackend, WarmBackendGetOpts},
};
use rustfs_utils::path::SLASH_SEPARATOR;
use rustfs_utils::path::SLASH_SEPARATOR_STR;
pub struct WarmBackendS3 {
pub client: Arc<TransitionClient>,
@@ -178,7 +176,7 @@ impl WarmBackend for WarmBackendS3 {
async fn in_use(&self) -> Result<bool, std::io::Error> {
let result = self
.core
.list_objects_v2(&self.bucket, &self.prefix, "", "", SLASH_SEPARATOR, 1)
.list_objects_v2(&self.bucket, &self.prefix, "", "", SLASH_SEPARATOR_STR, 1)
.await?;
Ok(result.common_prefixes.len() > 0 || result.contents.len() > 0)

View File

@@ -27,19 +27,11 @@ use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::primitives::ByteStream;
use crate::client::{
api_get_options::GetObjectOptions,
api_put_object::PutObjectOptions,
api_remove::RemoveObjectOptions,
transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::ErrorResponse;
use crate::error::error_resp_to_object_err;
use crate::client::transition_api::{ReadCloser, ReaderImpl};
use crate::tier::{
tier_config::TierS3,
warm_backend::{WarmBackend, WarmBackendGetOpts},
};
use rustfs_utils::path::SLASH_SEPARATOR;
pub struct WarmBackendS3 {
pub client: Arc<Client>,

View File

@@ -505,6 +505,10 @@ impl FileInfo {
ReplicationStatusType::Empty
}
}
pub fn shard_file_size(&self, total_length: i64) -> i64 {
self.erasure.shard_file_size(total_length)
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -590,7 +594,7 @@ impl RestoreStatusOps for RestoreStatus {
}
}
fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
pub fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
let tokens: Vec<&str> = restore_hdr.splitn(2, ",").collect();
let progress_tokens: Vec<&str> = tokens[0].splitn(2, "=").collect();
if progress_tokens.len() != 2 {

View File

@@ -14,7 +14,8 @@
use crate::{
ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState,
ReplicationStatusType, Result, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, replication_statuses_map,
version_purge_statuses_map,
};
use byteorder::ByteOrder;
use bytes::Bytes;
@@ -909,6 +910,7 @@ impl FileMeta {
path: &str,
version_id: &str,
read_data: bool,
include_free_versions: bool,
all_parts: bool,
) -> Result<FileInfo> {
let vid = {
@@ -921,11 +923,35 @@ impl FileMeta {
let mut is_latest = true;
let mut succ_mod_time = None;
let mut non_free_versions = self.versions.len();
let mut found = false;
let mut found_free_version = None;
let mut found_fi = None;
for ver in self.versions.iter() {
let header = &ver.header;
// TODO: freeVersion
if header.free_version() {
non_free_versions -= 1;
if include_free_versions && found_free_version.is_none() {
let mut found_free_fi = FileMetaVersion::default();
if found_free_fi.unmarshal_msg(&ver.meta).is_ok() && found_free_fi.version_type != VersionType::Invalid {
let mut free_fi = found_free_fi.into_fileinfo(volume, path, all_parts);
free_fi.is_latest = true;
found_free_version = Some(free_fi);
}
}
if header.version_id != Some(vid) {
continue;
}
}
if found {
continue;
}
if !version_id.is_empty() && header.version_id != Some(vid) {
is_latest = false;
@@ -933,6 +959,8 @@ impl FileMeta {
continue;
}
found = true;
let mut fi = ver.into_fileinfo(volume, path, all_parts)?;
fi.is_latest = is_latest;
@@ -947,7 +975,25 @@ impl FileMeta {
.map(bytes::Bytes::from);
}
fi.num_versions = self.versions.len();
found_fi = Some(fi);
}
if !found {
if version_id.is_empty() {
if include_free_versions
&& non_free_versions == 0
&& let Some(free_version) = found_free_version
{
return Ok(free_version);
}
return Err(Error::FileNotFound);
} else {
return Err(Error::FileVersionNotFound);
}
}
if let Some(mut fi) = found_fi {
fi.num_versions = non_free_versions;
return Ok(fi);
}
@@ -1767,14 +1813,27 @@ impl MetaObject {
metadata.insert(k.to_owned(), v.to_owned());
}
let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase();
let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase();
for (k, v) in &self.meta_sys {
if k == AMZ_STORAGE_CLASS && v == b"STANDARD" {
let lower_k = k.to_lowercase();
if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key {
continue;
}
if lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() {
continue;
}
if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" {
continue;
}
if k.starts_with(RESERVED_METADATA_PREFIX)
|| k.starts_with(RESERVED_METADATA_PREFIX_LOWER)
|| k == VERSION_PURGE_STATUS_KEY
|| lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase()
{
metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default());
}
@@ -2511,15 +2570,31 @@ pub fn merge_file_meta_versions(
merged
}
pub async fn file_info_from_raw(ri: RawFileInfo, bucket: &str, object: &str, read_data: bool) -> Result<FileInfo> {
get_file_info(&ri.buf, bucket, object, "", FileInfoOpts { data: read_data }).await
pub fn file_info_from_raw(
ri: RawFileInfo,
bucket: &str,
object: &str,
read_data: bool,
include_free_versions: bool,
) -> Result<FileInfo> {
get_file_info(
&ri.buf,
bucket,
object,
"",
FileInfoOpts {
data: read_data,
include_free_versions,
},
)
}
pub struct FileInfoOpts {
pub data: bool,
pub include_free_versions: bool,
}
pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
let vid = {
if version_id.is_empty() {
None
@@ -2541,7 +2616,7 @@ pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &st
});
}
let fi = meta.into_fileinfo(volume, path, version_id, opts.data, true)?;
let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?;
Ok(fi)
}

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions};
use crate::{
Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info,
merge_file_meta_versions,
};
use rmp::Marker;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
@@ -141,8 +144,7 @@ impl MetaCacheEntry {
});
}
if self.cached.is_some() {
let fm = self.cached.as_ref().unwrap();
if let Some(fm) = &self.cached {
if fm.versions.is_empty() {
return Ok(FileInfo {
volume: bucket.to_owned(),
@@ -154,14 +156,20 @@ impl MetaCacheEntry {
});
}
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false, true)?;
return Ok(fi);
}
let mut fm = FileMeta::new();
fm.unmarshal_msg(&self.metadata)?;
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
Ok(fi)
get_file_info(
&self.metadata,
bucket,
self.name.as_str(),
"",
FileInfoOpts {
data: false,
include_free_versions: false,
},
)
}
pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {

View File

@@ -32,7 +32,7 @@ use rustfs_ecstore::{
store_api::{ObjectInfo, ObjectOptions},
};
use rustfs_policy::{auth::UserIdentity, policy::PolicyDoc};
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
use rustfs_utils::path::{SLASH_SEPARATOR_STR, path_join_buf};
use serde::{Serialize, de::DeserializeOwned};
use std::sync::LazyLock;
use std::{collections::HashMap, sync::Arc};
@@ -182,7 +182,7 @@ impl ObjectStore {
} else {
info.name
};
let name = object_name.trim_start_matches(&prefix).trim_end_matches(SLASH_SEPARATOR);
let name = object_name.trim_start_matches(&prefix).trim_end_matches(SLASH_SEPARATOR_STR);
let _ = sender
.send(StringOrErr {
item: Some(name.to_owned()),

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use crate::notification_system_subscriber::NotificationSystemSubscriberView;
use crate::notifier::TargetList;
use crate::{
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
@@ -191,6 +192,22 @@ impl NotificationSystem {
self.notifier.target_list().read().await.keys()
}
/// Gets the complete Target list, including both active and inactive Targets.
///
/// # Return
/// An `Arc<RwLock<TargetList>>` containing all Targets.
pub async fn get_all_targets(&self) -> Arc<RwLock<TargetList>> {
self.notifier.target_list()
}
/// Gets all Target values, including both active and inactive Targets.
///
/// # Return
/// A Vec containing all Targets.
pub async fn get_target_values(&self) -> Vec<Arc<dyn Target<Event> + Send + Sync>> {
self.notifier.target_list().read().await.values()
}
/// Checks if there are active subscribers for the given bucket and event name.
pub async fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool {
if !self.subscriber_view.has_subscriber(bucket, event) {

View File

@@ -370,6 +370,11 @@ impl TargetList {
self.targets.keys().cloned().collect()
}
/// Returns all targets in the list
pub fn values(&self) -> Vec<Arc<dyn Target<Event> + Send + Sync>> {
self.targets.values().cloned().collect()
}
/// Returns the number of targets
pub fn len(&self) -> usize {
self.targets.len()

View File

@@ -22,8 +22,8 @@ use strum::{EnumString, IntoStaticStr};
use super::{Error as IamError, Validator, utils::wildcard};
/// A set of policy actions that serializes as a single string when containing one item,
/// or as an array when containing multiple items (matching AWS S3 API format).
/// A set of policy actions that always serializes as an array of strings,
/// conforming to the S3 policy specification for consistency and compatibility.
#[derive(Clone, Default, Debug)]
pub struct ActionSet(pub HashSet<Action>);
@@ -34,15 +34,8 @@ impl Serialize for ActionSet {
{
use serde::ser::SerializeSeq;
if self.0.len() == 1 {
// Serialize single action as string (not array)
if let Some(action) = self.0.iter().next() {
let action_str: &str = action.into();
return serializer.serialize_str(action_str);
}
}
// Serialize multiple actions as array
// Always serialize as array, even for single action, to match S3 specification
// and ensure compatibility with AWS SDK clients that expect array format
let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
for action in &self.0 {
let action_str: &str = action.into();
@@ -610,13 +603,17 @@ mod tests {
#[test]
fn test_actionset_serialize_single_element() {
// Single element should serialize as string
// Single element should serialize as array for S3 specification compliance
let mut set = HashSet::new();
set.insert(Action::S3Action(S3Action::GetObjectAction));
let actionset = ActionSet(set);
let json = serde_json::to_string(&actionset).expect("Should serialize");
assert_eq!(json, "\"s3:GetObject\"");
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Should parse");
assert!(parsed.is_array(), "Should serialize as array");
let arr = parsed.as_array().expect("Should be array");
assert_eq!(arr.len(), 1);
assert_eq!(arr[0].as_str().unwrap(), "s3:GetObject");
}
#[test]
@@ -636,12 +633,16 @@ mod tests {
#[test]
fn test_actionset_wildcard_serialization() {
// Wildcard action should serialize correctly
// Wildcard action should serialize as array for S3 specification compliance
let mut set = HashSet::new();
set.insert(Action::try_from("*").expect("Should parse wildcard"));
let actionset = ActionSet(set);
let json = serde_json::to_string(&actionset).expect("Should serialize");
assert_eq!(json, "\"s3:*\"");
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Should parse");
assert!(parsed.is_array(), "Should serialize as array");
let arr = parsed.as_array().expect("Should be array");
assert_eq!(arr.len(), 1);
assert_eq!(arr[0].as_str().unwrap(), "s3:*");
}
}

View File

@@ -63,16 +63,23 @@ pub struct Policy {
impl Policy {
pub async fn is_allowed(&self, args: &Args<'_>) -> bool {
// First, check all Deny statements - if any Deny matches, deny the request
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Deny)) {
if !statement.is_allowed(args).await {
return false;
}
}
if args.deny_only || args.is_owner {
// Owner has all permissions
if args.is_owner {
return true;
}
if args.deny_only {
return false;
}
// Check Allow statements
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Allow)) {
if statement.is_allowed(args).await {
return true;
@@ -594,6 +601,102 @@ mod test {
Ok(())
}
#[tokio::test]
async fn test_deny_only_security_fix() -> Result<()> {
let data = r#"
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": ["arn:aws:s3:::bucket1/*"]
}
]
}
"#;
let policy = Policy::parse_config(data.as_bytes())?;
let conditions = HashMap::new();
let claims = HashMap::new();
// Test with deny_only=true but no matching Allow statement
let args_deny_only = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
bucket: "bucket2",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: true, // Should NOT automatically allow
};
// Should return false because deny_only=true, regardless of whether there's a matching Allow statement
assert!(
!policy.is_allowed(&args_deny_only).await,
"deny_only should return false when deny_only=true, regardless of Allow statements"
);
// Test with deny_only=true and matching Allow statement
let args_deny_only_allowed = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
bucket: "bucket1",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: true,
};
// Should return false because deny_only=true prevents checking Allow statements (unless is_owner=true)
assert!(
!policy.is_allowed(&args_deny_only_allowed).await,
"deny_only should return false even with matching Allow statement"
);
// Test with deny_only=false (normal case)
let args_normal = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
bucket: "bucket1",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: false,
};
// Should return true because there's an Allow statement
assert!(
policy.is_allowed(&args_normal).await,
"normal policy evaluation should allow with matching Allow statement"
);
let args_owner_deny_only = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
bucket: "bucket2",
conditions: &conditions,
is_owner: true, // Owner has all permissions
object: "test.txt",
claims: &claims,
deny_only: true, // Even with deny_only=true, owner should be allowed
};
assert!(
policy.is_allowed(&args_owner_deny_only).await,
"owner should retain all permissions even when deny_only=true"
);
Ok(())
}
#[tokio::test]
async fn test_aws_username_policy_variable() -> Result<()> {
let data = r#"
@@ -1016,7 +1119,7 @@ mod test {
}
#[test]
fn test_bucket_policy_serialize_single_action_as_string() {
fn test_bucket_policy_serialize_single_action_as_array() {
use crate::policy::action::{Action, ActionSet, S3Action};
use crate::policy::resource::{Resource, ResourceSet};
use crate::policy::{Effect, Principal};
@@ -1050,8 +1153,10 @@ mod test {
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Should parse");
let action = &parsed["Statement"][0]["Action"];
// Single action should be serialized as string
assert!(action.is_string(), "Single action should serialize as string");
assert_eq!(action.as_str().unwrap(), "s3:ListBucket");
// Single action should be serialized as array for S3 specification compliance
assert!(action.is_array(), "Single action should serialize as array");
let arr = action.as_array().expect("Should be array");
assert_eq!(arr.len(), 1);
assert_eq!(arr[0].as_str().unwrap(), "s3:ListBucket");
}
}

View File

@@ -169,8 +169,9 @@ impl HashReader {
sha256hex: Option<String>,
diskable_md5: bool,
) -> std::io::Result<Self> {
// Check if it's already a HashReader and update its parameters
if let Some(existing_hash_reader) = inner.as_hash_reader_mut() {
if size >= 0
&& let Some(existing_hash_reader) = inner.as_hash_reader_mut()
{
if existing_hash_reader.bytes_read() > 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
@@ -212,7 +213,8 @@ impl HashReader {
let content_sha256 = existing_hash_reader.content_sha256().clone();
let content_sha256_hasher = existing_hash_reader.content_sha256().clone().map(|_| Sha256Hasher::new());
let inner = existing_hash_reader.take_inner();
return Ok(Self {
Ok(Self {
inner,
size,
checksum: md5hex.clone(),
@@ -225,34 +227,36 @@ impl HashReader {
content_hasher,
checksum_on_finish: false,
trailer_s3s: existing_hash_reader.get_trailer().cloned(),
});
}
})
} else {
if size > 0 {
let hr = HardLimitReader::new(inner, size);
inner = Box::new(hr);
if size > 0 {
let hr = HardLimitReader::new(inner, size);
inner = Box::new(hr);
if !diskable_md5 && !inner.is_hash_reader() {
if !diskable_md5 && !inner.is_hash_reader() {
let er = EtagReader::new(inner, md5hex.clone());
inner = Box::new(er);
}
} else if !diskable_md5 {
let er = EtagReader::new(inner, md5hex.clone());
inner = Box::new(er);
}
} else if !diskable_md5 {
let er = EtagReader::new(inner, md5hex.clone());
inner = Box::new(er);
Ok(Self {
inner,
size,
checksum: md5hex,
actual_size,
diskable_md5,
bytes_read: 0,
content_hash: None,
content_hasher: None,
content_sha256: sha256hex.clone(),
content_sha256_hasher: sha256hex.map(|_| Sha256Hasher::new()),
checksum_on_finish: false,
trailer_s3s: None,
})
}
Ok(Self {
inner,
size,
checksum: md5hex,
actual_size,
diskable_md5,
bytes_read: 0,
content_hash: None,
content_hasher: None,
content_sha256: sha256hex.clone(),
content_sha256_hasher: sha256hex.clone().map(|_| Sha256Hasher::new()),
checksum_on_finish: false,
trailer_s3s: None,
})
}
pub fn into_inner(self) -> Box<dyn Reader> {

View File

@@ -37,11 +37,6 @@ impl TargetID {
Self { id, name }
}
/// Convert to string representation
pub fn to_id_string(&self) -> String {
format!("{}:{}", self.id, self.name)
}
/// Create an ARN
pub fn to_arn(&self, region: &str) -> ARN {
ARN {
@@ -80,7 +75,7 @@ impl Serialize for TargetID {
where
S: Serializer,
{
serializer.serialize_str(&self.to_id_string())
serializer.serialize_str(&self.to_string())
}
}
@@ -130,10 +125,12 @@ impl ARN {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
return String::new();
}
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id)
}
/// Parsing ARN from string
/// Only accepts ARNs with the RustFS prefix: "arn:rustfs:sqs:"
/// Format: arn:rustfs:sqs:{region}:{id}:{name}
pub fn parse(s: &str) -> Result<Self, TargetError> {
if !s.starts_with(ARN_PREFIX) {
return Err(TargetError::InvalidARN(s.to_string()));

View File

@@ -72,7 +72,7 @@ rand = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, optional = true, features = ["std", "fileapi", "minwindef", "ntdef", "winnt"] }
windows = { workspace = true, optional = true, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
[lints]
workspace = true
@@ -83,13 +83,13 @@ ip = ["dep:local-ip-address"] # ip characteristics and their dependencies
tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies
net = ["ip", "dep:url", "dep:netif", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:thiserror", "dep:tokio"] # network features with DNS resolver
io = ["dep:tokio"]
path = []
path = [] # path manipulation features
notify = ["dep:hyper", "dep:s3s", "dep:hashbrown", "dep:thiserror", "dep:serde", "dep:libc", "dep:url", "dep:regex"] # file system notification features
compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
string = ["dep:regex"]
crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper", "dep:sha1"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:crc-fast"]
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
os = ["dep:nix", "dep:tempfile", "dep:windows"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
http = ["dep:convert_case", "dep:http", "dep:regex"]

View File

@@ -51,6 +51,7 @@ pub const AMZ_TAG_COUNT: &str = "x-amz-tagging-count";
pub const AMZ_TAG_DIRECTIVE: &str = "X-Amz-Tagging-Directive";
// S3 transition restore
pub const AMZ_RESTORE: &str = "x-amz-restore";
pub const AMZ_RESTORE_EXPIRY_DAYS: &str = "X-Amz-Restore-Expiry-Days";
pub const AMZ_RESTORE_REQUEST_DATE: &str = "X-Amz-Restore-Request-Date";

View File

@@ -14,8 +14,6 @@
use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
#[cfg(test)]
use std::sync::MutexGuard;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
@@ -83,7 +81,7 @@ fn reset_dns_resolver_inner() {
#[cfg(test)]
pub struct MockResolverGuard {
_lock: MutexGuard<'static, ()>,
_lock: std::sync::MutexGuard<'static, ()>,
}
#[cfg(test)]

View File

@@ -16,6 +16,7 @@
mod linux;
#[cfg(all(unix, not(target_os = "linux")))]
mod unix;
#[cfg(target_os = "windows")]
mod windows;

View File

@@ -13,56 +13,19 @@
// limitations under the License.
use super::{DiskInfo, IOStats};
use nix::sys::statfs::Statfs;
use nix::sys::{stat::stat, statfs::statfs};
use nix::sys::{stat::stat, statvfs::statvfs};
use std::io::Error;
use std::path::Path;
// FreeBSD and OpenBSD return a signed integer for blocks_available.
// Cast to an unsigned integer to use with DiskInfo.
#[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
fn blocks_available(stat: &Statfs) -> u64 {
match stat.blocks_available().try_into() {
Ok(bavail) => bavail,
Err(e) => {
tracing::warn!("blocks_available returned a negative value: Using 0 as fallback. {}", e);
0
}
}
}
// FreeBSD returns a signed integer for files_free. Cast to an unsigned integer
// to use with DiskInfo
#[cfg(target_os = "freebsd")]
fn files_free(stat: &Statfs) -> u64 {
match stat.files_free().try_into() {
Ok(files_free) => files_free,
Err(e) => {
tracing::warn!("files_free returned a negative value: Using 0 as fallback. {}", e);
0
}
}
}
#[cfg(not(target_os = "freebsd"))]
fn files_free(stat: &Statfs) -> u64 {
stat.files_free()
}
#[cfg(not(any(target_os = "freebsd", target_os = "openbsd")))]
fn blocks_available(stat: &Statfs) -> u64 {
stat.blocks_available()
}
/// Returns total and free bytes available in a directory, e.g. `/`.
pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
let path_display = p.as_ref().display();
let stat = statfs(p.as_ref())?;
let stat = statvfs(p.as_ref())?;
let bsize = stat.block_size() as u64;
let bfree = stat.blocks_free();
let bavail = blocks_available(&stat);
let blocks = stat.blocks();
let bsize = stat.block_size();
let bfree = stat.blocks_free() as u64;
let bavail = stat.blocks_available() as u64;
let blocks = stat.blocks() as u64;
let reserved = match bfree.checked_sub(bavail) {
Some(reserved) => reserved,
@@ -96,9 +59,9 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
total,
free,
used,
files: stat.files(),
ffree: files_free(&stat),
fstype: stat.filesystem_type_name().to_string(),
files: stat.files() as u64,
ffree: stat.files_free() as u64,
// Statvfs does not provide a way to return the filesystem as name.
..Default::default()
})
}

View File

@@ -1,4 +1,3 @@
#![allow(unsafe_code)] // TODO: audit unsafe code
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,149 +12,232 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{DiskInfo, IOStats};
#![allow(unsafe_code)] // TODO: audit unsafe code
use crate::os::{DiskInfo, IOStats};
use std::io::Error;
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use winapi::shared::minwindef::{DWORD, MAX_PATH};
use winapi::shared::ntdef::ULARGE_INTEGER;
use winapi::um::fileapi::{GetDiskFreeSpaceExW, GetDiskFreeSpaceW, GetVolumeInformationW, GetVolumePathNameW};
use winapi::um::winnt::{LPCWSTR, WCHAR};
use windows::Win32::Foundation::MAX_PATH;
use windows::Win32::Storage::FileSystem::{GetDiskFreeSpaceExW, GetDiskFreeSpaceW, GetVolumeInformationW, GetVolumePathNameW};
/// Returns total and free bytes available in a directory, e.g. `C:\`.
pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
let path_display = p.as_ref().display();
let path_wide: Vec<WCHAR> = p
let path_wide = p
.as_ref()
.to_path_buf()
.into_os_string()
.encode_wide()
.chain(std::iter::once(0)) // Null-terminate the string
.collect();
.to_string_lossy()
.encode_utf16()
.chain(std::iter::once(0))
.collect::<Vec<u16>>();
let mut lp_free_bytes_available: ULARGE_INTEGER = unsafe { mem::zeroed() };
let mut lp_total_number_of_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };
let mut lp_total_number_of_free_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };
let mut free_bytes_available = 0u64;
let mut total_number_of_bytes = 0u64;
let mut total_number_of_free_bytes = 0u64;
let success = unsafe {
unsafe {
GetDiskFreeSpaceExW(
path_wide.as_ptr(),
&mut lp_free_bytes_available,
&mut lp_total_number_of_bytes,
&mut lp_total_number_of_free_bytes,
windows::core::PCWSTR::from_raw(path_wide.as_ptr()),
Some(&mut free_bytes_available),
Some(&mut total_number_of_bytes),
Some(&mut total_number_of_free_bytes),
)
};
if success == 0 {
return Err(Error::last_os_error());
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
}
let total = unsafe { *lp_total_number_of_bytes.QuadPart() };
let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };
let total = total_number_of_bytes;
let free = total_number_of_free_bytes;
if free > total {
return Err(Error::other(format!(
"detected free space ({free}) > total drive space ({total}), fs corruption at ({path_display}). please run 'fsck'"
"detected free space ({free}) > total drive space ({total}), fs corruption at ({}). please run 'fsck'",
p.as_ref().display()
)));
}
let mut lp_sectors_per_cluster: DWORD = 0;
let mut lp_bytes_per_sector: DWORD = 0;
let mut lp_number_of_free_clusters: DWORD = 0;
let mut lp_total_number_of_clusters: DWORD = 0;
let mut sectors_per_cluster = 0u32;
let mut bytes_per_sector = 0u32;
let mut number_of_free_clusters = 0u32;
let mut total_number_of_clusters = 0u32;
let success = unsafe {
unsafe {
GetDiskFreeSpaceW(
path_wide.as_ptr(),
&mut lp_sectors_per_cluster,
&mut lp_bytes_per_sector,
&mut lp_number_of_free_clusters,
&mut lp_total_number_of_clusters,
windows::core::PCWSTR::from_raw(path_wide.as_ptr()),
Some(&mut sectors_per_cluster),
Some(&mut bytes_per_sector),
Some(&mut number_of_free_clusters),
Some(&mut total_number_of_clusters),
)
};
if success == 0 {
return Err(Error::last_os_error());
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
}
Ok(DiskInfo {
total,
free,
used: total - free,
files: lp_total_number_of_clusters as u64,
ffree: lp_number_of_free_clusters as u64,
// TODO This field is currently unused, and since this logic causes a
// NotFound error during startup on Windows systems, it has been commented out here
//
// The error occurs in GetVolumeInformationW where the path parameter
// is of type [WCHAR; MAX_PATH]. For a drive letter, there are excessive
// trailing zeros, which causes the failure here.
//
// fstype: get_fs_type(&path_wide)?,
files: total_number_of_clusters as u64,
ffree: number_of_free_clusters as u64,
fstype: get_fs_type(&path_wide).unwrap_or_default(),
..Default::default()
})
}
/// Returns leading volume name.
///
/// # Arguments
/// * `v` - A slice of u16 representing the path in UTF-16 encoding
///
/// # Returns
/// * `Ok(Vec<u16>)` containing the volume name in UTF-16 encoding.
/// * `Err` if an error occurs during the operation.
#[allow(dead_code)]
fn get_volume_name(v: &[WCHAR]) -> std::io::Result<LPCWSTR> {
let volume_name_size: DWORD = MAX_PATH as _;
let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
fn get_volume_name(v: &[u16]) -> std::io::Result<Vec<u16>> {
let mut volume_name_buffer = [0u16; MAX_PATH as usize];
let success = unsafe { GetVolumePathNameW(v.as_ptr(), lp_volume_name_buffer.as_mut_ptr(), volume_name_size) };
if success == 0 {
return Err(Error::last_os_error());
unsafe {
GetVolumePathNameW(windows::core::PCWSTR::from_raw(v.as_ptr()), &mut volume_name_buffer)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
}
Ok(lp_volume_name_buffer.as_ptr())
let len = volume_name_buffer
.iter()
.position(|&x| x == 0)
.unwrap_or(volume_name_buffer.len());
Ok(volume_name_buffer[..len].to_vec())
}
#[allow(dead_code)]
fn utf16_to_string(v: &[WCHAR]) -> String {
fn utf16_to_string(v: &[u16]) -> String {
let len = v.iter().position(|&x| x == 0).unwrap_or(v.len());
String::from_utf16_lossy(&v[..len])
}
/// Returns the filesystem type of the underlying mounted filesystem
///
/// # Arguments
/// * `p` - A slice of u16 representing the path in UTF-16 encoding
///
/// # Returns
/// * `Ok(String)` containing the filesystem type (e.g., "NTFS", "FAT32").
/// * `Err` if an error occurs during the operation.
#[allow(dead_code)]
fn get_fs_type(p: &[WCHAR]) -> std::io::Result<String> {
fn get_fs_type(p: &[u16]) -> std::io::Result<String> {
let path = get_volume_name(p)?;
let volume_name_size: DWORD = MAX_PATH as _;
let n_file_system_name_size: DWORD = MAX_PATH as _;
let mut volume_serial_number = 0u32;
let mut maximum_component_length = 0u32;
let mut file_system_flags = 0u32;
let mut volume_name_buffer = [0u16; MAX_PATH as usize];
let mut file_system_name_buffer = [0u16; MAX_PATH as usize];
let mut lp_volume_serial_number: DWORD = 0;
let mut lp_maximum_component_length: DWORD = 0;
let mut lp_file_system_flags: DWORD = 0;
let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
let mut lp_file_system_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
let success = unsafe {
unsafe {
GetVolumeInformationW(
path,
lp_volume_name_buffer.as_mut_ptr(),
volume_name_size,
&mut lp_volume_serial_number,
&mut lp_maximum_component_length,
&mut lp_file_system_flags,
lp_file_system_name_buffer.as_mut_ptr(),
n_file_system_name_size,
windows::core::PCWSTR::from_raw(path.as_ptr()),
Some(&mut volume_name_buffer),
Some(&mut volume_serial_number),
Some(&mut maximum_component_length),
Some(&mut file_system_flags),
Some(&mut file_system_name_buffer),
)
};
if success == 0 {
return Err(Error::last_os_error());
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
}
Ok(utf16_to_string(&lp_file_system_name_buffer))
Ok(utf16_to_string(&file_system_name_buffer))
}
pub fn same_disk(_disk1: &str, _disk2: &str) -> std::io::Result<bool> {
Ok(false)
/// Determines if two paths are on the same disk.
///
/// # Arguments
/// * `disk1` - The first disk path as a string slice.
/// * `disk2` - The second disk path as a string slice.
///
/// # Returns
/// * `Ok(true)` if both paths are on the same disk.
/// * `Ok(false)` if both paths are on different disks.
/// * `Err` if an error occurs during the operation.
pub fn same_disk(disk1: &str, disk2: &str) -> std::io::Result<bool> {
let path1_wide: Vec<u16> = disk1.encode_utf16().chain(std::iter::once(0)).collect();
let path2_wide: Vec<u16> = disk2.encode_utf16().chain(std::iter::once(0)).collect();
let volume1 = get_volume_name(&path1_wide)?;
let volume2 = get_volume_name(&path2_wide)?;
Ok(volume1 == volume2)
}
/// Retrieves I/O statistics for a drive identified by its major and minor numbers.
///
/// # Arguments
/// * `major` - The major number of the drive.
/// * `minor` - The minor number of the drive.
///
/// # Returns
/// * `Ok(IOStats)` containing the I/O statistics.
/// * `Err` if an error occurs during the operation.
pub fn get_drive_stats(_major: u32, _minor: u32) -> std::io::Result<IOStats> {
// Windows does not provide direct IO stats via simple API; this is a stub
// For full implementation, consider using PDH or WMI, but that adds complexity
Ok(IOStats::default())
}
#[cfg(test)]
mod tests {
use crate::os::{get_info, same_disk};
#[cfg(target_os = "windows")]
#[test]
fn test_get_info_valid_path() {
let temp_dir = tempfile::tempdir().unwrap();
let info = get_info(temp_dir.path()).unwrap();
// Verify disk info is valid
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
#[cfg(target_os = "windows")]
#[test]
fn test_get_info_invalid_path() {
use std::path::PathBuf;
let invalid_path = PathBuf::from("Z:\\invalid\\path");
let result = get_info(&invalid_path);
assert!(result.is_err());
}
#[cfg(target_os = "windows")]
#[test]
fn test_same_disk_same_path() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().to_str().unwrap();
let result = same_disk(path, path).unwrap();
assert!(result);
}
#[cfg(target_os = "windows")]
#[test]
fn test_same_disk_different_paths() {
let temp_dir1 = tempfile::tempdir().unwrap();
let temp_dir2 = tempfile::tempdir().unwrap();
let path1 = temp_dir1.path().to_str().unwrap();
let path2 = temp_dir2.path().to_str().unwrap();
let _result = same_disk(path1, path2).unwrap();
// Since both temporary directories are created in the same file system,
// they should be on the same disk in most cases
// Test passes if the function doesn't panic - the actual result depends on test environment
}
#[cfg(target_os = "windows")]
#[test]
fn get_info_with_root_drive() {
let info = get_info("C:\\").unwrap();
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -13,9 +13,11 @@
// limitations under the License.
use rustfs_config::VERSION;
use std::borrow::Cow;
use std::env;
use std::fmt;
#[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
use std::sync::OnceLock;
#[cfg(not(target_os = "openbsd"))]
use sysinfo::System;
/// Business Type Enumeration
@@ -25,7 +27,7 @@ pub enum ServiceType {
Core,
Event,
Logger,
Custom(String),
Custom(Cow<'static, str>),
}
impl ServiceType {
@@ -35,71 +37,65 @@ impl ServiceType {
ServiceType::Core => "core",
ServiceType::Event => "event",
ServiceType::Logger => "logger",
ServiceType::Custom(s) => s.as_str(),
ServiceType::Custom(s) => s,
}
}
}
/// UserAgent structure to hold User-Agent information
/// including OS platform, architecture, version, and service type.
/// It provides methods to generate a formatted User-Agent string.
/// # Examples
/// ```
/// use rustfs_utils::{get_user_agent, ServiceType};
///
/// let ua = get_user_agent(ServiceType::Core);
/// println!("User-Agent: {}", ua);
/// ```
#[derive(Debug)]
struct UserAgent {
os_platform: String,
arch: String,
version: String,
os_platform: &'static str,
arch: &'static str,
version: &'static str,
service: ServiceType,
}
static OS_PLATFORM: OnceLock<String> = OnceLock::new();
impl UserAgent {
/// Create a new UserAgent instance and accept business type parameters
///
/// # Arguments
/// * `service` - The type of service for which the User-Agent is being created.
/// # Returns
/// A new instance of `UserAgent` with the current OS platform, architecture, version, and service type.
fn new(service: ServiceType) -> Self {
let os_platform = Self::get_os_platform();
let arch = env::consts::ARCH.to_string();
let version = VERSION.to_string();
UserAgent {
os_platform,
arch,
version,
os_platform: Self::get_os_platform(),
arch: env::consts::ARCH,
version: VERSION,
service,
}
}
/// Obtain operating system platform information
fn get_os_platform() -> String {
if cfg!(target_os = "windows") {
Self::get_windows_platform()
} else if cfg!(target_os = "macos") {
Self::get_macos_platform()
} else if cfg!(target_os = "linux") {
Self::get_linux_platform()
} else {
"Unknown".to_string()
}
/// Obtain operating system platform information using a thread-safe cache.
///
/// The value is computed once on first use via `OnceLock` and then reused
/// for all subsequent calls for the lifetime of the program.
fn get_os_platform() -> &'static str {
OS_PLATFORM.get_or_init(|| {
if cfg!(target_os = "windows") {
Self::get_windows_platform()
} else if cfg!(target_os = "macos") {
Self::get_macos_platform()
} else if cfg!(target_os = "linux") {
Self::get_linux_platform()
} else if cfg!(target_os = "freebsd") {
Self::get_freebsd_platform()
} else if cfg!(target_os = "netbsd") {
Self::get_netbsd_platform()
} else {
"Unknown".to_string()
}
})
}
/// Get Windows platform information
#[cfg(windows)]
fn get_windows_platform() -> String {
// Priority to using sysinfo to get versions
let version = match System::os_version() {
Some(version) => version,
None => "Windows NT Unknown".to_string(),
};
format!("Windows NT {version}")
let version = System::os_version().unwrap_or_else(|| "NT Unknown".to_string());
if version.starts_with("Windows") {
version
} else {
format!("Windows NT {version}")
}
}
#[cfg(not(windows))]
@@ -110,16 +106,14 @@ impl UserAgent {
/// Get macOS platform information
#[cfg(target_os = "macos")]
fn get_macos_platform() -> String {
let binding = System::os_version().unwrap_or("14.5.0".to_string());
let version = binding.split('.').collect::<Vec<&str>>();
let major = version.first().unwrap_or(&"14").to_string();
let minor = version.get(1).unwrap_or(&"5").to_string();
let patch = version.get(2).unwrap_or(&"0").to_string();
let version_str = System::os_version().unwrap_or_else(|| "14.0.0".to_string());
let mut parts = version_str.split('.');
let major = parts.next().unwrap_or("14");
let minor = parts.next().unwrap_or("0");
let patch = parts.next().unwrap_or("0");
let arch = env::consts::ARCH;
let cpu_info = if arch == "aarch64" { "Apple" } else { "Intel" };
let cpu_info = if env::consts::ARCH == "aarch64" { "Apple" } else { "Intel" };
// Convert to User-Agent format
format!("Macintosh; {cpu_info} Mac OS X {major}_{minor}_{patch}")
}
@@ -131,40 +125,47 @@ impl UserAgent {
/// Get Linux platform information
#[cfg(target_os = "linux")]
fn get_linux_platform() -> String {
format!("X11; {}", System::long_os_version().unwrap_or("Linux Unknown".to_string()))
let os_name = System::long_os_version().unwrap_or_else(|| "Linux Unknown".to_string());
format!("X11; {os_name}")
}
#[cfg(not(target_os = "linux"))]
fn get_linux_platform() -> String {
"N/A".to_string()
}
#[cfg(target_os = "freebsd")]
fn get_freebsd_platform() -> String {
format!("FreeBSD; {}", env::consts::ARCH)
}
#[cfg(not(target_os = "freebsd"))]
fn get_freebsd_platform() -> String {
"N/A".to_string()
}
#[cfg(target_os = "netbsd")]
fn get_netbsd_platform() -> String {
format!("NetBSD; {}", env::consts::ARCH)
}
#[cfg(not(target_os = "netbsd"))]
fn get_netbsd_platform() -> String {
"N/A".to_string()
}
}
/// Implement Display trait to format User-Agent
impl fmt::Display for UserAgent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.service == ServiceType::Basis {
return write!(f, "Mozilla/5.0 ({}; {}) RustFS/{}", self.os_platform, self.arch, self.version);
write!(f, "Mozilla/5.0 ({}; {}) RustFS/{}", self.os_platform, self.arch, self.version)?;
if self.service != ServiceType::Basis {
write!(f, " ({})", self.service.as_str())?;
}
write!(
f,
"Mozilla/5.0 ({}; {}) RustFS/{} ({})",
self.os_platform,
self.arch,
self.version,
self.service.as_str()
)
Ok(())
}
}
/// Get the User-Agent string and accept business type parameters
///
/// # Arguments
/// * `service` - The type of service for which the User-Agent is being created.
///
/// # Returns
/// A formatted User-Agent string.
///
pub fn get_user_agent(service: ServiceType) -> String {
UserAgent::new(service).to_string()
}
@@ -173,58 +174,33 @@ pub fn get_user_agent(service: ServiceType) -> String {
mod tests {
use super::*;
use rustfs_config::VERSION;
use tracing::debug;
#[test]
fn test_user_agent_format_basis() {
let ua = get_user_agent(ServiceType::Basis);
assert!(ua.starts_with("Mozilla/5.0"));
assert!(ua.contains(&format!("RustFS/{VERSION}").to_string()));
debug!("Basic User-Agent: {}", ua);
assert!(ua.contains(&format!("RustFS/{VERSION}")));
assert!(!ua.contains("(basis)"));
}
#[test]
fn test_user_agent_format_core() {
let ua = get_user_agent(ServiceType::Core);
assert!(ua.starts_with("Mozilla/5.0"));
assert!(ua.contains(&format!("RustFS/{VERSION} (core)").to_string()));
debug!("Core User-Agent: {}", ua);
}
#[test]
fn test_user_agent_format_event() {
let ua = get_user_agent(ServiceType::Event);
assert!(ua.starts_with("Mozilla/5.0"));
assert!(ua.contains(&format!("RustFS/{VERSION} (event)").to_string()));
debug!("Event User-Agent: {}", ua);
}
#[test]
fn test_user_agent_format_logger() {
let ua = get_user_agent(ServiceType::Logger);
assert!(ua.starts_with("Mozilla/5.0"));
assert!(ua.contains(&format!("RustFS/{VERSION} (logger)").to_string()));
debug!("Logger User-Agent: {}", ua);
assert!(ua.contains(&format!("RustFS/{VERSION} (core)")));
}
#[test]
fn test_user_agent_format_custom() {
let ua = get_user_agent(ServiceType::Custom("monitor".to_string()));
assert!(ua.starts_with("Mozilla/5.0"));
assert!(ua.contains(&format!("RustFS/{VERSION} (monitor)").to_string()));
debug!("Monitor User-Agent: {}", ua);
let ua = get_user_agent(ServiceType::Custom("monitor".into()));
assert!(ua.contains(&format!("RustFS/{VERSION} (monitor)")));
}
#[test]
fn test_all_service_type() {
// Example: Generate User-Agents of Different Business Types
let ua_core = get_user_agent(ServiceType::Core);
let ua_event = get_user_agent(ServiceType::Event);
let ua_logger = get_user_agent(ServiceType::Logger);
let ua_custom = get_user_agent(ServiceType::Custom("monitor".to_string()));
debug!("Core User-Agent: {}", ua_core);
debug!("Event User-Agent: {}", ua_event);
debug!("Logger User-Agent: {}", ua_logger);
debug!("Custom User-Agent: {}", ua_custom);
fn test_os_platform_caching() {
let ua1 = UserAgent::new(ServiceType::Basis);
let ua2 = UserAgent::new(ServiceType::Basis);
assert_eq!(ua1.os_platform, ua2.os_platform);
// Ensure they point to the same static memory
assert!(std::ptr::eq(ua1.os_platform.as_ptr(), ua2.os_platform.as_ptr()));
}
}

Some files were not shown because too many files have changed in this diff Show More