Compare commits

...

12 Commits

Author SHA1 Message Date
guojidan
fba201df3d fix: harden data usage aggregation and cache handling (#1102)
Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-11 09:55:25 +08:00
yxrxy
ccbab3232b fix: ListObjectsV2 correctly handles repeated folder names in prefixes (#1104)
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-11 09:38:52 +08:00
loverustfs
421f66ea18 Disable codeql 2025-12-11 09:29:46 +08:00
yxrxy
ede2fa9d0b fix: is-admin api (For STS/temporary credentials, we need to check the… (#1101)
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-11 08:55:41 +08:00
tennisleng
978845b555 fix(lifecycle): Fix ObjectInfo fields and mod_time error handling (#1088)
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-11 07:17:35 +08:00
Jacob
53c126d678 fix: decode percent-encoded paths in get_file_path() (#1072)
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-10 22:30:02 +08:00
0xdx2
9f12a7678c feat(ci): add codeql to scanner code (#1076) 2025-12-10 21:48:18 +08:00
Jörg Thalheim
2c86fe30ec Content encoding (#1089)
Signed-off-by: Jörg Thalheim <joerg@thalheim.io>
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-10 15:21:51 +08:00
tennisleng
ac0c34e734 fix(lifecycle): Return NoSuchLifecycleConfiguration error for missing lifecycle config (#1087)
Co-authored-by: loverustfs <hello@rustfs.com>
2025-12-10 12:35:22 +08:00
majinghe
ae46ea4bd3 fix github action security found by github CodeQL (#1091) 2025-12-10 12:07:28 +08:00
majinghe
8b3d4ea59b enhancement logs output for container deployment (#1090) 2025-12-10 11:14:05 +08:00
houseme
ef261deef6 improve code for is admin (#1082) 2025-12-09 17:34:47 +08:00
22 changed files with 664 additions and 115 deletions

View File

@@ -5,6 +5,9 @@ on:
workflows: ["Build and Release"]
types: [completed]
permissions:
contents: read
env:
new_version: ${{ github.event.workflow_run.head_branch }}

1
Cargo.lock generated
View File

@@ -3053,6 +3053,7 @@ dependencies = [
"rand 0.10.0-rc.5",
"reqwest",
"rmp-serde",
"rustfs-common",
"rustfs-ecstore",
"rustfs-filemeta",
"rustfs-lock",

View File

@@ -81,12 +81,11 @@ ENV RUSTFS_ADDRESS=":9000" \
RUSTFS_CORS_ALLOWED_ORIGINS="*" \
RUSTFS_CONSOLE_CORS_ALLOWED_ORIGINS="*" \
RUSTFS_VOLUMES="/data" \
RUST_LOG="warn" \
RUSTFS_OBS_LOG_DIRECTORY="/logs"
RUST_LOG="warn"
EXPOSE 9000 9001
VOLUME ["/data", "/logs"]
VOLUME ["/data"]
USER rustfs

View File

@@ -166,14 +166,13 @@ ENV RUSTFS_ADDRESS=":9000" \
RUSTFS_CONSOLE_ENABLE="true" \
RUSTFS_VOLUMES="/data" \
RUST_LOG="warn" \
RUSTFS_OBS_LOG_DIRECTORY="/logs" \
RUSTFS_USERNAME="rustfs" \
RUSTFS_GROUPNAME="rustfs" \
RUSTFS_UID="1000" \
RUSTFS_GID="1000"
EXPOSE 9000
VOLUME ["/data", "/logs"]
VOLUME ["/data"]
# Keep root here; entrypoint will drop privileges using chroot --userspec
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -29,7 +29,7 @@ use rustfs_ecstore::{
self as ecstore, StorageAPI,
bucket::versioning::VersioningApi,
bucket::versioning_sys::BucketVersioningSys,
data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend},
disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
set_disk::SetDisks,
store_api::ObjectInfo,
@@ -137,6 +137,8 @@ pub struct Scanner {
data_usage_stats: Arc<Mutex<HashMap<String, DataUsageInfo>>>,
/// Last data usage statistics collection time
last_data_usage_collection: Arc<RwLock<Option<SystemTime>>>,
/// Backoff timestamp for heavy fallback collection
fallback_backoff_until: Arc<RwLock<Option<SystemTime>>>,
/// Heal manager for auto-heal integration
heal_manager: Option<Arc<HealManager>>,
@@ -192,6 +194,7 @@ impl Scanner {
disk_metrics: Arc::new(Mutex::new(HashMap::new())),
data_usage_stats: Arc::new(Mutex::new(HashMap::new())),
last_data_usage_collection: Arc::new(RwLock::new(None)),
fallback_backoff_until: Arc::new(RwLock::new(None)),
heal_manager,
node_scanner,
stats_aggregator,
@@ -473,6 +476,8 @@ impl Scanner {
size: usage.total_size as i64,
delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
mod_time: usage.last_modified_ns.and_then(Self::ns_to_offset_datetime),
// Set is_latest to true for live objects - required for lifecycle expiration evaluation
is_latest: usage.has_live_object,
..Default::default()
}
}
@@ -879,6 +884,7 @@ impl Scanner {
/// Collect and persist data usage statistics
async fn collect_and_persist_data_usage(&self) -> Result<()> {
info!("Starting data usage collection and persistence");
let now = SystemTime::now();
// Get ECStore instance
let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() else {
@@ -886,6 +892,10 @@ impl Scanner {
return Ok(());
};
// Helper to avoid hammering the storage layer with repeated realtime scans.
let mut use_cached_on_backoff = false;
let fallback_backoff_secs = Duration::from_secs(300);
// Run local usage scan and aggregate snapshots; fall back to on-demand build when necessary.
let mut data_usage = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => {
@@ -907,16 +917,55 @@ impl Scanner {
"Failed to aggregate local data usage snapshots, falling back to realtime collection: {}",
e
);
self.build_data_usage_from_ecstore(&ecstore).await?
match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
Some(usage) => usage,
None => {
use_cached_on_backoff = true;
DataUsageInfo::default()
}
}
}
}
}
Err(e) => {
warn!("Local usage scan failed (using realtime collection instead): {}", e);
self.build_data_usage_from_ecstore(&ecstore).await?
match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
Some(usage) => usage,
None => {
use_cached_on_backoff = true;
DataUsageInfo::default()
}
}
}
};
// If heavy fallback was skipped due to backoff, try to reuse cached stats to avoid empty responses.
if use_cached_on_backoff && data_usage.buckets_usage.is_empty() {
let cached = {
let guard = self.data_usage_stats.lock().await;
guard.values().next().cloned()
};
if let Some(cached_usage) = cached {
data_usage = cached_usage;
}
// If there is still no data, try backend before persisting zeros
if data_usage.buckets_usage.is_empty() {
if let Ok(existing) = rustfs_ecstore::data_usage::load_data_usage_from_backend(ecstore.clone()).await {
if !existing.buckets_usage.is_empty() {
info!("Using existing backend data usage during fallback backoff");
data_usage = existing;
}
}
}
// Avoid overwriting valid backend stats with zeros when fallback is throttled
if data_usage.buckets_usage.is_empty() {
warn!("Skipping data usage persistence: fallback throttled and no cached/backend data available");
return Ok(());
}
}
// Make sure bucket counters reflect aggregated content
data_usage.buckets_count = data_usage.buckets_usage.len() as u64;
if data_usage.last_update.is_none() {
@@ -959,8 +1008,31 @@ impl Scanner {
Ok(())
}
async fn maybe_fallback_collection(
&self,
now: SystemTime,
backoff: Duration,
ecstore: &Arc<rustfs_ecstore::store::ECStore>,
) -> Result<Option<DataUsageInfo>> {
let backoff_until = *self.fallback_backoff_until.read().await;
let within_backoff = backoff_until.map(|ts| now < ts).unwrap_or(false);
if within_backoff {
warn!(
"Skipping heavy data usage fallback within backoff window (until {:?}); using cached stats if available",
backoff_until
);
return Ok(None);
}
let usage = self.build_data_usage_from_ecstore(ecstore).await?;
let mut backoff_guard = self.fallback_backoff_until.write().await;
*backoff_guard = Some(now + backoff);
Ok(Some(usage))
}
/// Build data usage statistics directly from ECStore
async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<rustfs_ecstore::store::ECStore>) -> Result<DataUsageInfo> {
pub async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<rustfs_ecstore::store::ECStore>) -> Result<DataUsageInfo> {
let mut data_usage = DataUsageInfo::default();
// Get bucket list
@@ -973,6 +1045,8 @@ impl Scanner {
data_usage.last_update = Some(SystemTime::now());
let mut total_objects = 0u64;
let mut total_versions = 0u64;
let mut total_delete_markers = 0u64;
let mut total_size = 0u64;
for bucket_info in buckets {
@@ -980,37 +1054,26 @@ impl Scanner {
continue; // Skip system buckets
}
// Try to get actual object count for this bucket
let (object_count, bucket_size) = match ecstore
.clone()
.list_objects_v2(
&bucket_info.name,
"", // prefix
None, // continuation_token
None, // delimiter
100, // max_keys - small limit for performance
false, // fetch_owner
None, // start_after
false, // incl_deleted
)
.await
{
Ok(result) => {
let count = result.objects.len() as u64;
let size = result.objects.iter().map(|obj| obj.size as u64).sum();
(count, size)
}
Err(_) => (0, 0),
};
// Use ecstore pagination helper to avoid truncating at 100 objects
let (object_count, bucket_size, versions_count, delete_markers) =
match compute_bucket_usage(ecstore.clone(), &bucket_info.name).await {
Ok(usage) => (usage.objects_count, usage.size, usage.versions_count, usage.delete_markers_count),
Err(e) => {
warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
(0, 0, 0, 0)
}
};
total_objects += object_count;
total_versions += versions_count;
total_delete_markers += delete_markers;
total_size += bucket_size;
let bucket_usage = rustfs_common::data_usage::BucketUsageInfo {
size: bucket_size,
objects_count: object_count,
versions_count: object_count, // Simplified
delete_markers_count: 0,
versions_count,
delete_markers_count: delete_markers,
..Default::default()
};
@@ -1020,7 +1083,8 @@ impl Scanner {
data_usage.objects_total_count = total_objects;
data_usage.objects_total_size = total_size;
data_usage.versions_total_count = total_objects;
data_usage.versions_total_count = total_versions;
data_usage.delete_markers_total_count = total_delete_markers;
}
Err(e) => {
warn!("Failed to list buckets for data usage collection: {}", e);
@@ -2554,6 +2618,7 @@ impl Scanner {
disk_metrics: Arc::clone(&self.disk_metrics),
data_usage_stats: Arc::clone(&self.data_usage_stats),
last_data_usage_collection: Arc::clone(&self.last_data_usage_collection),
fallback_backoff_until: Arc::clone(&self.fallback_backoff_until),
heal_manager: self.heal_manager.clone(),
node_scanner: Arc::clone(&self.node_scanner),
stats_aggregator: Arc::clone(&self.stats_aggregator),

View File

@@ -84,6 +84,9 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
guard.clone()
};
// Use the first local online disk in the set to avoid missing stats when disk 0 is down
let mut picked = false;
for (disk_index, disk_opt) in disks.into_iter().enumerate() {
let Some(disk) = disk_opt else {
continue;
@@ -93,11 +96,17 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
continue;
}
// Count objects once by scanning only disk index zero from each set.
if disk_index != 0 {
if picked {
continue;
}
// Skip offline disks; keep looking for an online candidate
if !disk.is_online().await {
continue;
}
picked = true;
let disk_id = match disk.get_disk_id().await.map_err(Error::from)? {
Some(id) => id.to_string(),
None => {

View File

@@ -347,7 +347,8 @@ impl DecentralizedStatsAggregator {
// update cache
*self.cached_stats.write().await = Some(aggregated.clone());
*self.cache_timestamp.write().await = aggregation_timestamp;
// Use the time when aggregation completes as cache timestamp to avoid premature expiry during long runs
*self.cache_timestamp.write().await = SystemTime::now();
Ok(aggregated)
}
@@ -359,7 +360,8 @@ impl DecentralizedStatsAggregator {
// update cache
*self.cached_stats.write().await = Some(aggregated.clone());
*self.cache_timestamp.write().await = now;
// Cache timestamp should reflect completion time rather than aggregation start
*self.cache_timestamp.write().await = SystemTime::now();
Ok(aggregated)
}

View File

@@ -0,0 +1,97 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(test)]
use rustfs_ahm::scanner::data_scanner::Scanner;
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::GLOBAL_Endpoints;
use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetadataSys};
use rustfs_ecstore::endpoints::EndpointServerPools;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
/// Build a minimal single-node ECStore over a temp directory and populate objects.
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
let temp_dir = TempDir::new().expect("temp dir");
let root = temp_dir.path().to_string_lossy().to_string();
// Create endpoints from the temp dir
let (endpoint_pools, _setup) = EndpointServerPools::from_volumes("127.0.0.1:0", vec![root])
.await
.expect("endpoint pools");
// Seed globals required by metadata sys if not already set
if GLOBAL_Endpoints.get().is_none() {
let _ = GLOBAL_Endpoints.set(endpoint_pools.clone());
}
let store = ECStore::new("127.0.0.1:0".parse().unwrap(), endpoint_pools, CancellationToken::new())
.await
.expect("create store");
if rustfs_ecstore::global::new_object_layer_fn().is_none() {
rustfs_ecstore::global::set_object_layer(store.clone()).await;
}
// Initialize metadata system before bucket operations
if GLOBAL_BucketMetadataSys.get().is_none() {
let mut sys = BucketMetadataSys::new(store.clone());
sys.init(Vec::new()).await;
let _ = GLOBAL_BucketMetadataSys.set(Arc::new(RwLock::new(sys)));
}
store
.make_bucket("fallback-bucket", &rustfs_ecstore::store_api::MakeBucketOptions::default())
.await
.expect("make bucket");
for i in 0..count {
let key = format!("obj-{i:04}");
let data = format!("payload-{i}");
let mut reader = PutObjReader::from_vec(data.into_bytes());
store
.put_object("fallback-bucket", &key, &mut reader, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
.expect("put object");
}
(temp_dir, store)
}
#[tokio::test]
async fn fallback_builds_full_counts_over_100_objects() {
let (_tmp, store) = create_store_with_objects(1000).await;
let scanner = Scanner::new(None, None);
// Directly call the fallback builder to ensure pagination works.
let usage: DataUsageInfo = scanner.build_data_usage_from_ecstore(&store).await.expect("fallback usage");
let bucket = usage.buckets_usage.get("fallback-bucket").expect("bucket usage present");
assert!(
usage.objects_total_count >= 1000,
"total objects should be >=1000, got {}",
usage.objects_total_count
);
assert!(
bucket.objects_count >= 1000,
"bucket objects should be >=1000, got {}",
bucket.objects_count
);
}

View File

@@ -25,6 +25,7 @@ workspace = true
[dependencies]
rustfs-ecstore.workspace = true
rustfs-common.workspace = true
flatbuffers.workspace = true
futures.workspace = true
rustfs-lock.workspace = true
@@ -49,4 +50,4 @@ uuid = { workspace = true }
base64 = { workspace = true }
rand = { workspace = true }
chrono = { workspace = true }
md5 = { workspace = true }
md5 = { workspace = true }

View File

@@ -0,0 +1,85 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! End-to-end test for Content-Encoding header handling
//!
//! Tests that the Content-Encoding header is correctly stored during PUT
//! and returned in GET/HEAD responses. This is important for clients that
//! upload pre-compressed content and rely on the header for decompression.
#[cfg(test)]
mod tests {
use crate::common::{RustFSTestEnvironment, init_logging};
use aws_sdk_s3::primitives::ByteStream;
use serial_test::serial;
use tracing::info;
/// Verify Content-Encoding header roundtrips through PUT, GET, and HEAD operations
#[tokio::test]
#[serial]
async fn test_content_encoding_roundtrip() {
init_logging();
info!("Starting Content-Encoding roundtrip test");
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
let client = env.create_s3_client();
let bucket = "content-encoding-test";
let key = "logs/app.log.zst";
let content = b"2024-01-15 10:23:45 INFO Application started\n2024-01-15 10:23:46 DEBUG Loading config\n";
client
.create_bucket()
.bucket(bucket)
.send()
.await
.expect("Failed to create bucket");
info!("Uploading object with Content-Encoding: zstd");
client
.put_object()
.bucket(bucket)
.key(key)
.content_type("text/plain")
.content_encoding("zstd")
.body(ByteStream::from_static(content))
.send()
.await
.expect("PUT failed");
info!("Verifying GET response includes Content-Encoding");
let get_resp = client.get_object().bucket(bucket).key(key).send().await.expect("GET failed");
assert_eq!(get_resp.content_encoding(), Some("zstd"), "GET should return Content-Encoding: zstd");
assert_eq!(get_resp.content_type(), Some("text/plain"), "GET should return correct Content-Type");
let body = get_resp.body.collect().await.unwrap().into_bytes();
assert_eq!(body.as_ref(), content, "Body content mismatch");
info!("Verifying HEAD response includes Content-Encoding");
let head_resp = client
.head_object()
.bucket(bucket)
.key(key)
.send()
.await
.expect("HEAD failed");
assert_eq!(head_resp.content_encoding(), Some("zstd"), "HEAD should return Content-Encoding: zstd");
assert_eq!(head_resp.content_type(), Some("text/plain"), "HEAD should return correct Content-Type");
env.stop_server();
}
}

View File

@@ -0,0 +1,73 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use aws_sdk_s3::primitives::ByteStream;
use rustfs_common::data_usage::DataUsageInfo;
use serial_test::serial;
use crate::common::{RustFSTestEnvironment, TEST_BUCKET, awscurl_get, init_logging};
/// Regression test for data usage accuracy (issue #1012).
/// Launches rustfs, writes 1000 objects, then asserts admin data usage reports the full count.
#[tokio::test(flavor = "multi_thread")]
#[serial]
#[ignore = "Starts a rustfs server and requires awscurl; enable when running full E2E"]
async fn data_usage_reports_all_objects() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_logging();
let mut env = RustFSTestEnvironment::new().await?;
env.start_rustfs_server(vec![]).await?;
let client = env.create_s3_client();
// Create bucket and upload objects
client.create_bucket().bucket(TEST_BUCKET).send().await?;
for i in 0..1000 {
let key = format!("obj-{i:04}");
client
.put_object()
.bucket(TEST_BUCKET)
.key(key)
.body(ByteStream::from_static(b"hello-world"))
.send()
.await?;
}
// Query admin data usage API
let url = format!("{}/rustfs/admin/v3/datausageinfo", env.url);
let resp = awscurl_get(&url, &env.access_key, &env.secret_key).await?;
let usage: DataUsageInfo = serde_json::from_str(&resp)?;
// Assert total object count and per-bucket count are not truncated
let bucket_usage = usage
.buckets_usage
.get(TEST_BUCKET)
.cloned()
.expect("bucket usage should exist");
assert!(
usage.objects_total_count >= 1000,
"total object count should be at least 1000, got {}",
usage.objects_total_count
);
assert!(
bucket_usage.objects_count >= 1000,
"bucket object count should be at least 1000, got {}",
bucket_usage.objects_count
);
env.stop_server();
Ok(())
}

View File

@@ -18,6 +18,10 @@ mod reliant;
#[cfg(test)]
pub mod common;
// Data usage regression tests
#[cfg(test)]
mod data_usage_test;
// KMS-specific test modules
#[cfg(test)]
mod kms;
@@ -25,3 +29,7 @@ mod kms;
// Special characters in path test modules
#[cfg(test)]
mod special_chars_test;
// Content-Encoding header preservation test
#[cfg(test)]
mod content_encoding_test;

View File

@@ -283,7 +283,17 @@ impl Lifecycle for BucketLifecycleConfiguration {
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
// Gracefully handle missing mod_time instead of panicking
let mod_time = match obj.mod_time {
Some(t) => t,
None => {
info!("eval_inner: mod_time is None for object={}, returning default event", obj.name);
return Event::default();
}
};
if mod_time.unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
@@ -323,7 +333,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
let expected_expiry = expected_expiry_time(mod_time, days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
@@ -446,11 +456,11 @@ impl Lifecycle for BucketLifecycleConfiguration {
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
let expected_expiry: OffsetDateTime = expected_expiry_time(mod_time, days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
mod_time,
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()

View File

@@ -32,6 +32,7 @@ use rustfs_common::data_usage::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
};
use rustfs_utils::path::SLASH_SEPARATOR;
use tokio::fs;
use tracing::{error, info, warn};
use crate::error::Error;
@@ -63,6 +64,21 @@ lazy_static::lazy_static! {
/// Store data usage info to backend storage
pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
// Prevent older data from overwriting newer persisted stats
if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
if let Ok(existing) = serde_json::from_slice::<DataUsageInfo>(&buf) {
if let (Some(new_ts), Some(existing_ts)) = (data_usage_info.last_update, existing.last_update) {
if new_ts <= existing_ts {
info!(
"Skip persisting data usage: incoming last_update {:?} <= existing {:?}",
new_ts, existing_ts
);
return Ok(());
}
}
}
}
let data =
serde_json::to_vec(&data_usage_info).map_err(|e| Error::other(format!("Failed to serialize data usage info: {e}")))?;
@@ -160,6 +176,39 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
}
/// Aggregate usage information from local disk snapshots.
fn merge_snapshot(aggregated: &mut DataUsageInfo, mut snapshot: LocalUsageSnapshot, latest_update: &mut Option<SystemTime>) {
if let Some(update) = snapshot.last_update {
if latest_update.is_none_or(|current| update > current) {
*latest_update = Some(update);
}
}
snapshot.recompute_totals();
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
aggregated.delete_markers_total_count = aggregated
.delete_markers_total_count
.saturating_add(snapshot.delete_markers_total_count);
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
for (bucket, usage) in snapshot.buckets_usage.into_iter() {
let bucket_size = usage.size;
match aggregated.buckets_usage.entry(bucket.clone()) {
Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
Entry::Vacant(entry) => {
entry.insert(usage.clone());
}
}
aggregated
.bucket_sizes
.entry(bucket)
.and_modify(|size| *size = size.saturating_add(bucket_size))
.or_insert(bucket_size);
}
}
pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
let mut aggregated = DataUsageInfo::default();
let mut latest_update: Option<SystemTime> = None;
@@ -196,7 +245,24 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
snapshot_exists: false,
};
if let Some(mut snapshot) = read_local_snapshot(root.as_path(), &disk_id).await? {
let snapshot_result = read_local_snapshot(root.as_path(), &disk_id).await;
// If a snapshot is corrupted or unreadable, skip it but keep processing others
if let Err(err) = &snapshot_result {
warn!(
"Failed to read data usage snapshot for disk {} (pool {}, set {}, disk {}): {}",
disk_id, pool_idx, set_disks.set_index, disk_index, err
);
// Best-effort cleanup so next scan can rebuild a fresh snapshot instead of repeatedly failing
let snapshot_file = snapshot_path(root.as_path(), &disk_id);
if let Err(remove_err) = fs::remove_file(&snapshot_file).await {
if remove_err.kind() != std::io::ErrorKind::NotFound {
warn!("Failed to remove corrupted snapshot {:?}: {}", snapshot_file, remove_err);
}
}
}
if let Ok(Some(mut snapshot)) = snapshot_result {
status.last_update = snapshot.last_update;
status.snapshot_exists = true;
@@ -213,37 +279,7 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
snapshot.meta.disk_index = Some(disk_index);
}
snapshot.recompute_totals();
if let Some(update) = snapshot.last_update {
if latest_update.is_none_or(|current| update > current) {
latest_update = Some(update);
}
}
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
aggregated.versions_total_count =
aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
aggregated.delete_markers_total_count = aggregated
.delete_markers_total_count
.saturating_add(snapshot.delete_markers_total_count);
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
for (bucket, usage) in snapshot.buckets_usage.into_iter() {
let bucket_size = usage.size;
match aggregated.buckets_usage.entry(bucket.clone()) {
Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
Entry::Vacant(entry) => {
entry.insert(usage.clone());
}
}
aggregated
.bucket_sizes
.entry(bucket)
.and_modify(|size| *size = size.saturating_add(bucket_size))
.or_insert(bucket_size);
}
merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
}
statuses.push(status);
@@ -549,3 +585,94 @@ pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate:
save_config(store, &name, buf).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rustfs_common::data_usage::BucketUsageInfo;
fn aggregate_for_test(
inputs: Vec<(DiskUsageStatus, Result<Option<LocalUsageSnapshot>, Error>)>,
) -> (Vec<DiskUsageStatus>, DataUsageInfo) {
let mut aggregated = DataUsageInfo::default();
let mut latest_update: Option<SystemTime> = None;
let mut statuses = Vec::new();
for (mut status, snapshot_result) in inputs {
if let Ok(Some(snapshot)) = snapshot_result {
status.snapshot_exists = true;
status.last_update = snapshot.last_update;
merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
}
statuses.push(status);
}
aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
aggregated.last_update = latest_update;
aggregated.disk_usage_status = statuses.clone();
(statuses, aggregated)
}
#[test]
fn aggregate_skips_corrupted_snapshot_and_preserves_other_disks() {
let mut good_snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
disk_id: "good-disk".to_string(),
pool_index: Some(0),
set_index: Some(0),
disk_index: Some(0),
});
good_snapshot.last_update = Some(SystemTime::now());
good_snapshot.buckets_usage.insert(
"bucket-a".to_string(),
BucketUsageInfo {
objects_count: 3,
versions_count: 3,
size: 42,
..Default::default()
},
);
good_snapshot.recompute_totals();
let bad_snapshot_err: Result<Option<LocalUsageSnapshot>, Error> = Err(Error::other("corrupted snapshot payload"));
let inputs = vec![
(
DiskUsageStatus {
disk_id: "bad-disk".to_string(),
pool_index: Some(0),
set_index: Some(0),
disk_index: Some(1),
last_update: None,
snapshot_exists: false,
},
bad_snapshot_err,
),
(
DiskUsageStatus {
disk_id: "good-disk".to_string(),
pool_index: Some(0),
set_index: Some(0),
disk_index: Some(0),
last_update: None,
snapshot_exists: false,
},
Ok(Some(good_snapshot)),
),
];
let (statuses, aggregated) = aggregate_for_test(inputs);
// Bad disk stays non-existent, good disk is marked present
let bad_status = statuses.iter().find(|s| s.disk_id == "bad-disk").unwrap();
assert!(!bad_status.snapshot_exists);
let good_status = statuses.iter().find(|s| s.disk_id == "good-disk").unwrap();
assert!(good_status.snapshot_exists);
// Aggregated data is from good snapshot only
assert_eq!(aggregated.objects_total_count, 3);
assert_eq!(aggregated.objects_total_size, 42);
assert_eq!(aggregated.buckets_count, 1);
assert_eq!(aggregated.buckets_usage.get("bucket-a").map(|b| (b.objects_count, b.size)), Some((3, 42)));
}
}

View File

@@ -198,15 +198,22 @@ impl Endpoint {
}
}
pub fn get_file_path(&self) -> &str {
let path = self.url.path();
pub fn get_file_path(&self) -> String {
let path: &str = self.url.path();
let decoded: std::borrow::Cow<'_, str> = match urlencoding::decode(path) {
Ok(decoded) => decoded,
Err(e) => {
debug!("Failed to decode path '{}': {}, using original path", path, e);
std::borrow::Cow::Borrowed(path)
}
};
#[cfg(windows)]
if self.url.scheme() == "file" {
let stripped = path.strip_prefix('/').unwrap_or(path);
let stripped: &str = decoded.strip_prefix('/').unwrap_or(&decoded);
debug!("get_file_path windows: path={}", stripped);
return stripped;
return stripped.to_string();
}
path
decoded.into_owned()
}
}
@@ -501,6 +508,45 @@ mod test {
assert_eq!(endpoint.get_type(), EndpointType::Path);
}
#[test]
fn test_endpoint_with_spaces_in_path() {
let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
assert_eq!(endpoint.get_file_path(), path_with_spaces);
assert!(endpoint.is_local);
assert_eq!(endpoint.get_type(), EndpointType::Path);
}
#[test]
fn test_endpoint_percent_encoding_roundtrip() {
let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
// Verify that the URL internally stores percent-encoded path
assert!(
endpoint.url.path().contains("%20"),
"URL path should contain percent-encoded spaces: {}",
endpoint.url.path()
);
// Verify that get_file_path() decodes the percent-encoded path correctly
assert_eq!(
endpoint.get_file_path(),
"/Users/test/Library/Application Support/rustfs/data",
"get_file_path() should decode percent-encoded spaces"
);
}
#[test]
fn test_endpoint_with_various_special_characters() {
// Test path with multiple special characters that get percent-encoded
let path_with_special = "/tmp/test path/data[1]/file+name&more";
let endpoint = Endpoint::try_from(path_with_special).unwrap();
// get_file_path() should return the original path with decoded characters
assert_eq!(endpoint.get_file_path(), path_with_special);
}
#[test]
fn test_endpoint_update_is_local() {
let mut endpoint = Endpoint::try_from("http://localhost:9000/path").unwrap();

View File

@@ -232,7 +232,7 @@ impl PoolEndpointList {
for endpoints in pool_endpoint_list.inner.iter_mut() {
// Check whether same path is not used in endpoints of a host on different port.
let mut path_ip_map: HashMap<&str, HashSet<IpAddr>> = HashMap::new();
let mut path_ip_map: HashMap<String, HashSet<IpAddr>> = HashMap::new();
let mut host_ip_cache = HashMap::new();
for ep in endpoints.as_ref() {
if !ep.url.has_host() {
@@ -275,8 +275,9 @@ impl PoolEndpointList {
match path_ip_map.entry(path) {
Entry::Occupied(mut e) => {
if e.get().intersection(host_ip_set).count() > 0 {
let path_key = e.key().clone();
return Err(Error::other(format!(
"same path '{path}' can not be served by different port on same address"
"same path '{path_key}' can not be served by different port on same address"
)));
}
e.get_mut().extend(host_ip_set.iter());
@@ -295,7 +296,7 @@ impl PoolEndpointList {
}
let path = ep.get_file_path();
if local_path_set.contains(path) {
if local_path_set.contains(&path) {
return Err(Error::other(format!(
"path '{path}' cannot be served by different address on same server"
)));

View File

@@ -827,7 +827,12 @@ impl ObjectInfo {
for entry in entries.entries() {
if entry.is_object() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let remaining = if entry.name.starts_with(prefix) {
&entry.name[prefix.len()..]
} else {
entry.name.as_str()
};
if let Some(idx) = remaining.find(delimiter.as_str()) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
@@ -878,7 +883,14 @@ impl ObjectInfo {
if entry.is_dir() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
if let Some(idx) = {
let remaining = if entry.name.starts_with(prefix) {
&entry.name[prefix.len()..]
} else {
entry.name.as_str()
};
remaining.find(delimiter.as_str())
} {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
@@ -914,7 +926,12 @@ impl ObjectInfo {
for entry in entries.entries() {
if entry.is_object() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
let remaining = if entry.name.starts_with(prefix) {
&entry.name[prefix.len()..]
} else {
entry.name.as_str()
};
if let Some(idx) = remaining.find(delimiter.as_str()) {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {
@@ -951,7 +968,14 @@ impl ObjectInfo {
if entry.is_dir() {
if let Some(delimiter) = &delimiter {
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
if let Some(idx) = {
let remaining = if entry.name.starts_with(prefix) {
&entry.name[prefix.len()..]
} else {
entry.name.as_str()
};
remaining.find(delimiter.as_str())
} {
let idx = prefix.len() + idx + delimiter.len();
if let Some(curr_prefix) = entry.name.get(0..idx) {
if curr_prefix == prev_prefix {

View File

@@ -85,6 +85,7 @@ services:
- RUSTFS_ACCESS_KEY=devadmin
- RUSTFS_SECRET_KEY=devadmin
- RUSTFS_OBS_LOGGER_LEVEL=debug
- RUSTFS_OBS_LOG_DIRECTORY=/logs
volumes:
- .:/app # Mount source code to /app for development
- deploy/data/dev:/data
@@ -180,20 +181,6 @@ services:
profiles:
- observability
# Redis for caching (optional)
redis:
image: redis:7-alpine
container_name: redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- rustfs-network
restart: unless-stopped
profiles:
- cache
# NGINX reverse proxy (optional)
nginx:
security_opt:
@@ -241,7 +228,5 @@ volumes:
driver: local
grafana_data:
driver: local
redis_data:
driver: local
logs:
driver: local

View File

@@ -29,6 +29,7 @@ x-node-template: &node-template
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_CMD=rustfs
- RUSTFS_OBS_LOG_DIRECTORY=/logs
command: [ "sh", "-c", "sleep 3 && rustfs" ]
healthcheck:
test:

View File

@@ -55,7 +55,20 @@ process_data_volumes() {
# 3) Process log directory (separate from data volumes)
process_log_directory() {
LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY:-/logs}"
# Output logs to stdout
if [ -z "$RUSTFS_OBS_LOG_DIRECTORY" ]; then
echo "OBS log directory not configured and logs outputs to stdout"
return
fi
# Output logs to remote endpoint
if [ "${RUSTFS_OBS_LOG_DIRECTORY}" != "${RUSTFS_OBS_LOG_DIRECTORY#*://}" ]; then
echo "Output logs to remote endpoint"
return
fi
# Outputs logs to local directory
LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY}"
echo "Initializing log directory: $LOG_DIR"
if [ ! -d "$LOG_DIR" ]; then

View File

@@ -90,7 +90,6 @@ pub mod trace;
pub mod user;
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct IsAdminResponse {
pub is_admin: bool,
pub access_key: String,
@@ -159,14 +158,15 @@ impl Operation for IsAdminHandler {
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let (_cred, _owner) =
let (cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let access_key_to_check = input_cred.access_key.clone();
// Check if the user is admin by comparing with global credentials
let is_admin = if let Some(sys_cred) = get_global_action_cred() {
sys_cred.access_key == access_key_to_check
crate::auth::constant_time_eq(&access_key_to_check, &sys_cred.access_key)
|| crate::auth::constant_time_eq(&cred.parent_user, &sys_cred.access_key)
} else {
false
};
@@ -174,7 +174,7 @@ impl Operation for IsAdminHandler {
let response = IsAdminResponse {
is_admin,
access_key: access_key_to_check,
message: format!("User is {} an administrator", if is_admin { "" } else { "not" }),
message: format!("User is {}an administrator", if is_admin { "" } else { "not " }),
};
let data = serde_json::to_vec(&response)

View File

@@ -2272,6 +2272,7 @@ impl S3 for FS {
content_length: Some(response_content_length),
last_modified,
content_type,
content_encoding: info.content_encoding.clone(),
accept_ranges: Some("bytes".to_string()),
content_range,
e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
@@ -2487,6 +2488,7 @@ impl S3 for FS {
let output = HeadObjectOutput {
content_length: Some(content_length),
content_type,
content_encoding: info.content_encoding.clone(),
last_modified,
e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
metadata: filter_object_metadata(&metadata_map),
@@ -4518,18 +4520,16 @@ impl S3 for FS {
.map_err(ApiError::from)?;
let rules = match metadata_sys::get_lifecycle_config(&bucket).await {
Ok((cfg, _)) => Some(cfg.rules),
Ok((cfg, _)) => cfg.rules,
Err(_err) => {
// if BucketMetadataError::BucketLifecycleNotFound.is(&err) {
// return Err(s3_error!(NoSuchLifecycleConfiguration));
// }
// warn!("get_lifecycle_config err {:?}", err);
None
// Return NoSuchLifecycleConfiguration error as expected by S3 clients
// This fixes issue #990 where Ansible S3 roles fail with KeyError: 'Rules'
return Err(s3_error!(NoSuchLifecycleConfiguration));
}
};
Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
rules,
rules: Some(rules),
..Default::default()
}))
}