Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 09:40:32 +00:00)

Compare commits: 12 commits, feat/scan ... 1.0.0-alph
| Author | SHA1 | Date |
|---|---|---|
|  | fba201df3d |  |
|  | ccbab3232b |  |
|  | 421f66ea18 |  |
|  | ede2fa9d0b |  |
|  | 978845b555 |  |
|  | 53c126d678 |  |
|  | 9f12a7678c |  |
|  | 2c86fe30ec |  |
|  | ac0c34e734 |  |
|  | ae46ea4bd3 |  |
|  | 8b3d4ea59b |  |
|  | ef261deef6 |  |
.github/workflows/helm-package.yml (vendored, 3 changed lines)

@@ -5,6 +5,9 @@ on:
     workflows: ["Build and Release"]
     types: [completed]

+permissions:
+  contents: read
+
 env:
   new_version: ${{ github.event.workflow_run.head_branch }}
Cargo.lock (generated, 1 changed line)

@@ -3053,6 +3053,7 @@ dependencies = [
 "rand 0.10.0-rc.5",
 "reqwest",
 "rmp-serde",
+"rustfs-common",
 "rustfs-ecstore",
 "rustfs-filemeta",
 "rustfs-lock",
@@ -81,12 +81,11 @@ ENV RUSTFS_ADDRESS=":9000" \
     RUSTFS_CORS_ALLOWED_ORIGINS="*" \
     RUSTFS_CONSOLE_CORS_ALLOWED_ORIGINS="*" \
     RUSTFS_VOLUMES="/data" \
-    RUST_LOG="warn" \
-    RUSTFS_OBS_LOG_DIRECTORY="/logs"
+    RUST_LOG="warn"

 EXPOSE 9000 9001

-VOLUME ["/data", "/logs"]
+VOLUME ["/data"]

 USER rustfs

@@ -166,14 +166,13 @@ ENV RUSTFS_ADDRESS=":9000" \
     RUSTFS_CONSOLE_ENABLE="true" \
     RUSTFS_VOLUMES="/data" \
     RUST_LOG="warn" \
-    RUSTFS_OBS_LOG_DIRECTORY="/logs" \
     RUSTFS_USERNAME="rustfs" \
     RUSTFS_GROUPNAME="rustfs" \
     RUSTFS_UID="1000" \
     RUSTFS_GID="1000"

 EXPOSE 9000
-VOLUME ["/data", "/logs"]
+VOLUME ["/data"]

 # Keep root here; entrypoint will drop privileges using chroot --userspec
 ENTRYPOINT ["/entrypoint.sh"]
@@ -29,7 +29,7 @@ use rustfs_ecstore::{
     self as ecstore, StorageAPI,
     bucket::versioning::VersioningApi,
     bucket::versioning_sys::BucketVersioningSys,
-    data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
+    data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend},
     disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
     set_disk::SetDisks,
     store_api::ObjectInfo,
@@ -137,6 +137,8 @@ pub struct Scanner {
     data_usage_stats: Arc<Mutex<HashMap<String, DataUsageInfo>>>,
     /// Last data usage statistics collection time
     last_data_usage_collection: Arc<RwLock<Option<SystemTime>>>,
+    /// Backoff timestamp for heavy fallback collection
+    fallback_backoff_until: Arc<RwLock<Option<SystemTime>>>,
     /// Heal manager for auto-heal integration
     heal_manager: Option<Arc<HealManager>>,
@@ -192,6 +194,7 @@ impl Scanner {
     disk_metrics: Arc::new(Mutex::new(HashMap::new())),
     data_usage_stats: Arc::new(Mutex::new(HashMap::new())),
     last_data_usage_collection: Arc::new(RwLock::new(None)),
+    fallback_backoff_until: Arc::new(RwLock::new(None)),
     heal_manager,
     node_scanner,
     stats_aggregator,
@@ -473,6 +476,8 @@ impl Scanner {
     size: usage.total_size as i64,
     delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
     mod_time: usage.last_modified_ns.and_then(Self::ns_to_offset_datetime),
+    // Set is_latest to true for live objects - required for lifecycle expiration evaluation
+    is_latest: usage.has_live_object,
     ..Default::default()
 }
 }
@@ -879,6 +884,7 @@ impl Scanner {
     /// Collect and persist data usage statistics
     async fn collect_and_persist_data_usage(&self) -> Result<()> {
         info!("Starting data usage collection and persistence");
+        let now = SystemTime::now();

         // Get ECStore instance
         let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() else {
@@ -886,6 +892,10 @@ impl Scanner {
             return Ok(());
         };

+        // Helper to avoid hammering the storage layer with repeated realtime scans.
+        let mut use_cached_on_backoff = false;
+        let fallback_backoff_secs = Duration::from_secs(300);
+
         // Run local usage scan and aggregate snapshots; fall back to on-demand build when necessary.
         let mut data_usage = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
             Ok(outcome) => {
@@ -907,16 +917,55 @@ impl Scanner {
                     "Failed to aggregate local data usage snapshots, falling back to realtime collection: {}",
                     e
                 );
-                self.build_data_usage_from_ecstore(&ecstore).await?
+                match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
+                    Some(usage) => usage,
+                    None => {
+                        use_cached_on_backoff = true;
+                        DataUsageInfo::default()
+                    }
+                }
             }
         }
     }
     Err(e) => {
         warn!("Local usage scan failed (using realtime collection instead): {}", e);
-        self.build_data_usage_from_ecstore(&ecstore).await?
+        match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
+            Some(usage) => usage,
+            None => {
+                use_cached_on_backoff = true;
+                DataUsageInfo::default()
+            }
+        }
     }
 };

+// If heavy fallback was skipped due to backoff, try to reuse cached stats to avoid empty responses.
+if use_cached_on_backoff && data_usage.buckets_usage.is_empty() {
+    let cached = {
+        let guard = self.data_usage_stats.lock().await;
+        guard.values().next().cloned()
+    };
+    if let Some(cached_usage) = cached {
+        data_usage = cached_usage;
+    }
+
+    // If there is still no data, try backend before persisting zeros
+    if data_usage.buckets_usage.is_empty() {
+        if let Ok(existing) = rustfs_ecstore::data_usage::load_data_usage_from_backend(ecstore.clone()).await {
+            if !existing.buckets_usage.is_empty() {
+                info!("Using existing backend data usage during fallback backoff");
+                data_usage = existing;
+            }
+        }
+    }
+
+    // Avoid overwriting valid backend stats with zeros when fallback is throttled
+    if data_usage.buckets_usage.is_empty() {
+        warn!("Skipping data usage persistence: fallback throttled and no cached/backend data available");
+        return Ok(());
+    }
+}
+
 // Make sure bucket counters reflect aggregated content
 data_usage.buckets_count = data_usage.buckets_usage.len() as u64;
 if data_usage.last_update.is_none() {
@@ -959,8 +1008,31 @@ impl Scanner {
         Ok(())
     }

+    async fn maybe_fallback_collection(
+        &self,
+        now: SystemTime,
+        backoff: Duration,
+        ecstore: &Arc<rustfs_ecstore::store::ECStore>,
+    ) -> Result<Option<DataUsageInfo>> {
+        let backoff_until = *self.fallback_backoff_until.read().await;
+        let within_backoff = backoff_until.map(|ts| now < ts).unwrap_or(false);
+
+        if within_backoff {
+            warn!(
+                "Skipping heavy data usage fallback within backoff window (until {:?}); using cached stats if available",
+                backoff_until
+            );
+            return Ok(None);
+        }
+
+        let usage = self.build_data_usage_from_ecstore(ecstore).await?;
+        let mut backoff_guard = self.fallback_backoff_until.write().await;
+        *backoff_guard = Some(now + backoff);
+        Ok(Some(usage))
+    }
+
     /// Build data usage statistics directly from ECStore
-    async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<rustfs_ecstore::store::ECStore>) -> Result<DataUsageInfo> {
+    pub async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<rustfs_ecstore::store::ECStore>) -> Result<DataUsageInfo> {
         let mut data_usage = DataUsageInfo::default();

         // Get bucket list
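Note: the new maybe_fallback_collection gates the expensive realtime rebuild behind a 300-second backoff window, so a repeatedly failing snapshot aggregation cannot hammer the storage layer every scan cycle. A minimal, self-contained sketch of that gate pattern, with illustrative names rather than the crate's actual types:

```rust
use std::time::{Duration, SystemTime};

/// Illustrative backoff gate: run the heavy closure at most once per `backoff`
/// window and return `None` while a previous run is still "hot", so the caller
/// can fall back to cached data instead.
struct FallbackGate {
    backoff_until: Option<SystemTime>,
}

impl FallbackGate {
    fn try_run<T>(&mut self, now: SystemTime, backoff: Duration, heavy: impl FnOnce() -> T) -> Option<T> {
        // Inside the window: skip the heavy work entirely.
        if self.backoff_until.map(|until| now < until).unwrap_or(false) {
            return None;
        }
        let result = heavy();
        // Arm the window only after the heavy path actually ran.
        self.backoff_until = Some(now + backoff);
        Some(result)
    }
}

fn main() {
    let mut gate = FallbackGate { backoff_until: None };
    let now = SystemTime::now();
    assert!(gate.try_run(now, Duration::from_secs(300), || 42).is_some());
    // A second call inside the same 300 s window is skipped.
    assert!(gate.try_run(now, Duration::from_secs(300), || 42).is_none());
}
```

When the gate returns None, the scanner above falls back to cached stats or the last persisted backend snapshot instead of writing zeros.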
@@ -973,6 +1045,8 @@ impl Scanner {
         data_usage.last_update = Some(SystemTime::now());

         let mut total_objects = 0u64;
+        let mut total_versions = 0u64;
+        let mut total_delete_markers = 0u64;
         let mut total_size = 0u64;

         for bucket_info in buckets {
@@ -980,37 +1054,26 @@ impl Scanner {
             continue; // Skip system buckets
         }

-        // Try to get actual object count for this bucket
-        let (object_count, bucket_size) = match ecstore
-            .clone()
-            .list_objects_v2(
-                &bucket_info.name,
-                "",    // prefix
-                None,  // continuation_token
-                None,  // delimiter
-                100,   // max_keys - small limit for performance
-                false, // fetch_owner
-                None,  // start_after
-                false, // incl_deleted
-            )
-            .await
-        {
-            Ok(result) => {
-                let count = result.objects.len() as u64;
-                let size = result.objects.iter().map(|obj| obj.size as u64).sum();
-                (count, size)
-            }
-            Err(_) => (0, 0),
-        };
+        // Use ecstore pagination helper to avoid truncating at 100 objects
+        let (object_count, bucket_size, versions_count, delete_markers) =
+            match compute_bucket_usage(ecstore.clone(), &bucket_info.name).await {
+                Ok(usage) => (usage.objects_count, usage.size, usage.versions_count, usage.delete_markers_count),
+                Err(e) => {
+                    warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
+                    (0, 0, 0, 0)
+                }
+            };

         total_objects += object_count;
+        total_versions += versions_count;
+        total_delete_markers += delete_markers;
         total_size += bucket_size;

         let bucket_usage = rustfs_common::data_usage::BucketUsageInfo {
             size: bucket_size,
             objects_count: object_count,
-            versions_count: object_count, // Simplified
-            delete_markers_count: 0,
+            versions_count,
+            delete_markers_count: delete_markers,
             ..Default::default()
         };

@@ -1020,7 +1083,8 @@ impl Scanner {

         data_usage.objects_total_count = total_objects;
         data_usage.objects_total_size = total_size;
-        data_usage.versions_total_count = total_objects;
+        data_usage.versions_total_count = total_versions;
+        data_usage.delete_markers_total_count = total_delete_markers;
     }
     Err(e) => {
         warn!("Failed to list buckets for data usage collection: {}", e);
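Note: the old code asked list_objects_v2 for a single page capped at 100 keys, so any bucket with more objects was under-reported (the regression covered by issue #1012 and the tests added later in this compare). compute_bucket_usage walks the full listing instead. A generic sketch of that kind of continuation-token loop; fetch_page here is a hypothetical closure standing in for a real list call, not the ecstore API:

```rust
/// Illustrative pagination: keep fetching pages until the listing is no longer
/// truncated, instead of stopping after the first page.
fn count_all_objects(
    mut fetch_page: impl FnMut(Option<String>) -> (Vec<u64>, Option<String>),
) -> (u64, u64) {
    let (mut count, mut size) = (0u64, 0u64);
    let mut token: Option<String> = None;
    loop {
        let (object_sizes, next_token) = fetch_page(token.take());
        count += object_sizes.len() as u64;
        size += object_sizes.iter().sum::<u64>();
        match next_token {
            Some(t) => token = Some(t), // truncated: continue from the token
            None => break,              // final page reached
        }
    }
    (count, size)
}

fn main() {
    // Two pages: 100 objects then 50 objects, one byte each.
    let mut pages = vec![
        (vec![1u64; 100], Some("page-2".to_string())),
        (vec![1u64; 50], None),
    ]
    .into_iter();
    let (count, size) = count_all_objects(|_| pages.next().unwrap());
    assert_eq!((count, size), (150, 150));
}
```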
@@ -2554,6 +2618,7 @@ impl Scanner {
     disk_metrics: Arc::clone(&self.disk_metrics),
     data_usage_stats: Arc::clone(&self.data_usage_stats),
     last_data_usage_collection: Arc::clone(&self.last_data_usage_collection),
+    fallback_backoff_until: Arc::clone(&self.fallback_backoff_until),
     heal_manager: self.heal_manager.clone(),
     node_scanner: Arc::clone(&self.node_scanner),
     stats_aggregator: Arc::clone(&self.stats_aggregator),
@@ -84,6 +84,9 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
         guard.clone()
     };

+    // Use the first local online disk in the set to avoid missing stats when disk 0 is down
+    let mut picked = false;
+
     for (disk_index, disk_opt) in disks.into_iter().enumerate() {
         let Some(disk) = disk_opt else {
             continue;
@@ -93,11 +96,17 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
             continue;
         }

-        // Count objects once by scanning only disk index zero from each set.
-        if disk_index != 0 {
+        if picked {
             continue;
         }

+        // Skip offline disks; keep looking for an online candidate
+        if !disk.is_online().await {
+            continue;
+        }
+
+        picked = true;
+
         let disk_id = match disk.get_disk_id().await.map_err(Error::from)? {
             Some(id) => id.to_string(),
             None => {
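Note: the scan used to be pinned to disk index 0 of each set, so a single offline disk silenced that set's statistics. The change picks the first online local disk instead. A small sketch of the selection rule with a stand-in Disk type (not the ecstore DiskAPI):

```rust
/// Illustrative disk selection: scan one disk per set, but prefer the first
/// *online* disk rather than hard-coding index 0.
struct Disk {
    index: usize,
    online: bool,
}

fn pick_scan_disk(disks: &[Disk]) -> Option<usize> {
    let mut picked = None;
    for disk in disks {
        if picked.is_some() {
            break; // count objects only once per set
        }
        if !disk.online {
            continue; // skip offline disks, keep looking for a candidate
        }
        picked = Some(disk.index);
    }
    picked
}

fn main() {
    let disks = vec![
        Disk { index: 0, online: false },
        Disk { index: 1, online: true },
    ];
    // Disk 0 is down, so the scan falls through to disk 1 instead of reporting nothing.
    assert_eq!(pick_scan_disk(&disks), Some(1));
}
```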
@@ -347,7 +347,8 @@ impl DecentralizedStatsAggregator {

     // update cache
     *self.cached_stats.write().await = Some(aggregated.clone());
-    *self.cache_timestamp.write().await = aggregation_timestamp;
+    // Use the time when aggregation completes as cache timestamp to avoid premature expiry during long runs
+    *self.cache_timestamp.write().await = SystemTime::now();

     Ok(aggregated)
 }
@@ -359,7 +360,8 @@ impl DecentralizedStatsAggregator {

     // update cache
     *self.cached_stats.write().await = Some(aggregated.clone());
-    *self.cache_timestamp.write().await = now;
+    // Cache timestamp should reflect completion time rather than aggregation start
+    *self.cache_timestamp.write().await = SystemTime::now();

     Ok(aggregated)
 }
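Note: both cache updates now stamp the entry when aggregation completes rather than when it started, so a slow aggregation can no longer produce an entry that is already near expiry. A compact sketch of the idea with an illustrative cache type:

```rust
use std::time::{Duration, SystemTime};

/// Illustrative cache: stamping with the completion time keeps an entry valid
/// for a full TTL after it actually becomes available.
struct CachedStats<T> {
    value: Option<T>,
    stamped_at: Option<SystemTime>,
    ttl: Duration,
}

impl<T> CachedStats<T> {
    fn store(&mut self, value: T) {
        self.value = Some(value);
        // Completion time, not the time the aggregation run began.
        self.stamped_at = Some(SystemTime::now());
    }

    fn is_fresh(&self) -> bool {
        self.stamped_at
            .and_then(|t| t.elapsed().ok())
            .map(|age| age < self.ttl)
            .unwrap_or(false)
    }
}

fn main() {
    let mut cache = CachedStats { value: None, stamped_at: None, ttl: Duration::from_secs(60) };
    cache.store(1u64);
    assert!(cache.is_fresh());
}
```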
crates/ahm/tests/data_usage_fallback_test.rs (new file, 97 lines)

@@ -0,0 +1,97 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(test)]

use rustfs_ahm::scanner::data_scanner::Scanner;
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::GLOBAL_Endpoints;
use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetadataSys};
use rustfs_ecstore::endpoints::EndpointServerPools;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

/// Build a minimal single-node ECStore over a temp directory and populate objects.
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
    let temp_dir = TempDir::new().expect("temp dir");
    let root = temp_dir.path().to_string_lossy().to_string();

    // Create endpoints from the temp dir
    let (endpoint_pools, _setup) = EndpointServerPools::from_volumes("127.0.0.1:0", vec![root])
        .await
        .expect("endpoint pools");

    // Seed globals required by metadata sys if not already set
    if GLOBAL_Endpoints.get().is_none() {
        let _ = GLOBAL_Endpoints.set(endpoint_pools.clone());
    }

    let store = ECStore::new("127.0.0.1:0".parse().unwrap(), endpoint_pools, CancellationToken::new())
        .await
        .expect("create store");

    if rustfs_ecstore::global::new_object_layer_fn().is_none() {
        rustfs_ecstore::global::set_object_layer(store.clone()).await;
    }

    // Initialize metadata system before bucket operations
    if GLOBAL_BucketMetadataSys.get().is_none() {
        let mut sys = BucketMetadataSys::new(store.clone());
        sys.init(Vec::new()).await;
        let _ = GLOBAL_BucketMetadataSys.set(Arc::new(RwLock::new(sys)));
    }

    store
        .make_bucket("fallback-bucket", &rustfs_ecstore::store_api::MakeBucketOptions::default())
        .await
        .expect("make bucket");

    for i in 0..count {
        let key = format!("obj-{i:04}");
        let data = format!("payload-{i}");
        let mut reader = PutObjReader::from_vec(data.into_bytes());
        store
            .put_object("fallback-bucket", &key, &mut reader, &rustfs_ecstore::store_api::ObjectOptions::default())
            .await
            .expect("put object");
    }

    (temp_dir, store)
}

#[tokio::test]
async fn fallback_builds_full_counts_over_100_objects() {
    let (_tmp, store) = create_store_with_objects(1000).await;
    let scanner = Scanner::new(None, None);

    // Directly call the fallback builder to ensure pagination works.
    let usage: DataUsageInfo = scanner.build_data_usage_from_ecstore(&store).await.expect("fallback usage");

    let bucket = usage.buckets_usage.get("fallback-bucket").expect("bucket usage present");

    assert!(
        usage.objects_total_count >= 1000,
        "total objects should be >=1000, got {}",
        usage.objects_total_count
    );
    assert!(
        bucket.objects_count >= 1000,
        "bucket objects should be >=1000, got {}",
        bucket.objects_count
    );
}
@@ -25,6 +25,7 @@ workspace = true

 [dependencies]
 rustfs-ecstore.workspace = true
+rustfs-common.workspace = true
 flatbuffers.workspace = true
 futures.workspace = true
 rustfs-lock.workspace = true
@@ -49,4 +50,4 @@ uuid = { workspace = true }
 base64 = { workspace = true }
 rand = { workspace = true }
 chrono = { workspace = true }
-md5 = { workspace = true }
+md5 = { workspace = true }
crates/e2e_test/src/content_encoding_test.rs (new file, 85 lines)

@@ -0,0 +1,85 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! End-to-end test for Content-Encoding header handling
//!
//! Tests that the Content-Encoding header is correctly stored during PUT
//! and returned in GET/HEAD responses. This is important for clients that
//! upload pre-compressed content and rely on the header for decompression.

#[cfg(test)]
mod tests {
    use crate::common::{RustFSTestEnvironment, init_logging};
    use aws_sdk_s3::primitives::ByteStream;
    use serial_test::serial;
    use tracing::info;

    /// Verify Content-Encoding header roundtrips through PUT, GET, and HEAD operations
    #[tokio::test]
    #[serial]
    async fn test_content_encoding_roundtrip() {
        init_logging();
        info!("Starting Content-Encoding roundtrip test");

        let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
        env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");

        let client = env.create_s3_client();
        let bucket = "content-encoding-test";
        let key = "logs/app.log.zst";
        let content = b"2024-01-15 10:23:45 INFO Application started\n2024-01-15 10:23:46 DEBUG Loading config\n";

        client
            .create_bucket()
            .bucket(bucket)
            .send()
            .await
            .expect("Failed to create bucket");

        info!("Uploading object with Content-Encoding: zstd");
        client
            .put_object()
            .bucket(bucket)
            .key(key)
            .content_type("text/plain")
            .content_encoding("zstd")
            .body(ByteStream::from_static(content))
            .send()
            .await
            .expect("PUT failed");

        info!("Verifying GET response includes Content-Encoding");
        let get_resp = client.get_object().bucket(bucket).key(key).send().await.expect("GET failed");

        assert_eq!(get_resp.content_encoding(), Some("zstd"), "GET should return Content-Encoding: zstd");
        assert_eq!(get_resp.content_type(), Some("text/plain"), "GET should return correct Content-Type");

        let body = get_resp.body.collect().await.unwrap().into_bytes();
        assert_eq!(body.as_ref(), content, "Body content mismatch");

        info!("Verifying HEAD response includes Content-Encoding");
        let head_resp = client
            .head_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .expect("HEAD failed");

        assert_eq!(head_resp.content_encoding(), Some("zstd"), "HEAD should return Content-Encoding: zstd");
        assert_eq!(head_resp.content_type(), Some("text/plain"), "HEAD should return correct Content-Type");

        env.stop_server();
    }
}
crates/e2e_test/src/data_usage_test.rs (new file, 73 lines)

@@ -0,0 +1,73 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use aws_sdk_s3::primitives::ByteStream;
use rustfs_common::data_usage::DataUsageInfo;
use serial_test::serial;

use crate::common::{RustFSTestEnvironment, TEST_BUCKET, awscurl_get, init_logging};

/// Regression test for data usage accuracy (issue #1012).
/// Launches rustfs, writes 1000 objects, then asserts admin data usage reports the full count.
#[tokio::test(flavor = "multi_thread")]
#[serial]
#[ignore = "Starts a rustfs server and requires awscurl; enable when running full E2E"]
async fn data_usage_reports_all_objects() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    init_logging();

    let mut env = RustFSTestEnvironment::new().await?;
    env.start_rustfs_server(vec![]).await?;

    let client = env.create_s3_client();

    // Create bucket and upload objects
    client.create_bucket().bucket(TEST_BUCKET).send().await?;

    for i in 0..1000 {
        let key = format!("obj-{i:04}");
        client
            .put_object()
            .bucket(TEST_BUCKET)
            .key(key)
            .body(ByteStream::from_static(b"hello-world"))
            .send()
            .await?;
    }

    // Query admin data usage API
    let url = format!("{}/rustfs/admin/v3/datausageinfo", env.url);
    let resp = awscurl_get(&url, &env.access_key, &env.secret_key).await?;
    let usage: DataUsageInfo = serde_json::from_str(&resp)?;

    // Assert total object count and per-bucket count are not truncated
    let bucket_usage = usage
        .buckets_usage
        .get(TEST_BUCKET)
        .cloned()
        .expect("bucket usage should exist");

    assert!(
        usage.objects_total_count >= 1000,
        "total object count should be at least 1000, got {}",
        usage.objects_total_count
    );
    assert!(
        bucket_usage.objects_count >= 1000,
        "bucket object count should be at least 1000, got {}",
        bucket_usage.objects_count
    );

    env.stop_server();
    Ok(())
}
@@ -18,6 +18,10 @@ mod reliant;
 #[cfg(test)]
 pub mod common;

+// Data usage regression tests
+#[cfg(test)]
+mod data_usage_test;
+
 // KMS-specific test modules
 #[cfg(test)]
 mod kms;
@@ -25,3 +29,7 @@ mod kms;
 // Special characters in path test modules
 #[cfg(test)]
 mod special_chars_test;
+
+// Content-Encoding header preservation test
+#[cfg(test)]
+mod content_encoding_test;
@@ -283,7 +283,17 @@ impl Lifecycle for BucketLifecycleConfiguration {
     "eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
     obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
 );
-if obj.mod_time.expect("err").unix_timestamp() == 0 {
+
+// Gracefully handle missing mod_time instead of panicking
+let mod_time = match obj.mod_time {
+    Some(t) => t,
+    None => {
+        info!("eval_inner: mod_time is None for object={}, returning default event", obj.name);
+        return Event::default();
+    }
+};
+
+if mod_time.unix_timestamp() == 0 {
     info!("eval_inner: mod_time is 0, returning default event");
     return Event::default();
 }
@@ -323,7 +333,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
 }

 if let Some(days) = expiration.days {
-    let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
+    let expected_expiry = expected_expiry_time(mod_time, days /*, date*/);
     if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
         events.push(Event {
             action: IlmAction::DeleteVersionAction,
@@ -446,11 +456,11 @@ impl Lifecycle for BucketLifecycleConfiguration {
     });
 }
 } else if let Some(days) = expiration.days {
-    let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
+    let expected_expiry: OffsetDateTime = expected_expiry_time(mod_time, days);
     info!(
         "eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
         days,
-        obj.mod_time.expect("err!"),
+        mod_time,
         expected_expiry,
         now,
         now.unix_timestamp() > expected_expiry.unix_timestamp()
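Note: eval_inner no longer panics via expect/unwrap when an object has no modification time; it returns the default (no-op) event and otherwise evaluates the usual day-based expiry. A simplified sketch of that rule using std types only; the real expected_expiry_time works on OffsetDateTime and may round to day boundaries:

```rust
use std::time::{Duration, SystemTime};

/// Simplified day-based expiry check: an object without a modification time is
/// never expired, mirroring the early `return Event::default()` above.
fn should_expire(mod_time: Option<SystemTime>, days: u64, now: SystemTime) -> bool {
    let Some(mod_time) = mod_time else {
        return false; // missing mod_time: no lifecycle event instead of a panic
    };
    let expiry = mod_time + Duration::from_secs(days * 24 * 60 * 60);
    now >= expiry
}

fn main() {
    let now = SystemTime::now();
    let ten_days_ago = now - Duration::from_secs(10 * 24 * 60 * 60);
    assert!(should_expire(Some(ten_days_ago), 7, now));
    assert!(!should_expire(Some(now), 7, now));
    assert!(!should_expire(None, 7, now));
}
```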
@@ -32,6 +32,7 @@ use rustfs_common::data_usage::{
     BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
 };
 use rustfs_utils::path::SLASH_SEPARATOR;
+use tokio::fs;
 use tracing::{error, info, warn};

 use crate::error::Error;
@@ -63,6 +64,21 @@ lazy_static::lazy_static! {

 /// Store data usage info to backend storage
 pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
+    // Prevent older data from overwriting newer persisted stats
+    if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
+        if let Ok(existing) = serde_json::from_slice::<DataUsageInfo>(&buf) {
+            if let (Some(new_ts), Some(existing_ts)) = (data_usage_info.last_update, existing.last_update) {
+                if new_ts <= existing_ts {
+                    info!(
+                        "Skip persisting data usage: incoming last_update {:?} <= existing {:?}",
+                        new_ts, existing_ts
+                    );
+                    return Ok(());
+                }
+            }
+        }
+    }
+
     let data =
         serde_json::to_vec(&data_usage_info).map_err(|e| Error::other(format!("Failed to serialize data usage info: {e}")))?;
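Note: persistence is now monotonic in last_update, so a stale aggregation produced during fallback backoff cannot clobber newer stats already stored in the backend. The decision rule in isolation (illustrative helper, not the crate's API):

```rust
use std::time::{Duration, SystemTime};

/// Illustrative guard: persist only when the incoming snapshot is strictly
/// newer than what is already stored; otherwise keep the existing data.
fn should_persist(incoming: Option<SystemTime>, existing: Option<SystemTime>) -> bool {
    match (incoming, existing) {
        (Some(new_ts), Some(existing_ts)) => new_ts > existing_ts,
        // If either timestamp is unknown, fall through and persist, as the diff above does.
        _ => true,
    }
}

fn main() {
    let earlier = SystemTime::now();
    let later = earlier + Duration::from_secs(60);
    assert!(should_persist(Some(later), Some(earlier)));
    assert!(!should_persist(Some(earlier), Some(later)));
    assert!(should_persist(None, Some(later)));
}
```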
@@ -160,6 +176,39 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
 }

+/// Aggregate usage information from local disk snapshots.
+fn merge_snapshot(aggregated: &mut DataUsageInfo, mut snapshot: LocalUsageSnapshot, latest_update: &mut Option<SystemTime>) {
+    if let Some(update) = snapshot.last_update {
+        if latest_update.is_none_or(|current| update > current) {
+            *latest_update = Some(update);
+        }
+    }
+
+    snapshot.recompute_totals();
+
+    aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
+    aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
+    aggregated.delete_markers_total_count = aggregated
+        .delete_markers_total_count
+        .saturating_add(snapshot.delete_markers_total_count);
+    aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
+
+    for (bucket, usage) in snapshot.buckets_usage.into_iter() {
+        let bucket_size = usage.size;
+        match aggregated.buckets_usage.entry(bucket.clone()) {
+            Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
+            Entry::Vacant(entry) => {
+                entry.insert(usage.clone());
+            }
+        }
+
+        aggregated
+            .bucket_sizes
+            .entry(bucket)
+            .and_modify(|size| *size = size.saturating_add(bucket_size))
+            .or_insert(bucket_size);
+    }
+}
+
 pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
     let mut aggregated = DataUsageInfo::default();
     let mut latest_update: Option<SystemTime> = None;
@@ -196,7 +245,24 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
                 snapshot_exists: false,
             };

-            if let Some(mut snapshot) = read_local_snapshot(root.as_path(), &disk_id).await? {
+            let snapshot_result = read_local_snapshot(root.as_path(), &disk_id).await;
+
+            // If a snapshot is corrupted or unreadable, skip it but keep processing others
+            if let Err(err) = &snapshot_result {
+                warn!(
+                    "Failed to read data usage snapshot for disk {} (pool {}, set {}, disk {}): {}",
+                    disk_id, pool_idx, set_disks.set_index, disk_index, err
+                );
+                // Best-effort cleanup so next scan can rebuild a fresh snapshot instead of repeatedly failing
+                let snapshot_file = snapshot_path(root.as_path(), &disk_id);
+                if let Err(remove_err) = fs::remove_file(&snapshot_file).await {
+                    if remove_err.kind() != std::io::ErrorKind::NotFound {
+                        warn!("Failed to remove corrupted snapshot {:?}: {}", snapshot_file, remove_err);
+                    }
+                }
+            }
+
+            if let Ok(Some(mut snapshot)) = snapshot_result {
                 status.last_update = snapshot.last_update;
                 status.snapshot_exists = true;
@@ -213,37 +279,7 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
                 snapshot.meta.disk_index = Some(disk_index);
             }

-                snapshot.recompute_totals();
-
-                if let Some(update) = snapshot.last_update {
-                    if latest_update.is_none_or(|current| update > current) {
-                        latest_update = Some(update);
-                    }
-                }
-
-                aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
-                aggregated.versions_total_count =
-                    aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
-                aggregated.delete_markers_total_count = aggregated
-                    .delete_markers_total_count
-                    .saturating_add(snapshot.delete_markers_total_count);
-                aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
-
-                for (bucket, usage) in snapshot.buckets_usage.into_iter() {
-                    let bucket_size = usage.size;
-                    match aggregated.buckets_usage.entry(bucket.clone()) {
-                        Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
-                        Entry::Vacant(entry) => {
-                            entry.insert(usage.clone());
-                        }
-                    }
-
-                    aggregated
-                        .bucket_sizes
-                        .entry(bucket)
-                        .and_modify(|size| *size = size.saturating_add(bucket_size))
-                        .or_insert(bucket_size);
-                }
+                merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
             }

             statuses.push(status);
@@ -549,3 +585,94 @@ pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate:
     save_config(store, &name, buf).await?;
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rustfs_common::data_usage::BucketUsageInfo;
+
+    fn aggregate_for_test(
+        inputs: Vec<(DiskUsageStatus, Result<Option<LocalUsageSnapshot>, Error>)>,
+    ) -> (Vec<DiskUsageStatus>, DataUsageInfo) {
+        let mut aggregated = DataUsageInfo::default();
+        let mut latest_update: Option<SystemTime> = None;
+        let mut statuses = Vec::new();
+
+        for (mut status, snapshot_result) in inputs {
+            if let Ok(Some(snapshot)) = snapshot_result {
+                status.snapshot_exists = true;
+                status.last_update = snapshot.last_update;
+                merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
+            }
+            statuses.push(status);
+        }
+
+        aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
+        aggregated.last_update = latest_update;
+        aggregated.disk_usage_status = statuses.clone();
+
+        (statuses, aggregated)
+    }
+
+    #[test]
+    fn aggregate_skips_corrupted_snapshot_and_preserves_other_disks() {
+        let mut good_snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
+            disk_id: "good-disk".to_string(),
+            pool_index: Some(0),
+            set_index: Some(0),
+            disk_index: Some(0),
+        });
+        good_snapshot.last_update = Some(SystemTime::now());
+        good_snapshot.buckets_usage.insert(
+            "bucket-a".to_string(),
+            BucketUsageInfo {
+                objects_count: 3,
+                versions_count: 3,
+                size: 42,
+                ..Default::default()
+            },
+        );
+        good_snapshot.recompute_totals();
+
+        let bad_snapshot_err: Result<Option<LocalUsageSnapshot>, Error> = Err(Error::other("corrupted snapshot payload"));
+
+        let inputs = vec![
+            (
+                DiskUsageStatus {
+                    disk_id: "bad-disk".to_string(),
+                    pool_index: Some(0),
+                    set_index: Some(0),
+                    disk_index: Some(1),
+                    last_update: None,
+                    snapshot_exists: false,
+                },
+                bad_snapshot_err,
+            ),
+            (
+                DiskUsageStatus {
+                    disk_id: "good-disk".to_string(),
+                    pool_index: Some(0),
+                    set_index: Some(0),
+                    disk_index: Some(0),
+                    last_update: None,
+                    snapshot_exists: false,
+                },
+                Ok(Some(good_snapshot)),
+            ),
+        ];
+
+        let (statuses, aggregated) = aggregate_for_test(inputs);
+
+        // Bad disk stays non-existent, good disk is marked present
+        let bad_status = statuses.iter().find(|s| s.disk_id == "bad-disk").unwrap();
+        assert!(!bad_status.snapshot_exists);
+        let good_status = statuses.iter().find(|s| s.disk_id == "good-disk").unwrap();
+        assert!(good_status.snapshot_exists);
+
+        // Aggregated data is from good snapshot only
+        assert_eq!(aggregated.objects_total_count, 3);
+        assert_eq!(aggregated.objects_total_size, 42);
+        assert_eq!(aggregated.buckets_count, 1);
+        assert_eq!(aggregated.buckets_usage.get("bucket-a").map(|b| (b.objects_count, b.size)), Some((3, 42)));
+    }
+}
@@ -198,15 +198,22 @@ impl Endpoint {
         }
     }

-    pub fn get_file_path(&self) -> &str {
-        let path = self.url.path();
+    pub fn get_file_path(&self) -> String {
+        let path: &str = self.url.path();
+        let decoded: std::borrow::Cow<'_, str> = match urlencoding::decode(path) {
+            Ok(decoded) => decoded,
+            Err(e) => {
+                debug!("Failed to decode path '{}': {}, using original path", path, e);
+                std::borrow::Cow::Borrowed(path)
+            }
+        };
         #[cfg(windows)]
         if self.url.scheme() == "file" {
-            let stripped = path.strip_prefix('/').unwrap_or(path);
+            let stripped: &str = decoded.strip_prefix('/').unwrap_or(&decoded);
             debug!("get_file_path windows: path={}", stripped);
-            return stripped;
+            return stripped.to_string();
         }
-        path
+        decoded.into_owned()
     }
 }
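Note: Url stores its path percent-encoded, so the old &str accessor leaked %20 and friends into filesystem paths (the "Application Support" case covered by the tests below). Returning an owned, decoded String fixes that. A minimal illustration of the decode step, relying only on the url and urlencoding crates already used above:

```rust
/// Illustrative decode: take the percent-encoded URL path and hand callers a
/// decoded, owned String, falling back to the raw path on decode errors.
fn file_path_from_url(url: &url::Url) -> String {
    let path = url.path();
    match urlencoding::decode(path) {
        Ok(decoded) => decoded.into_owned(),
        Err(_) => path.to_string(),
    }
}

fn main() {
    let url = url::Url::parse("file:///tmp/test%20path/data").unwrap();
    // The parsed URL keeps the percent encoding internally...
    assert_eq!(url.path(), "/tmp/test%20path/data");
    // ...while the accessor exposes the decoded filesystem path.
    assert_eq!(file_path_from_url(&url), "/tmp/test path/data");
}
```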
@@ -501,6 +508,45 @@ mod test {
     assert_eq!(endpoint.get_type(), EndpointType::Path);
 }

+#[test]
+fn test_endpoint_with_spaces_in_path() {
+    let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
+    let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
+    assert_eq!(endpoint.get_file_path(), path_with_spaces);
+    assert!(endpoint.is_local);
+    assert_eq!(endpoint.get_type(), EndpointType::Path);
+}
+
+#[test]
+fn test_endpoint_percent_encoding_roundtrip() {
+    let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
+    let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
+
+    // Verify that the URL internally stores percent-encoded path
+    assert!(
+        endpoint.url.path().contains("%20"),
+        "URL path should contain percent-encoded spaces: {}",
+        endpoint.url.path()
+    );
+
+    // Verify that get_file_path() decodes the percent-encoded path correctly
+    assert_eq!(
+        endpoint.get_file_path(),
+        "/Users/test/Library/Application Support/rustfs/data",
+        "get_file_path() should decode percent-encoded spaces"
+    );
+}
+
+#[test]
+fn test_endpoint_with_various_special_characters() {
+    // Test path with multiple special characters that get percent-encoded
+    let path_with_special = "/tmp/test path/data[1]/file+name&more";
+    let endpoint = Endpoint::try_from(path_with_special).unwrap();
+
+    // get_file_path() should return the original path with decoded characters
+    assert_eq!(endpoint.get_file_path(), path_with_special);
+}
+
 #[test]
 fn test_endpoint_update_is_local() {
     let mut endpoint = Endpoint::try_from("http://localhost:9000/path").unwrap();
@@ -232,7 +232,7 @@ impl PoolEndpointList {

     for endpoints in pool_endpoint_list.inner.iter_mut() {
         // Check whether same path is not used in endpoints of a host on different port.
-        let mut path_ip_map: HashMap<&str, HashSet<IpAddr>> = HashMap::new();
+        let mut path_ip_map: HashMap<String, HashSet<IpAddr>> = HashMap::new();
         let mut host_ip_cache = HashMap::new();
         for ep in endpoints.as_ref() {
             if !ep.url.has_host() {
@@ -275,8 +275,9 @@ impl PoolEndpointList {
     match path_ip_map.entry(path) {
         Entry::Occupied(mut e) => {
             if e.get().intersection(host_ip_set).count() > 0 {
+                let path_key = e.key().clone();
                 return Err(Error::other(format!(
-                    "same path '{path}' can not be served by different port on same address"
+                    "same path '{path_key}' can not be served by different port on same address"
                 )));
             }
             e.get_mut().extend(host_ip_set.iter());
@@ -295,7 +296,7 @@ impl PoolEndpointList {
     }

     let path = ep.get_file_path();
-    if local_path_set.contains(path) {
+    if local_path_set.contains(&path) {
         return Err(Error::other(format!(
             "path '{path}' cannot be served by different address on same server"
         )));
@@ -827,7 +827,12 @@ impl ObjectInfo {
     for entry in entries.entries() {
         if entry.is_object() {
             if let Some(delimiter) = &delimiter {
-                if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                let remaining = if entry.name.starts_with(prefix) {
+                    &entry.name[prefix.len()..]
+                } else {
+                    entry.name.as_str()
+                };
+                if let Some(idx) = remaining.find(delimiter.as_str()) {
                     let idx = prefix.len() + idx + delimiter.len();
                     if let Some(curr_prefix) = entry.name.get(0..idx) {
                         if curr_prefix == prev_prefix {
@@ -878,7 +883,14 @@ impl ObjectInfo {

     if entry.is_dir() {
         if let Some(delimiter) = &delimiter {
-            if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+            if let Some(idx) = {
+                let remaining = if entry.name.starts_with(prefix) {
+                    &entry.name[prefix.len()..]
+                } else {
+                    entry.name.as_str()
+                };
+                remaining.find(delimiter.as_str())
+            } {
                 let idx = prefix.len() + idx + delimiter.len();
                 if let Some(curr_prefix) = entry.name.get(0..idx) {
                     if curr_prefix == prev_prefix {
@@ -914,7 +926,12 @@ impl ObjectInfo {
     for entry in entries.entries() {
         if entry.is_object() {
             if let Some(delimiter) = &delimiter {
-                if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+                let remaining = if entry.name.starts_with(prefix) {
+                    &entry.name[prefix.len()..]
+                } else {
+                    entry.name.as_str()
+                };
+                if let Some(idx) = remaining.find(delimiter.as_str()) {
                     let idx = prefix.len() + idx + delimiter.len();
                     if let Some(curr_prefix) = entry.name.get(0..idx) {
                         if curr_prefix == prev_prefix {
@@ -951,7 +968,14 @@ impl ObjectInfo {

     if entry.is_dir() {
         if let Some(delimiter) = &delimiter {
-            if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
+            if let Some(idx) = {
+                let remaining = if entry.name.starts_with(prefix) {
+                    &entry.name[prefix.len()..]
+                } else {
+                    entry.name.as_str()
+                };
+                remaining.find(delimiter.as_str())
+            } {
                 let idx = prefix.len() + idx + delimiter.len();
                 if let Some(curr_prefix) = entry.name.get(0..idx) {
                     if curr_prefix == prev_prefix {
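Note: all four hunks replace trim_start_matches(prefix) with a single explicit prefix strip before searching for the delimiter. The visible behavioural difference is that trim_start_matches removes repeated occurrences of the pattern, which shifts the delimiter index for keys whose remainder happens to start with the prefix again. A small demonstration of both behaviours:

```rust
/// Strip the listing prefix at most once, then locate the delimiter and build
/// the common prefix, matching the new code paths above.
fn common_prefix_once(name: &str, prefix: &str, delimiter: &str) -> Option<String> {
    let remaining = if name.starts_with(prefix) { &name[prefix.len()..] } else { name };
    let idx = remaining.find(delimiter)?;
    let end = prefix.len() + idx + delimiter.len();
    name.get(0..end).map(str::to_string)
}

fn main() {
    // trim_start_matches strips *every* leading "a/" run, losing a segment.
    assert_eq!("a/a/b".trim_start_matches("a/"), "b");
    // Stripping the prefix once keeps the second "a/" in the common prefix.
    assert_eq!(common_prefix_once("a/a/b", "a/", "/"), Some("a/a/".to_string()));
}
```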
@@ -85,6 +85,7 @@ services:
       - RUSTFS_ACCESS_KEY=devadmin
       - RUSTFS_SECRET_KEY=devadmin
       - RUSTFS_OBS_LOGGER_LEVEL=debug
+      - RUSTFS_OBS_LOG_DIRECTORY=/logs
     volumes:
       - .:/app # Mount source code to /app for development
       - deploy/data/dev:/data
@@ -180,20 +181,6 @@ services:
     profiles:
       - observability

-  # Redis for caching (optional)
-  redis:
-    image: redis:7-alpine
-    container_name: redis
-    ports:
-      - "6379:6379"
-    volumes:
-      - redis_data:/data
-    networks:
-      - rustfs-network
-    restart: unless-stopped
-    profiles:
-      - cache
-
   # NGINX reverse proxy (optional)
   nginx:
     security_opt:
@@ -241,7 +228,5 @@ volumes:
     driver: local
   grafana_data:
     driver: local
-  redis_data:
-    driver: local
   logs:
     driver: local
@@ -29,6 +29,7 @@ x-node-template: &node-template
     - RUSTFS_ACCESS_KEY=rustfsadmin
     - RUSTFS_SECRET_KEY=rustfsadmin
    - RUSTFS_CMD=rustfs
+    - RUSTFS_OBS_LOG_DIRECTORY=/logs
   command: [ "sh", "-c", "sleep 3 && rustfs" ]
   healthcheck:
     test:
@@ -55,7 +55,20 @@ process_data_volumes() {

 # 3) Process log directory (separate from data volumes)
 process_log_directory() {
-    LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY:-/logs}"
+    # Output logs to stdout
+    if [ -z "$RUSTFS_OBS_LOG_DIRECTORY" ]; then
+        echo "OBS log directory not configured and logs outputs to stdout"
+        return
+    fi
+
+    # Output logs to remote endpoint
+    if [ "${RUSTFS_OBS_LOG_DIRECTORY}" != "${RUSTFS_OBS_LOG_DIRECTORY#*://}" ]; then
+        echo "Output logs to remote endpoint"
+        return
+    fi
+
+    # Outputs logs to local directory
+    LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY}"

     echo "Initializing log directory: $LOG_DIR"
     if [ ! -d "$LOG_DIR" ]; then
@@ -90,7 +90,6 @@ pub mod trace;
 pub mod user;

 #[derive(Debug, Serialize)]
-#[serde(rename_all = "PascalCase")]
 pub struct IsAdminResponse {
     pub is_admin: bool,
     pub access_key: String,
@@ -159,14 +158,15 @@ impl Operation for IsAdminHandler {
         return Err(s3_error!(InvalidRequest, "get cred failed"));
     };

-    let (_cred, _owner) =
+    let (cred, _owner) =
         check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

     let access_key_to_check = input_cred.access_key.clone();

     // Check if the user is admin by comparing with global credentials
     let is_admin = if let Some(sys_cred) = get_global_action_cred() {
-        sys_cred.access_key == access_key_to_check
+        crate::auth::constant_time_eq(&access_key_to_check, &sys_cred.access_key)
+            || crate::auth::constant_time_eq(&cred.parent_user, &sys_cred.access_key)
     } else {
         false
     };
@@ -174,7 +174,7 @@ impl Operation for IsAdminHandler {
     let response = IsAdminResponse {
         is_admin,
         access_key: access_key_to_check,
-        message: format!("User is {} an administrator", if is_admin { "" } else { "not" }),
+        message: format!("User is {}an administrator", if is_admin { "" } else { "not " }),
     };

     let data = serde_json::to_vec(&response)
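Note: the admin check now compares credentials with crate::auth::constant_time_eq (and also accepts a session whose parent_user is the system access key). Constant-time comparison avoids leaking how many leading characters matched through response timing; a generic sketch of the technique, which may differ from the crate's actual implementation:

```rust
/// Illustrative constant-time equality: XOR every byte pair and accumulate the
/// differences so the loop never exits early on the first mismatch.
fn constant_time_eq_sketch(a: &str, b: &str) -> bool {
    let (a, b) = (a.as_bytes(), b.as_bytes());
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    assert!(constant_time_eq_sketch("rustfsadmin", "rustfsadmin"));
    assert!(!constant_time_eq_sketch("rustfsadmin", "rustfsadmim"));
}
```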
@@ -2272,6 +2272,7 @@ impl S3 for FS {
     content_length: Some(response_content_length),
     last_modified,
     content_type,
+    content_encoding: info.content_encoding.clone(),
     accept_ranges: Some("bytes".to_string()),
     content_range,
     e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
@@ -2487,6 +2488,7 @@ impl S3 for FS {
     let output = HeadObjectOutput {
         content_length: Some(content_length),
         content_type,
+        content_encoding: info.content_encoding.clone(),
         last_modified,
         e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
         metadata: filter_object_metadata(&metadata_map),
@@ -4518,18 +4520,16 @@ impl S3 for FS {
         .map_err(ApiError::from)?;

     let rules = match metadata_sys::get_lifecycle_config(&bucket).await {
-        Ok((cfg, _)) => Some(cfg.rules),
+        Ok((cfg, _)) => cfg.rules,
         Err(_err) => {
-            // if BucketMetadataError::BucketLifecycleNotFound.is(&err) {
-            //     return Err(s3_error!(NoSuchLifecycleConfiguration));
-            // }
-            // warn!("get_lifecycle_config err {:?}", err);
-            None
+            // Return NoSuchLifecycleConfiguration error as expected by S3 clients
+            // This fixes issue #990 where Ansible S3 roles fail with KeyError: 'Rules'
+            return Err(s3_error!(NoSuchLifecycleConfiguration));
         }
     };

     Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
-        rules,
+        rules: Some(rules),
         ..Default::default()
     }))
 }