Compare commits

..

1 Commits

Author SHA1 Message Date
overtrue
4147d5ad8c fix: handle copy_source_if_match in copy_object for S3 compatibility
- Update s3s to PR #449 (ac13a56) which accepts unquoted ETag values
- Add copy_source_if_match and copy_source_if_none_match handling in copy_object
- Return PreconditionFailed when ETag conditions are not met

Fixes #1400
2026-01-06 19:00:12 +08:00
38 changed files with 300 additions and 822 deletions

View File

@@ -44,6 +44,7 @@ jobs:
set -x
old_version=$(grep "^appVersion:" helm/rustfs/Chart.yaml | awk '{print $2}')
sed -i "s/$old_version/$new_version/g" helm/rustfs/Chart.yaml
sed -i "/^image:/,/^[^ ]/ s/tag:.*/tag: "$new_version"/" helm/rustfs/values.yaml
- name: Set up Helm
uses: azure/setup-helm@v4.3.0

34
Cargo.lock generated
View File

@@ -3524,7 +3524,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -4620,7 +4620,7 @@ dependencies = [
"js-sys",
"log",
"wasm-bindgen",
"windows-core 0.62.2",
"windows-core 0.61.2",
]
[[package]]
@@ -4912,7 +4912,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -7078,7 +7078,7 @@ dependencies = [
"once_cell",
"socket2",
"tracing",
"windows-sys 0.60.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -7974,7 +7974,6 @@ dependencies = [
"bytesize",
"chrono",
"criterion",
"dunce",
"enumset",
"faster-hex",
"flatbuffers",
@@ -8411,7 +8410,7 @@ dependencies = [
"tracing",
"transform-stream",
"url",
"windows 0.62.2",
"winapi",
"zstd",
]
@@ -8486,7 +8485,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -8499,14 +8498,14 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.11.0",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.23.36"
version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b"
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
"aws-lc-rs",
"log",
@@ -8587,7 +8586,7 @@ checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984"
[[package]]
name = "s3s"
version = "0.13.0-alpha"
source = "git+https://github.com/s3s-project/s3s.git?branch=main#18c168ae21bf1176555f8f529686ecdc2ebd6db7"
source = "git+https://github.com/s3s-project/s3s.git?rev=ac13a56#ac13a568c44f2da640d0292269a0ee02ec8da433"
dependencies = [
"arrayvec",
"async-trait",
@@ -8865,9 +8864,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.149"
version = "1.0.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da"
dependencies = [
"itoa",
"memchr",
@@ -9409,6 +9408,7 @@ dependencies = [
"cfg-if",
"libc",
"psm",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
@@ -9730,7 +9730,7 @@ dependencies = [
"getrandom 0.3.4",
"once_cell",
"rustix 1.1.3",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -10467,9 +10467,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.8"
version = "2.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed"
checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b"
dependencies = [
"form_urlencoded",
"idna",
@@ -10771,7 +10771,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]

View File

@@ -50,7 +50,7 @@ resolver = "2"
edition = "2024"
license = "Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.90"
rust-version = "1.88"
version = "0.0.5"
homepage = "https://rustfs.com"
description = "RustFS is a high-performance distributed object storage software built using Rust, one of the most popular languages worldwide. "
@@ -136,7 +136,7 @@ rmcp = { version = "0.12.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.149", features = ["raw_value"] }
serde_json = { version = "1.0.148", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.2.0"
@@ -150,7 +150,7 @@ hmac = { version = "0.13.0-rc.3" }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
pbkdf2 = "0.13.0-rc.5"
rsa = { version = "0.10.0-rc.11" }
rustls = { version = "0.23.36", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] }
rustls = { version = "0.23.35" }
rustls-pemfile = "2.2.0"
rustls-pki-types = "1.13.2"
sha1 = "0.11.0-rc.3"
@@ -184,7 +184,6 @@ criterion = { version = "0.8", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
datafusion = "51.0.0"
derive_builder = "0.20.2"
dunce = "1.0.5"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.5"
@@ -225,7 +224,7 @@ regex = { version = "1.12.2" }
rumqttc = { version = "0.25.1" }
rust-embed = { version = "8.9.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.13.0-alpha", features = ["minio"], git = "https://github.com/s3s-project/s3s.git", branch = "main" }
s3s = { version = "0.13.0-alpha", features = ["minio"], git = "https://github.com/s3s-project/s3s.git", rev = "ac13a56" }
serial_test = "3.3.1"
shadow-rs = { version = "1.5.0", default-features = false }
siphasher = "1.0.1"
@@ -246,13 +245,13 @@ tracing-error = "0.2.1"
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] }
transform-stream = "0.3.1"
url = "2.5.8"
url = "2.5.7"
urlencoding = "2.1.3"
uuid = { version = "1.19.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
vaultrs = { version = "0.7.4" }
walkdir = "2.5.0"
wildmatch = { version = "2.6.1", features = ["serde"] }
windows = { version = "0.62.2" }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "7.0.0"
zstd = "0.13.3"

View File

@@ -1,4 +1,4 @@
FROM alpine:3.23 AS build
FROM alpine:3.22 AS build
ARG TARGETARCH
ARG RELEASE=latest
@@ -40,7 +40,7 @@ RUN set -eux; \
rm -rf rustfs.zip /build/.tmp || true
FROM alpine:3.23
FROM alpine:3.22
ARG RELEASE=latest
ARG BUILD_DATE

View File

@@ -16,7 +16,7 @@ ARG BUILDPLATFORM
# -----------------------------
# Build stage
# -----------------------------
FROM rust:1.91-trixie AS builder
FROM rust:1.88-bookworm AS builder
# Re-declare args after FROM
ARG TARGETPLATFORM

View File

@@ -83,13 +83,6 @@ Unlike other storage systems, RustFS is released under the permissible Apache 2.
| **Edge & IoT** | **Strong Edge Support**<br>Ideal for secure, innovative edge devices. | **Weak Edge Support**<br>Often too heavy for edge gateways. |
| **Risk Profile** | **Enterprise Risk Mitigation**<br>Clear IP rights and safe for commercial use. | **Legal Risks**<br>Intellectual property ambiguity and usage restrictions. |
## Staying ahead
Star RustFS on GitHub and be instantly notified of new releases.
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## Quickstart
To get started with RustFS, follow these steps:

View File

@@ -86,15 +86,6 @@ RustFS 是一个基于 Rust 构建的高性能分布式对象存储系统。Rust
| **成本** | **稳定且免费**<br>免费社区支持,稳定的商业定价。 | **高昂成本**<br>1PiB 的成本可能高达 250,000 美元。 |
| **风险控制** | **企业级风险规避**<br>清晰的知识产权,商业使用安全无忧。 | **法律风险**<br>知识产权归属模糊及使用限制风险。 |
## 保持领先
在 GitHub 上为 RustFS 点赞,即可第一时间收到新版本发布通知。
<img src="https://github.com/user-attachments/assets/7ee40bb4-3e46-4eac-b0d0-5fbeb85ff8f3" />
## 快速开始
请按照以下步骤快速上手 RustFS

View File

@@ -306,7 +306,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
versions_count = versions_count.saturating_add(1);
if latest_file_info.is_none()
&& let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false, false)
&& let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false)
{
latest_file_info = Some(info);
}

View File

@@ -21,7 +21,6 @@ use futures::stream::FuturesUnordered;
use hashbrown::{HashMap, HashSet};
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::arn::TargetID;
use rustfs_targets::{Target, TargetError, target::ChannelTargetType};
use std::str::FromStr;
use std::sync::Arc;
@@ -393,80 +392,4 @@ impl AuditRegistry {
Ok(())
}
/// Creates a unique key for a target based on its type and ID
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `String` - The unique key for the target.
pub fn create_key(&self, target_type: &str, target_id: &str) -> String {
let key = TargetID::new(target_id.to_string(), target_type.to_string());
info!(target_type = %target_type, "Create key for {}", key);
key.to_string()
}
/// Enables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn enable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} enabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Disables a target (placeholder, assumes target exists)
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn disable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
if self.get_target(&key).is_some() {
info!("Target {}-{} disabled", target_type, target_id);
Ok(())
} else {
Err(AuditError::Configuration(
format!("Target not found: {}-{}", target_type, target_id),
None,
))
}
}
/// Upserts a target into the registry
///
/// # Arguments
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
/// * `target_id` - The identifier for the target instance.
/// * `target` - The target instance to be upserted.
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure.
pub fn upsert_target(
&mut self,
target_type: &str,
target_id: &str,
target: Box<dyn Target<AuditEntry> + Send + Sync>,
) -> AuditResult<()> {
let key = self.create_key(target_type, target_id);
self.targets.insert(key, target);
Ok(())
}
}

View File

@@ -274,9 +274,9 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_keys = registry.list_targets();
let target_ids = registry.list_targets();
if target_keys.is_empty() {
if target_ids.is_empty() {
warn!("No audit targets configured for dispatch");
return Ok(());
}
@@ -284,22 +284,22 @@ impl AuditSystem {
// Dispatch to all targets concurrently
let mut tasks = Vec::new();
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
let entry_clone = Arc::clone(&entry);
let target_key_clone = target_key.clone();
let target_id_clone = target_id.clone();
// Create EntityTarget for the audit log entry
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: entry.event, // Default, should be derived from entry
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
data: (*entry_clone).clone(),
};
let task = async move {
let result = target.save(Arc::new(entity_target)).await;
(target_key_clone, result)
(target_id_clone, result)
};
tasks.push(task);
@@ -312,14 +312,14 @@ impl AuditSystem {
let mut errors = Vec::new();
let mut success_count = 0;
for (target_key, result) in results {
for (target_id, result) in results {
match result {
Ok(_) => {
success_count += 1;
observability::record_target_success();
}
Err(e) => {
error!(target_id = %target_key, error = %e, "Failed to dispatch audit log to target");
error!(target_id = %target_id, error = %e, "Failed to dispatch audit log to target");
errors.push(e);
observability::record_target_failure();
}
@@ -360,18 +360,18 @@ impl AuditSystem {
drop(state);
let registry = self.registry.lock().await;
let target_keys = registry.list_targets();
let target_ids = registry.list_targets();
if target_keys.is_empty() {
if target_ids.is_empty() {
warn!("No audit targets configured for batch dispatch");
return Ok(());
}
let mut tasks = Vec::new();
for target_key in target_keys {
if let Some(target) = registry.get_target(&target_key) {
for target_id in target_ids {
if let Some(target) = registry.get_target(&target_id) {
let entries_clone: Vec<_> = entries.iter().map(Arc::clone).collect();
let target_key_clone = target_key.clone();
let target_id_clone = target_id.clone();
let task = async move {
let mut success_count = 0;
@@ -380,7 +380,7 @@ impl AuditSystem {
let entity_target = EntityTarget {
object_name: entry.api.name.clone().unwrap_or_default(),
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
event_name: entry.event,
event_name: rustfs_targets::EventName::ObjectCreatedPut,
data: (*entry).clone(),
};
match target.save(Arc::new(entity_target)).await {
@@ -388,7 +388,7 @@ impl AuditSystem {
Err(e) => errors.push(e),
}
}
(target_key_clone, success_count, errors)
(target_id_clone, success_count, errors)
};
tasks.push(task);
}
@@ -418,7 +418,6 @@ impl AuditSystem {
}
/// Starts the audit stream processing for a target with batching and retry logic
///
/// # Arguments
/// * `store` - The store from which to read audit entries
/// * `target` - The target to which audit entries will be sent
@@ -502,7 +501,7 @@ impl AuditSystem {
/// Enables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to enable, TargetID to string
/// * `target_id` - The ID of the target to enable
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -521,7 +520,7 @@ impl AuditSystem {
/// Disables a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to disable, TargetID to string
/// * `target_id` - The ID of the target to disable
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -540,7 +539,7 @@ impl AuditSystem {
/// Removes a target from the system
///
/// # Arguments
/// * `target_id` - The ID of the target to remove, TargetID to string
/// * `target_id` - The ID of the target to remove
///
/// # Returns
/// * `AuditResult<()>` - Result indicating success or failure
@@ -560,7 +559,7 @@ impl AuditSystem {
/// Updates or inserts a target
///
/// # Arguments
/// * `target_id` - The ID of the target to upsert, TargetID to string
/// * `target_id` - The ID of the target to upsert
/// * `target` - The target instance to insert or update
///
/// # Returns
@@ -597,7 +596,7 @@ impl AuditSystem {
/// Gets information about a specific target
///
/// # Arguments
/// * `target_id` - The ID of the target to retrieve, TargetID to string
/// * `target_id` - The ID of the target to retrieve
///
/// # Returns
/// * `Option<String>` - Target ID if found

View File

@@ -108,7 +108,6 @@ google-cloud-storage = { workspace = true }
google-cloud-auth = { workspace = true }
aws-config = { workspace = true }
faster-hex = { workspace = true }
dunce = { workspace = true }
[dev-dependencies]

View File

@@ -95,22 +95,22 @@ impl DiskHealthTracker {
/// Check if disk is faulty
pub fn is_faulty(&self) -> bool {
self.status.load(Ordering::Acquire) == DISK_HEALTH_FAULTY
self.status.load(Ordering::Relaxed) == DISK_HEALTH_FAULTY
}
/// Set disk as faulty
pub fn set_faulty(&self) {
self.status.store(DISK_HEALTH_FAULTY, Ordering::Release);
self.status.store(DISK_HEALTH_FAULTY, Ordering::Relaxed);
}
/// Set disk as OK
pub fn set_ok(&self) {
self.status.store(DISK_HEALTH_OK, Ordering::Release);
self.status.store(DISK_HEALTH_OK, Ordering::Relaxed);
}
pub fn swap_ok_to_faulty(&self) -> bool {
self.status
.compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::AcqRel, Ordering::Relaxed)
.compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}
@@ -131,7 +131,7 @@ impl DiskHealthTracker {
/// Get last success timestamp
pub fn last_success(&self) -> i64 {
self.last_success.load(Ordering::Acquire)
self.last_success.load(Ordering::Relaxed)
}
}

View File

@@ -21,7 +21,6 @@ use super::{
};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use crate::config::storageclass::DEFAULT_INLINE_BLOCK;
use crate::data_usage::local_snapshot::ensure_data_usage_layout;
use crate::disk::error::FileAccessDeniedWithContext;
use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error};
@@ -129,8 +128,7 @@ impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
debug!("Creating local disk");
// Use optimized path resolution instead of absolutize() for better performance
// Use dunce::canonicalize instead of std::fs::canonicalize to avoid UNC paths on Windows
let root = match dunce::canonicalize(ep.get_file_path()) {
let root = match std::fs::canonicalize(ep.get_file_path()) {
Ok(path) => path,
Err(e) => {
if e.kind() == ErrorKind::NotFound {
@@ -491,17 +489,9 @@ impl LocalDisk {
let file_dir = self.get_bucket_path(volume)?;
let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;
get_file_info(
&data,
volume,
path,
version_id,
FileInfoOpts {
data: opts.read_data,
include_free_versions: false,
},
)
.map_err(|_e| DiskError::Unexpected)
get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
.await
.map_err(|_e| DiskError::Unexpected)
}
// Batch metadata reading for multiple objects
@@ -2250,93 +2240,20 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn read_version(
&self,
org_volume: &str,
_org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
if !org_volume.is_empty() {
let org_volume_path = self.get_bucket_path(org_volume)?;
if !skip_access_checks(org_volume) {
access(&org_volume_path)
.await
.map_err(|e| to_access_error(e, DiskError::VolumeAccessDenied))?;
}
}
let file_path = self.get_object_path(volume, path)?;
let volume_dir = self.get_bucket_path(volume)?;
check_path_length(file_path.to_string_lossy().as_ref())?;
let file_dir = self.get_bucket_path(volume)?;
let read_data = opts.read_data;
let (data, _) = self
.read_raw(volume, volume_dir.clone(), file_path, read_data)
.await
.map_err(|e| {
if e == DiskError::FileNotFound && !version_id.is_empty() {
DiskError::FileVersionNotFound
} else {
e
}
})?;
let (data, _) = self.read_raw(volume, file_dir, file_path, read_data).await?;
let mut fi = get_file_info(
&data,
volume,
path,
version_id,
FileInfoOpts {
data: read_data,
include_free_versions: opts.incl_free_versions,
},
)?;
if opts.read_data {
if fi.data.as_ref().is_some_and(|d| !d.is_empty()) || fi.size == 0 {
if fi.inline_data() {
return Ok(fi);
}
if fi.size == 0 || fi.version_id.is_none_or(|v| v.is_nil()) {
fi.set_inline_data();
return Ok(fi);
};
if let Some(part) = fi.parts.first() {
let part_path = format!("part.{}", part.number);
let part_path = path_join_buf(&[
path,
fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
part_path.as_str(),
]);
let part_path = self.get_object_path(volume, part_path.as_str())?;
if lstat(&part_path).await.is_err() {
fi.set_inline_data();
return Ok(fi);
}
}
fi.data = None;
}
let inline = fi.transition_status.is_empty() && fi.data_dir.is_some() && fi.parts.len() == 1;
if inline && fi.shard_file_size(fi.parts[0].actual_size) < DEFAULT_INLINE_BLOCK as i64 {
let part_path = path_join_buf(&[
path,
fi.data_dir.map_or("".to_string(), |dir| dir.to_string()).as_str(),
format!("part.{}", fi.parts[0].number).as_str(),
]);
let part_path = self.get_object_path(volume, part_path.as_str())?;
let data = self.read_all_data(volume, volume_dir, part_path.clone()).await.map_err(|e| {
warn!("read_version read_all_data {:?} failed: {e}", part_path);
e
})?;
fi.data = Some(Bytes::from(data));
}
}
let fi = get_file_info(&data, volume, path, version_id, FileInfoOpts { data: read_data }).await?;
Ok(fi)
}

View File

@@ -1485,8 +1485,20 @@ impl SetDisks {
let object = object.clone();
let version_id = version_id.clone();
tokio::spawn(async move {
if let Some(disk) = disk {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
if let Some(disk) = disk
&& disk.is_online().await
{
if version_id.is_empty() {
match disk.read_xl(&bucket, &object, read_data).await {
Ok(info) => {
let fi = file_info_from_raw(info, &bucket, &object, read_data).await?;
Ok(fi)
}
Err(err) => Err(err),
}
} else {
disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await
}
} else {
Err(DiskError::DiskNotFound)
}
@@ -1614,7 +1626,7 @@ impl SetDisks {
bucket: &str,
object: &str,
read_data: bool,
incl_free_vers: bool,
_incl_free_vers: bool,
) -> (Vec<FileInfo>, Vec<Option<DiskError>>) {
let mut metadata_array = vec![None; fileinfos.len()];
let mut meta_file_infos = vec![FileInfo::default(); fileinfos.len()];
@@ -1664,7 +1676,7 @@ impl SetDisks {
..Default::default()
};
let finfo = match meta.into_fileinfo(bucket, object, "", true, incl_free_vers, true) {
let finfo = match meta.into_fileinfo(bucket, object, "", true, true) {
Ok(res) => res,
Err(err) => {
for item in errs.iter_mut() {
@@ -1691,7 +1703,7 @@ impl SetDisks {
for (idx, meta_op) in metadata_array.iter().enumerate() {
if let Some(meta) = meta_op {
match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, incl_free_vers, true) {
match meta.into_fileinfo(bucket, object, vid.to_string().as_str(), read_data, true) {
Ok(res) => meta_file_infos[idx] = res,
Err(err) => errs[idx] = Some(err.into()),
}

View File

@@ -28,15 +28,14 @@ use http::{HeaderMap, HeaderValue};
use rustfs_common::heal_channel::HealOpts;
use rustfs_filemeta::{
FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState,
ReplicationStatusType, RestoreStatusOps as _, VersionPurgeStatusType, parse_restore_obj_status, replication_statuses_map,
version_purge_statuses_map,
ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
};
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_RESTORE, AMZ_STORAGE_CLASS};
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -757,24 +756,7 @@ impl ObjectInfo {
.ok()
});
let replication_status_internal = fi
.replication_state_internal
.as_ref()
.and_then(|v| v.replication_status_internal.clone());
let version_purge_status_internal = fi
.replication_state_internal
.as_ref()
.and_then(|v| v.version_purge_status_internal.clone());
let mut replication_status = fi.replication_status();
if replication_status.is_empty()
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS).cloned()
&& status == ReplicationStatusType::Replica.as_str()
{
replication_status = ReplicationStatusType::Replica;
}
let version_purge_status = fi.version_purge_status();
// TODO:ReplicationState
let transitioned_object = TransitionedObject {
name: fi.transitioned_objname.clone(),
@@ -795,24 +777,10 @@ impl ObjectInfo {
};
// Extract storage class from metadata, default to STANDARD if not found
let storage_class = if !fi.transition_tier.is_empty() {
Some(fi.transition_tier.clone())
} else {
fi.metadata
.get(AMZ_STORAGE_CLASS)
.cloned()
.or_else(|| Some(storageclass::STANDARD.to_string()))
};
let mut restore_ongoing = false;
let mut restore_expires = None;
if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned() {
//
if let Ok(restore_status) = parse_restore_obj_status(&restore_status) {
restore_ongoing = restore_status.on_going();
restore_expires = restore_status.expiry();
}
}
let storage_class = metadata
.get(AMZ_STORAGE_CLASS)
.cloned()
.or_else(|| Some(storageclass::STANDARD.to_string()));
// Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo
let parts = fi
@@ -830,8 +798,6 @@ impl ObjectInfo {
})
.collect();
// TODO: part checksums
ObjectInfo {
bucket: bucket.to_string(),
name,
@@ -856,12 +822,6 @@ impl ObjectInfo {
transitioned_object,
checksum: fi.checksum.clone(),
storage_class,
restore_ongoing,
restore_expires,
replication_status_internal,
replication_status,
version_purge_status_internal,
version_purge_status,
..Default::default()
}
}

View File

@@ -505,10 +505,6 @@ impl FileInfo {
ReplicationStatusType::Empty
}
}
pub fn shard_file_size(&self, total_length: i64) -> i64 {
self.erasure.shard_file_size(total_length)
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -594,7 +590,7 @@ impl RestoreStatusOps for RestoreStatus {
}
}
pub fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
let tokens: Vec<&str> = restore_hdr.splitn(2, ",").collect();
let progress_tokens: Vec<&str> = tokens[0].splitn(2, "=").collect();
if progress_tokens.len() != 2 {

View File

@@ -14,8 +14,7 @@
use crate::{
ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState,
ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, replication_statuses_map,
version_purge_statuses_map,
ReplicationStatusType, Result, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
};
use byteorder::ByteOrder;
use bytes::Bytes;
@@ -910,7 +909,6 @@ impl FileMeta {
path: &str,
version_id: &str,
read_data: bool,
include_free_versions: bool,
all_parts: bool,
) -> Result<FileInfo> {
let vid = {
@@ -923,35 +921,11 @@ impl FileMeta {
let mut is_latest = true;
let mut succ_mod_time = None;
let mut non_free_versions = self.versions.len();
let mut found = false;
let mut found_free_version = None;
let mut found_fi = None;
for ver in self.versions.iter() {
let header = &ver.header;
// TODO: freeVersion
if header.free_version() {
non_free_versions -= 1;
if include_free_versions && found_free_version.is_none() {
let mut found_free_fi = FileMetaVersion::default();
if found_free_fi.unmarshal_msg(&ver.meta).is_ok() && found_free_fi.version_type != VersionType::Invalid {
let mut free_fi = found_free_fi.into_fileinfo(volume, path, all_parts);
free_fi.is_latest = true;
found_free_version = Some(free_fi);
}
}
if header.version_id != Some(vid) {
continue;
}
}
if found {
continue;
}
if !version_id.is_empty() && header.version_id != Some(vid) {
is_latest = false;
@@ -959,8 +933,6 @@ impl FileMeta {
continue;
}
found = true;
let mut fi = ver.into_fileinfo(volume, path, all_parts)?;
fi.is_latest = is_latest;
@@ -975,25 +947,7 @@ impl FileMeta {
.map(bytes::Bytes::from);
}
found_fi = Some(fi);
}
if !found {
if version_id.is_empty() {
if include_free_versions
&& non_free_versions == 0
&& let Some(free_version) = found_free_version
{
return Ok(free_version);
}
return Err(Error::FileNotFound);
} else {
return Err(Error::FileVersionNotFound);
}
}
if let Some(mut fi) = found_fi {
fi.num_versions = non_free_versions;
fi.num_versions = self.versions.len();
return Ok(fi);
}
@@ -1813,27 +1767,14 @@ impl MetaObject {
metadata.insert(k.to_owned(), v.to_owned());
}
let tier_fvidkey = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}").to_lowercase();
let tier_fvmarker_key = format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}").to_lowercase();
for (k, v) in &self.meta_sys {
let lower_k = k.to_lowercase();
if lower_k == tier_fvidkey || lower_k == tier_fvmarker_key {
continue;
}
if lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase() {
continue;
}
if lower_k == AMZ_STORAGE_CLASS.to_lowercase() && v == b"STANDARD" {
if k == AMZ_STORAGE_CLASS && v == b"STANDARD" {
continue;
}
if k.starts_with(RESERVED_METADATA_PREFIX)
|| k.starts_with(RESERVED_METADATA_PREFIX_LOWER)
|| lower_k == VERSION_PURGE_STATUS_KEY.to_lowercase()
|| k == VERSION_PURGE_STATUS_KEY
{
metadata.insert(k.to_owned(), String::from_utf8(v.to_owned()).unwrap_or_default());
}
@@ -2570,31 +2511,15 @@ pub fn merge_file_meta_versions(
merged
}
pub fn file_info_from_raw(
ri: RawFileInfo,
bucket: &str,
object: &str,
read_data: bool,
include_free_versions: bool,
) -> Result<FileInfo> {
get_file_info(
&ri.buf,
bucket,
object,
"",
FileInfoOpts {
data: read_data,
include_free_versions,
},
)
pub async fn file_info_from_raw(ri: RawFileInfo, bucket: &str, object: &str, read_data: bool) -> Result<FileInfo> {
get_file_info(&ri.buf, bucket, object, "", FileInfoOpts { data: read_data }).await
}
pub struct FileInfoOpts {
pub data: bool,
pub include_free_versions: bool,
}
pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
pub async fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opts: FileInfoOpts) -> Result<FileInfo> {
let vid = {
if version_id.is_empty() {
None
@@ -2616,7 +2541,7 @@ pub fn get_file_info(buf: &[u8], volume: &str, path: &str, version_id: &str, opt
});
}
let fi = meta.into_fileinfo(volume, path, version_id, opts.data, opts.include_free_versions, true)?;
let fi = meta.into_fileinfo(volume, path, version_id, opts.data, true)?;
Ok(fi)
}

View File

@@ -12,10 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info,
merge_file_meta_versions,
};
use crate::{Error, FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, merge_file_meta_versions};
use rmp::Marker;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
@@ -144,7 +141,8 @@ impl MetaCacheEntry {
});
}
if let Some(fm) = &self.cached {
if self.cached.is_some() {
let fm = self.cached.as_ref().unwrap();
if fm.versions.is_empty() {
return Ok(FileInfo {
volume: bucket.to_owned(),
@@ -156,20 +154,14 @@ impl MetaCacheEntry {
});
}
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false, true)?;
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
return Ok(fi);
}
get_file_info(
&self.metadata,
bucket,
self.name.as_str(),
"",
FileInfoOpts {
data: false,
include_free_versions: false,
},
)
let mut fm = FileMeta::new();
fm.unmarshal_msg(&self.metadata)?;
let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
Ok(fi)
}
pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {

View File

@@ -63,23 +63,16 @@ pub struct Policy {
impl Policy {
pub async fn is_allowed(&self, args: &Args<'_>) -> bool {
// First, check all Deny statements - if any Deny matches, deny the request
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Deny)) {
if !statement.is_allowed(args).await {
return false;
}
}
// Owner has all permissions
if args.is_owner {
if args.deny_only || args.is_owner {
return true;
}
if args.deny_only {
return false;
}
// Check Allow statements
for statement in self.statements.iter().filter(|s| matches!(s.effect, Effect::Allow)) {
if statement.is_allowed(args).await {
return true;
@@ -601,102 +594,6 @@ mod test {
Ok(())
}
#[tokio::test]
async fn test_deny_only_security_fix() -> Result<()> {
let data = r#"
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": ["arn:aws:s3:::bucket1/*"]
}
]
}
"#;
let policy = Policy::parse_config(data.as_bytes())?;
let conditions = HashMap::new();
let claims = HashMap::new();
// Test with deny_only=true but no matching Allow statement
let args_deny_only = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
bucket: "bucket2",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: true, // Should NOT automatically allow
};
// Should return false because deny_only=true, regardless of whether there's a matching Allow statement
assert!(
!policy.is_allowed(&args_deny_only).await,
"deny_only should return false when deny_only=true, regardless of Allow statements"
);
// Test with deny_only=true and matching Allow statement
let args_deny_only_allowed = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
bucket: "bucket1",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: true,
};
// Should return false because deny_only=true prevents checking Allow statements (unless is_owner=true)
assert!(
!policy.is_allowed(&args_deny_only_allowed).await,
"deny_only should return false even with matching Allow statement"
);
// Test with deny_only=false (normal case)
let args_normal = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::GetObjectAction),
bucket: "bucket1",
conditions: &conditions,
is_owner: false,
object: "test.txt",
claims: &claims,
deny_only: false,
};
// Should return true because there's an Allow statement
assert!(
policy.is_allowed(&args_normal).await,
"normal policy evaluation should allow with matching Allow statement"
);
let args_owner_deny_only = Args {
account: "testuser",
groups: &None,
action: Action::S3Action(crate::policy::action::S3Action::PutObjectAction),
bucket: "bucket2",
conditions: &conditions,
is_owner: true, // Owner has all permissions
object: "test.txt",
claims: &claims,
deny_only: true, // Even with deny_only=true, owner should be allowed
};
assert!(
policy.is_allowed(&args_owner_deny_only).await,
"owner should retain all permissions even when deny_only=true"
);
Ok(())
}
#[tokio::test]
async fn test_aws_username_policy_variable() -> Result<()> {
let data = r#"

View File

@@ -37,6 +37,11 @@ impl TargetID {
Self { id, name }
}
/// Convert to string representation
pub fn to_id_string(&self) -> String {
format!("{}:{}", self.id, self.name)
}
/// Create an ARN
pub fn to_arn(&self, region: &str) -> ARN {
ARN {
@@ -75,7 +80,7 @@ impl Serialize for TargetID {
where
S: Serializer,
{
serializer.serialize_str(&self.to_string())
serializer.serialize_str(&self.to_id_string())
}
}
@@ -125,7 +130,7 @@ impl ARN {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
return String::new();
}
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id)
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
}
/// Parsing ARN from string

View File

@@ -72,7 +72,7 @@ rand = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
[target.'cfg(windows)'.dependencies]
windows = { workspace = true, optional = true, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
winapi = { workspace = true, optional = true, features = ["std", "fileapi", "minwindef", "ntdef", "winnt"] }
[lints]
workspace = true
@@ -89,7 +89,7 @@ compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
string = ["dep:regex"]
crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper", "dep:sha1"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher", "dep:hex-simd", "dep:crc-fast"]
os = ["dep:nix", "dep:tempfile", "dep:windows"] # operating system utilities
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
http = ["dep:convert_case", "dep:http", "dep:regex"]

View File

@@ -51,7 +51,6 @@ pub const AMZ_TAG_COUNT: &str = "x-amz-tagging-count";
pub const AMZ_TAG_DIRECTIVE: &str = "X-Amz-Tagging-Directive";
// S3 transition restore
pub const AMZ_RESTORE: &str = "x-amz-restore";
pub const AMZ_RESTORE_EXPIRY_DAYS: &str = "X-Amz-Restore-Expiry-Days";
pub const AMZ_RESTORE_REQUEST_DATE: &str = "X-Amz-Restore-Request-Date";

View File

@@ -14,6 +14,8 @@
use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
#[cfg(test)]
use std::sync::MutexGuard;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
@@ -81,7 +83,7 @@ fn reset_dns_resolver_inner() {
#[cfg(test)]
pub struct MockResolverGuard {
_lock: std::sync::MutexGuard<'static, ()>,
_lock: MutexGuard<'static, ()>,
}
#[cfg(test)]

View File

@@ -1,3 +1,4 @@
#![allow(unsafe_code)] // TODO: audit unsafe code
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,232 +13,149 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unsafe_code)] // TODO: audit unsafe code
use crate::os::{DiskInfo, IOStats};
use super::{DiskInfo, IOStats};
use std::io::Error;
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use windows::Win32::Foundation::MAX_PATH;
use windows::Win32::Storage::FileSystem::{GetDiskFreeSpaceExW, GetDiskFreeSpaceW, GetVolumeInformationW, GetVolumePathNameW};
use winapi::shared::minwindef::{DWORD, MAX_PATH};
use winapi::shared::ntdef::ULARGE_INTEGER;
use winapi::um::fileapi::{GetDiskFreeSpaceExW, GetDiskFreeSpaceW, GetVolumeInformationW, GetVolumePathNameW};
use winapi::um::winnt::{LPCWSTR, WCHAR};
/// Returns total and free bytes available in a directory, e.g. `C:\`.
pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
let path_wide = p
let path_display = p.as_ref().display();
let path_wide: Vec<WCHAR> = p
.as_ref()
.to_string_lossy()
.encode_utf16()
.chain(std::iter::once(0))
.collect::<Vec<u16>>();
.to_path_buf()
.into_os_string()
.encode_wide()
.chain(std::iter::once(0)) // Null-terminate the string
.collect();
let mut free_bytes_available = 0u64;
let mut total_number_of_bytes = 0u64;
let mut total_number_of_free_bytes = 0u64;
let mut lp_free_bytes_available: ULARGE_INTEGER = unsafe { mem::zeroed() };
let mut lp_total_number_of_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };
let mut lp_total_number_of_free_bytes: ULARGE_INTEGER = unsafe { mem::zeroed() };
unsafe {
let success = unsafe {
GetDiskFreeSpaceExW(
windows::core::PCWSTR::from_raw(path_wide.as_ptr()),
Some(&mut free_bytes_available),
Some(&mut total_number_of_bytes),
Some(&mut total_number_of_free_bytes),
path_wide.as_ptr(),
&mut lp_free_bytes_available,
&mut lp_total_number_of_bytes,
&mut lp_total_number_of_free_bytes,
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
};
if success == 0 {
return Err(Error::last_os_error());
}
let total = total_number_of_bytes;
let free = total_number_of_free_bytes;
let total = unsafe { *lp_total_number_of_bytes.QuadPart() };
let free = unsafe { *lp_total_number_of_free_bytes.QuadPart() };
if free > total {
return Err(Error::other(format!(
"detected free space ({free}) > total drive space ({total}), fs corruption at ({}). please run 'fsck'",
p.as_ref().display()
"detected free space ({free}) > total drive space ({total}), fs corruption at ({path_display}). please run 'fsck'"
)));
}
let mut sectors_per_cluster = 0u32;
let mut bytes_per_sector = 0u32;
let mut number_of_free_clusters = 0u32;
let mut total_number_of_clusters = 0u32;
let mut lp_sectors_per_cluster: DWORD = 0;
let mut lp_bytes_per_sector: DWORD = 0;
let mut lp_number_of_free_clusters: DWORD = 0;
let mut lp_total_number_of_clusters: DWORD = 0;
unsafe {
let success = unsafe {
GetDiskFreeSpaceW(
windows::core::PCWSTR::from_raw(path_wide.as_ptr()),
Some(&mut sectors_per_cluster),
Some(&mut bytes_per_sector),
Some(&mut number_of_free_clusters),
Some(&mut total_number_of_clusters),
path_wide.as_ptr(),
&mut lp_sectors_per_cluster,
&mut lp_bytes_per_sector,
&mut lp_number_of_free_clusters,
&mut lp_total_number_of_clusters,
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
};
if success == 0 {
return Err(Error::last_os_error());
}
Ok(DiskInfo {
total,
free,
used: total - free,
files: total_number_of_clusters as u64,
ffree: number_of_free_clusters as u64,
fstype: get_fs_type(&path_wide).unwrap_or_default(),
files: lp_total_number_of_clusters as u64,
ffree: lp_number_of_free_clusters as u64,
// TODO This field is currently unused, and since this logic causes a
// NotFound error during startup on Windows systems, it has been commented out here
//
// The error occurs in GetVolumeInformationW where the path parameter
// is of type [WCHAR; MAX_PATH]. For a drive letter, there are excessive
// trailing zeros, which causes the failure here.
//
// fstype: get_fs_type(&path_wide)?,
..Default::default()
})
}
/// Returns leading volume name.
///
/// # Arguments
/// * `v` - A slice of u16 representing the path in UTF-16 encoding
///
/// # Returns
/// * `Ok(Vec<u16>)` containing the volume name in UTF-16 encoding.
/// * `Err` if an error occurs during the operation.
#[allow(dead_code)]
fn get_volume_name(v: &[u16]) -> std::io::Result<Vec<u16>> {
let mut volume_name_buffer = [0u16; MAX_PATH as usize];
fn get_volume_name(v: &[WCHAR]) -> std::io::Result<LPCWSTR> {
let volume_name_size: DWORD = MAX_PATH as _;
let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
unsafe {
GetVolumePathNameW(windows::core::PCWSTR::from_raw(v.as_ptr()), &mut volume_name_buffer)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
let success = unsafe { GetVolumePathNameW(v.as_ptr(), lp_volume_name_buffer.as_mut_ptr(), volume_name_size) };
if success == 0 {
return Err(Error::last_os_error());
}
let len = volume_name_buffer
.iter()
.position(|&x| x == 0)
.unwrap_or(volume_name_buffer.len());
Ok(volume_name_buffer[..len].to_vec())
Ok(lp_volume_name_buffer.as_ptr())
}
#[allow(dead_code)]
fn utf16_to_string(v: &[u16]) -> String {
fn utf16_to_string(v: &[WCHAR]) -> String {
let len = v.iter().position(|&x| x == 0).unwrap_or(v.len());
String::from_utf16_lossy(&v[..len])
}
/// Returns the filesystem type of the underlying mounted filesystem
///
/// # Arguments
/// * `p` - A slice of u16 representing the path in UTF-16 encoding
///
/// # Returns
/// * `Ok(String)` containing the filesystem type (e.g., "NTFS", "FAT32").
/// * `Err` if an error occurs during the operation.
#[allow(dead_code)]
fn get_fs_type(p: &[u16]) -> std::io::Result<String> {
fn get_fs_type(p: &[WCHAR]) -> std::io::Result<String> {
let path = get_volume_name(p)?;
let mut volume_serial_number = 0u32;
let mut maximum_component_length = 0u32;
let mut file_system_flags = 0u32;
let mut volume_name_buffer = [0u16; MAX_PATH as usize];
let mut file_system_name_buffer = [0u16; MAX_PATH as usize];
let volume_name_size: DWORD = MAX_PATH as _;
let n_file_system_name_size: DWORD = MAX_PATH as _;
unsafe {
let mut lp_volume_serial_number: DWORD = 0;
let mut lp_maximum_component_length: DWORD = 0;
let mut lp_file_system_flags: DWORD = 0;
let mut lp_volume_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
let mut lp_file_system_name_buffer: [WCHAR; MAX_PATH] = [0; MAX_PATH];
let success = unsafe {
GetVolumeInformationW(
windows::core::PCWSTR::from_raw(path.as_ptr()),
Some(&mut volume_name_buffer),
Some(&mut volume_serial_number),
Some(&mut maximum_component_length),
Some(&mut file_system_flags),
Some(&mut file_system_name_buffer),
path,
lp_volume_name_buffer.as_mut_ptr(),
volume_name_size,
&mut lp_volume_serial_number,
&mut lp_maximum_component_length,
&mut lp_file_system_flags,
lp_file_system_name_buffer.as_mut_ptr(),
n_file_system_name_size,
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
};
if success == 0 {
return Err(Error::last_os_error());
}
Ok(utf16_to_string(&file_system_name_buffer))
Ok(utf16_to_string(&lp_file_system_name_buffer))
}
/// Determines if two paths are on the same disk.
///
/// # Arguments
/// * `disk1` - The first disk path as a string slice.
/// * `disk2` - The second disk path as a string slice.
///
/// # Returns
/// * `Ok(true)` if both paths are on the same disk.
/// * `Ok(false)` if both paths are on different disks.
/// * `Err` if an error occurs during the operation.
pub fn same_disk(disk1: &str, disk2: &str) -> std::io::Result<bool> {
let path1_wide: Vec<u16> = disk1.encode_utf16().chain(std::iter::once(0)).collect();
let path2_wide: Vec<u16> = disk2.encode_utf16().chain(std::iter::once(0)).collect();
let volume1 = get_volume_name(&path1_wide)?;
let volume2 = get_volume_name(&path2_wide)?;
Ok(volume1 == volume2)
pub fn same_disk(_disk1: &str, _disk2: &str) -> std::io::Result<bool> {
Ok(false)
}
/// Retrieves I/O statistics for a drive identified by its major and minor numbers.
///
/// # Arguments
/// * `major` - The major number of the drive.
/// * `minor` - The minor number of the drive.
///
/// # Returns
/// * `Ok(IOStats)` containing the I/O statistics.
/// * `Err` if an error occurs during the operation.
pub fn get_drive_stats(_major: u32, _minor: u32) -> std::io::Result<IOStats> {
// Windows does not provide direct IO stats via simple API; this is a stub
// For full implementation, consider using PDH or WMI, but that adds complexity
Ok(IOStats::default())
}
#[cfg(test)]
mod tests {
use crate::os::{get_info, same_disk};
#[cfg(target_os = "windows")]
#[test]
fn test_get_info_valid_path() {
let temp_dir = tempfile::tempdir().unwrap();
let info = get_info(temp_dir.path()).unwrap();
// Verify disk info is valid
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
#[cfg(target_os = "windows")]
#[test]
fn test_get_info_invalid_path() {
use std::path::PathBuf;
let invalid_path = PathBuf::from("Z:\\invalid\\path");
let result = get_info(&invalid_path);
assert!(result.is_err());
}
#[cfg(target_os = "windows")]
#[test]
fn test_same_disk_same_path() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().to_str().unwrap();
let result = same_disk(path, path).unwrap();
assert!(result);
}
#[cfg(target_os = "windows")]
#[test]
fn test_same_disk_different_paths() {
let temp_dir1 = tempfile::tempdir().unwrap();
let temp_dir2 = tempfile::tempdir().unwrap();
let path1 = temp_dir1.path().to_str().unwrap();
let path2 = temp_dir2.path().to_str().unwrap();
let _result = same_disk(path1, path2).unwrap();
// Since both temporary directories are created in the same file system,
// they should be on the same disk in most cases
// Test passes if the function doesn't panic - the actual result depends on test environment
}
#[cfg(target_os = "windows")]
#[test]
fn get_info_with_root_drive() {
let info = get_info("C:\\").unwrap();
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
}

View File

@@ -110,10 +110,6 @@ RustFS helm chart supports **standalone and distributed mode**. For standalone m
| storageclass.logStorageSize | string | `"256Mi"` | The storage size for logs PVC. |
| storageclass.name | string | `"local-path"` | The name for StorageClass. |
| tolerations | list | `[]` | |
| gatewayApi.enabled | bool | `false` | To enable/disable gateway api support. |
| gatewayApi.gatewayClass | string | `traefik` | Gateway class implementation. |
| gatewayApi.hostname | string | Hostname to access RustFS via gateway api. |
| gatewayApi.secretName | string | Secret tls to via RustFS using HTTPS. |
---
@@ -211,22 +207,6 @@ You should use `--set-file` parameter when running `helm install` command, for e
helm install rustfs rustfs/rustfs -n rustfs --set tls.enabled=true,--set-file tls.crt=./tls.crt,--set-file tls.key=./tls.key
```
# Gateway API support (alpha)
Due to [ingress nginx retirement](https://kubernetes.io/blog/2025/11/11/ingress-nginx-retirement/) in March 2026, so RustFS adds support for [gateway api](https://gateway-api.sigs.k8s.io/). Currently, RustFS only supports traefik as gateway class, more and more gateway class support will be added in the future after those classes are tested. If you want to enable gateway api, specify `gatewayApi.enabled` to `true` while specify `ingress.enabled` to `false`. After installation, you can find the `Gateway` and `HttpRoute` resources,
```
$ kubectl -n rustfs get gateway
NAME CLASS ADDRESS PROGRAMMED AGE
rustfs-gateway traefik True 169m
$ kubectl -n rustfs get httproute
NAME HOSTNAMES AGE
rustfs-route ["example.rustfs.com"] 172m
```
Then, via RustFS instance via `https://example.rustfs.com` or `http://example.rustfs.com`.
# Uninstall
Uninstalling the rustfs installation with command,

View File

@@ -10,10 +10,6 @@ metadata:
{{- end }}
spec:
replicas: 1
{{- with .Values.mode.standalone.strategy }}
strategy:
{{- toYaml . | nindent 4 }}
{{- end }}
selector:
matchLabels:
{{- include "rustfs.selectorLabels" . | nindent 6 }}

View File

@@ -1,23 +0,0 @@
{{- if .Values.gatewayApi.enabled }}
apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
name: {{ include "rustfs.fullname" . }}-gateway
spec:
gatewayClassName: {{ .Values.gatewayApi.gatewayClass }}
listeners:
- name: http
port: 80
protocol: HTTP
allowedRoutes:
namespaces:
from: Same
- name: https
port: 443
protocol: HTTPS
tls:
mode: Terminate
certificateRefs:
- name: {{ .Values.gatewayApi.secretName }}
kind: Secret
{{- end }}

View File

@@ -1,19 +0,0 @@
{{- if .Values.gatewayApi.enabled -}}
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: {{ include "rustfs.fullname" . }}-route
spec:
parentRefs:
- name: {{ include "rustfs.fullname" . }}-gateway
hostnames:
- {{ .Values.gatewayApi.hostname }}
rules:
- matches:
- path:
type: PathPrefix
value: /
backendRefs:
- name: rustfs-svc
port: 9001
{{- end }}

View File

@@ -1,4 +1,4 @@
{{- if and (or .Values.gatewayApi.enabled .Values.ingress.tls.enabled) (not .Values.ingress.tls.certManager.enabled) }}
{{- if and .Values.ingress.tls.enabled (not .Values.ingress.tls.certManager.enabled) }}
apiVersion: v1
kind: Secret
metadata:

View File

@@ -11,7 +11,7 @@ image:
# This sets the pull policy for images.
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
tag: "latest"
# This is for the secrets for pulling an image from a private repository more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
imagePullSecrets: []
@@ -30,11 +30,6 @@ fullnameOverride: ""
mode:
standalone:
enabled: false
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 0
maxUnavailable: 1
distributed:
enabled: true
@@ -135,12 +130,6 @@ ingress:
crt: tls.crt
key: tls.key
gatewayApi:
enabled: false
gatewayClass: traefik
hostname: example.rustfs.com
secretName: secret-tls
resources:
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little

View File

@@ -112,6 +112,8 @@ impl Operation for AddServiceAccount {
return Err(s3_error!(InvalidRequest, "iam not init"));
};
let deny_only = constant_time_eq(&cred.access_key, &target_user) || constant_time_eq(&cred.parent_user, &target_user);
if !iam_store
.is_allowed(&Args {
account: &cred.access_key,
@@ -128,7 +130,7 @@ impl Operation for AddServiceAccount {
is_owner: owner,
object: "",
claims: cred.claims.as_ref().unwrap_or(&HashMap::new()),
deny_only: false, // Always require explicit Allow permission
deny_only,
})
.await
{

View File

@@ -118,14 +118,12 @@ impl Operation for AddUser {
return Err(s3_error!(InvalidArgument, "access key is not utf8"));
}
// Security fix: Always require explicit Allow permission for CreateUser
// Do not use deny_only to bypass permission checks, even when creating for self
// This ensures consistent security semantics and prevents privilege escalation
let deny_only = ak == cred.access_key;
validate_admin_request(
&req.headers,
&cred,
owner,
false, // Always require explicit Allow permission
deny_only,
vec![Action::AdminAction(AdminAction::CreateUserAdminAction)],
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
)
@@ -377,14 +375,12 @@ impl Operation for GetUserInfo {
let (cred, owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// Security fix: Always require explicit Allow permission for GetUser
// Users should have explicit GetUser permission to view account information
// This ensures consistent security semantics across all admin operations
let deny_only = ak == cred.access_key;
validate_admin_request(
&req.headers,
&cred,
owner,
false, // Always require explicit Allow permission
deny_only,
vec![Action::AdminAction(AdminAction::GetUserAdminAction)],
req.extensions.get::<Option<RemoteAddr>>().and_then(|opt| opt.map(|a| a.0)),
)

View File

@@ -378,11 +378,20 @@ pub async fn start_http_server(
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
// Idle: 10s, Interval: 5s, Retries: 3
let mut ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
#[cfg(not(target_os = "openbsd"))]
{
ka = ka.with_interval(Duration::from_secs(5)).with_retries(3);
}
let ka = {
#[cfg(not(target_os = "openbsd"))]
let ka = TcpKeepalive::new()
.with_time(Duration::from_secs(10))
.with_interval(Duration::from_secs(5))
.with_retries(3);
// On OpenBSD socket2 only supports configuring the initial
// TCP keepalive timeout; intervals and retries cannot be set.
#[cfg(target_os = "openbsd")]
let ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
ka
};
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
warn!(?err, "Failed to set TCP_KEEPALIVE");
@@ -734,26 +743,10 @@ fn check_auth(req: Request<()>) -> std::result::Result<Request<()>, Status> {
Ok(req)
}
/// Determines the listen backlog size.
///
/// It tries to read the system's maximum connection queue length (`somaxconn`).
/// If reading fails, it falls back to a default value (e.g., 1024).
/// This makes the backlog size adaptive to the system configuration.
#[cfg(target_os = "linux")]
fn get_listen_backlog() -> i32 {
const DEFAULT_BACKLOG: i32 = 1024;
// For Linux, read from /proc/sys/net/core/somaxconn
match std::fs::read_to_string("/proc/sys/net/core/somaxconn") {
Ok(s) => s.trim().parse().unwrap_or(DEFAULT_BACKLOG),
Err(_) => DEFAULT_BACKLOG,
}
}
// For macOS and BSD variants use the syscall way of getting the connection queue length.
#[cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
#[allow(unsafe_code)]
fn get_listen_backlog() -> i32 {
fn get_conn_queue_len() -> i32 {
const DEFAULT_BACKLOG: i32 = 1024;
#[cfg(target_os = "openbsd")]
@@ -780,15 +773,37 @@ fn get_listen_backlog() -> i32 {
buf[0]
}
// Fallback for Windows and other operating systems
#[cfg(not(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
)))]
/// Determines the listen backlog size.
///
/// It tries to read the system's maximum connection queue length (`somaxconn`).
/// If reading fails, it falls back to a default value (e.g., 1024).
/// This makes the backlog size adaptive to the system configuration.
fn get_listen_backlog() -> i32 {
const DEFAULT_BACKLOG: i32 = 1024;
DEFAULT_BACKLOG
#[cfg(target_os = "linux")]
{
const DEFAULT_BACKLOG: i32 = 1024;
// For Linux, read from /proc/sys/net/core/somaxconn
match std::fs::read_to_string("/proc/sys/net/core/somaxconn") {
Ok(s) => s.trim().parse().unwrap_or(DEFAULT_BACKLOG),
Err(_) => DEFAULT_BACKLOG,
}
}
#[cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
{
get_conn_queue_len()
}
#[cfg(not(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
)))]
{
// Fallback for Windows and other operating systems
DEFAULT_BACKLOG
}
}

View File

@@ -817,14 +817,11 @@ impl S3Access for FS {
authorize_request(req, Action::S3Action(S3Action::ListBucketMultipartUploadsAction)).await
}
/// Checks whether the `ListObjectVersions` request is authorized for the requested bucket.
/// Checks whether the ListObjectVersions request has accesses to the resources.
///
/// Returns `Ok(())` if the request is allowed, or an error if access is denied or another
/// authorization-related issue occurs.
async fn list_object_versions(&self, req: &mut S3Request<ListObjectVersionsInput>) -> S3Result<()> {
let req_info = req.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");
req_info.bucket = Some(req.input.bucket.clone());
authorize_request(req, Action::S3Action(S3Action::ListBucketVersionsAction)).await
/// This method returns `Ok(())` by default.
async fn list_object_versions(&self, _req: &mut S3Request<ListObjectVersionsInput>) -> S3Result<()> {
Ok(())
}
/// Checks whether the ListObjects request has accesses to the resources.

View File

@@ -852,6 +852,8 @@ impl S3 for FS {
sse_customer_key_md5,
metadata_directive,
metadata,
copy_source_if_match,
copy_source_if_none_match,
..
} = req.input.clone();
let (src_bucket, src_key, version_id) = match copy_source {
@@ -927,6 +929,30 @@ impl S3 for FS {
let mut src_info = gr.object_info.clone();
// Validate copy source conditions
if let Some(if_match) = copy_source_if_match {
if let Some(ref etag) = src_info.etag {
if let Some(strong_etag) = if_match.into_etag() {
if ETag::Strong(etag.clone()) != strong_etag {
return Err(s3_error!(PreconditionFailed));
}
} else {
// Weak ETag or Any (*) in If-Match should fail per RFC 9110
return Err(s3_error!(PreconditionFailed));
}
} else {
return Err(s3_error!(PreconditionFailed));
}
}
if let Some(if_none_match) = copy_source_if_none_match
&& let Some(ref etag) = src_info.etag
&& let Some(strong_etag) = if_none_match.into_etag()
&& ETag::Strong(etag.clone()) == strong_etag
{
return Err(s3_error!(PreconditionFailed));
}
if cp_src_dst_same {
src_info.metadata_only = true;
}

View File

@@ -162,17 +162,9 @@ impl OperationHelper {
.build();
let mut final_builder = builder.api(api_details.clone());
if let Ok(res) = result {
final_builder = final_builder.resp_header(extract_resp_elements(res));
}
if let Some(err) = error_msg {
final_builder = final_builder.error(err);
}
if let Some(sk) = rustfs_credentials::get_global_access_key_opt() {
final_builder = final_builder.access_key(&sk);
}
self.audit_builder = Some(final_builder);
self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use
}

View File

@@ -17,7 +17,7 @@
# - Metadata: User-defined metadata
# - Conditional GET: If-Match, If-None-Match, If-Modified-Since
#
# Total: 118 tests
# Total: 109 tests
test_basic_key_count
test_bucket_create_naming_bad_short_one
@@ -63,15 +63,6 @@ test_bucket_list_prefix_none
test_bucket_list_prefix_not_exist
test_bucket_list_prefix_unreadable
test_bucket_list_special_prefix
test_bucket_list_delimiter_alt
test_bucket_list_delimiter_dot
test_bucket_list_delimiter_empty
test_bucket_list_delimiter_none
test_bucket_list_delimiter_not_exist
test_bucket_list_delimiter_percentage
test_bucket_list_delimiter_prefix_ends_with_delimiter
test_bucket_list_delimiter_unreadable
test_bucket_list_delimiter_whitespace
test_bucket_listv2_continuationtoken
test_bucket_listv2_continuationtoken_empty
test_bucket_listv2_fetchowner_defaultempty

View File

@@ -105,8 +105,16 @@ test_versioning_obj_plain_null_version_removal
test_versioning_obj_suspend_versions
# Teardown issues (list_object_versions on non-versioned buckets)
# These tests pass but have cleanup issues with list_object_versions
test_bucket_list_delimiter_alt
test_bucket_list_delimiter_basic
test_bucket_list_delimiter_dot
test_bucket_list_delimiter_empty
test_bucket_list_delimiter_none
test_bucket_list_delimiter_not_exist
test_bucket_list_delimiter_percentage
test_bucket_list_delimiter_prefix_ends_with_delimiter
test_bucket_list_delimiter_unreadable
test_bucket_list_delimiter_whitespace
test_bucket_list_encoding_basic
test_bucket_listv2_delimiter_alt
test_bucket_listv2_delimiter_basic