mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
Compare commits
12 Commits
1.0.0-alph
...
feat/scan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
00787cbce4 | ||
|
|
3ac004510a | ||
|
|
d8f8bfa5b7 | ||
|
|
1768e7bbdb | ||
|
|
3326737c01 | ||
|
|
91770ffd1b | ||
|
|
7940b69bf8 | ||
|
|
427d31d09c | ||
|
|
dbdcecb9c5 | ||
|
|
ad34f1b031 | ||
|
|
2a5ccd2211 | ||
|
|
c43166c4c6 |
3
.github/workflows/helm-package.yml
vendored
3
.github/workflows/helm-package.yml
vendored
@@ -5,9 +5,6 @@ on:
|
||||
workflows: ["Build and Release"]
|
||||
types: [completed]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
new_version: ${{ github.event.workflow_run.head_branch }}
|
||||
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3053,7 +3053,6 @@ dependencies = [
|
||||
"rand 0.10.0-rc.5",
|
||||
"reqwest",
|
||||
"rmp-serde",
|
||||
"rustfs-common",
|
||||
"rustfs-ecstore",
|
||||
"rustfs-filemeta",
|
||||
"rustfs-lock",
|
||||
|
||||
@@ -81,11 +81,12 @@ ENV RUSTFS_ADDRESS=":9000" \
|
||||
RUSTFS_CORS_ALLOWED_ORIGINS="*" \
|
||||
RUSTFS_CONSOLE_CORS_ALLOWED_ORIGINS="*" \
|
||||
RUSTFS_VOLUMES="/data" \
|
||||
RUST_LOG="warn"
|
||||
RUST_LOG="warn" \
|
||||
RUSTFS_OBS_LOG_DIRECTORY="/logs"
|
||||
|
||||
EXPOSE 9000 9001
|
||||
|
||||
VOLUME ["/data"]
|
||||
VOLUME ["/data", "/logs"]
|
||||
|
||||
USER rustfs
|
||||
|
||||
|
||||
@@ -166,13 +166,14 @@ ENV RUSTFS_ADDRESS=":9000" \
|
||||
RUSTFS_CONSOLE_ENABLE="true" \
|
||||
RUSTFS_VOLUMES="/data" \
|
||||
RUST_LOG="warn" \
|
||||
RUSTFS_OBS_LOG_DIRECTORY="/logs" \
|
||||
RUSTFS_USERNAME="rustfs" \
|
||||
RUSTFS_GROUPNAME="rustfs" \
|
||||
RUSTFS_UID="1000" \
|
||||
RUSTFS_GID="1000"
|
||||
|
||||
EXPOSE 9000
|
||||
VOLUME ["/data"]
|
||||
VOLUME ["/data", "/logs"]
|
||||
|
||||
# Keep root here; entrypoint will drop privileges using chroot --userspec
|
||||
ENTRYPOINT ["/entrypoint.sh"]
|
||||
|
||||
@@ -31,7 +31,7 @@ use tokio::{
|
||||
time::interval,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Priority queue wrapper for heal requests
|
||||
/// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items
|
||||
@@ -418,7 +418,12 @@ impl HealManager {
|
||||
|
||||
/// Get statistics
|
||||
pub async fn get_statistics(&self) -> HealStatistics {
|
||||
self.statistics.read().await.clone()
|
||||
let stats = self.statistics.read().await.clone();
|
||||
debug!(
|
||||
"HealManager stats snapshot: total_tasks={}, successful_tasks={}, failed_tasks={}, running_tasks={}",
|
||||
stats.total_tasks, stats.successful_tasks, stats.failed_tasks, stats.running_tasks
|
||||
);
|
||||
stats
|
||||
}
|
||||
|
||||
/// Get active task count
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -12,7 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::Result;
|
||||
use crate::{
|
||||
Result,
|
||||
scanner::metrics::{BucketMetrics, MetricsCollector},
|
||||
};
|
||||
use rustfs_common::data_usage::SizeSummary;
|
||||
use rustfs_common::metrics::IlmAction;
|
||||
use rustfs_ecstore::bucket::{
|
||||
@@ -27,15 +30,26 @@ use rustfs_ecstore::bucket::{
|
||||
versioning::VersioningApi,
|
||||
versioning_sys::BucketVersioningSys,
|
||||
};
|
||||
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
|
||||
use rustfs_filemeta::FileInfo;
|
||||
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
use rustfs_ecstore::bucket::{
|
||||
replication::{GLOBAL_REPLICATION_POOL, ReplicationConfig, get_heal_replicate_object_info},
|
||||
utils::is_meta_bucketname,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::info;
|
||||
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
|
||||
use rustfs_filemeta::{FileInfo, ReplicationStatusType, replication_statuses_map};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, HeaderExt, VERSION_PURGE_STATUS_KEY};
|
||||
use s3s::dto::DefaultRetention;
|
||||
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
time::Duration as StdDuration,
|
||||
};
|
||||
use time::{Duration as TimeDuration, OffsetDateTime};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
|
||||
@@ -44,21 +58,94 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
|
||||
pub struct ScannerItem {
|
||||
pub bucket: String,
|
||||
pub object_name: String,
|
||||
pub replication: Option<ReplicationConfig>,
|
||||
pub lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
pub versioning: Option<Arc<VersioningConfiguration>>,
|
||||
pub object_lock_config: Option<DefaultRetention>,
|
||||
pub replication_pending_grace: StdDuration,
|
||||
pub replication_metrics: Option<ReplicationMetricsHandle>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ReplicationMetricsHandle {
|
||||
inner: Arc<ReplicationMetricsInner>,
|
||||
}
|
||||
|
||||
struct ReplicationMetricsInner {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>,
|
||||
}
|
||||
|
||||
impl ReplicationMetricsHandle {
|
||||
pub fn new(metrics: Arc<MetricsCollector>, bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(ReplicationMetricsInner { metrics, bucket_metrics }),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_status(&self, bucket: &str, status: ReplicationStatusType, lagging: bool) {
|
||||
match status {
|
||||
ReplicationStatusType::Pending => self.inner.metrics.increment_replication_pending_objects(1),
|
||||
ReplicationStatusType::Failed => self.inner.metrics.increment_replication_failed_objects(1),
|
||||
_ => {}
|
||||
}
|
||||
if lagging {
|
||||
self.inner.metrics.increment_replication_lagging_objects(1);
|
||||
}
|
||||
|
||||
let mut guard = self.inner.bucket_metrics.lock().await;
|
||||
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
|
||||
bucket: bucket.to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
match status {
|
||||
ReplicationStatusType::Pending => {
|
||||
entry.replication_pending = entry.replication_pending.saturating_add(1);
|
||||
}
|
||||
ReplicationStatusType::Failed => {
|
||||
entry.replication_failed = entry.replication_failed.saturating_add(1);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if lagging {
|
||||
entry.replication_lagging = entry.replication_lagging.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_task_submission(&self, bucket: &str) {
|
||||
self.inner.metrics.increment_replication_tasks_queued(1);
|
||||
let mut guard = self.inner.bucket_metrics.lock().await;
|
||||
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
|
||||
bucket: bucket.to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
entry.replication_tasks_queued = entry.replication_tasks_queued.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
impl ScannerItem {
|
||||
const INTERNAL_REPLICATION_STATUS_KEY: &'static str = "x-rustfs-internal-replication-status";
|
||||
|
||||
pub fn new(
|
||||
bucket: String,
|
||||
replication: Option<ReplicationConfig>,
|
||||
lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
versioning: Option<Arc<VersioningConfiguration>>,
|
||||
object_lock_config: Option<DefaultRetention>,
|
||||
replication_pending_grace: StdDuration,
|
||||
replication_metrics: Option<ReplicationMetricsHandle>,
|
||||
) -> Self {
|
||||
Self {
|
||||
bucket,
|
||||
object_name: "".to_string(),
|
||||
replication,
|
||||
lifecycle,
|
||||
versioning,
|
||||
object_lock_config,
|
||||
replication_pending_grace,
|
||||
replication_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,6 +251,23 @@ impl ScannerItem {
|
||||
}
|
||||
|
||||
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
|
||||
let object_locked = self.is_object_lock_protected(oi);
|
||||
|
||||
if let Err(err) = self.heal_replication(oi).await {
|
||||
warn!(
|
||||
"heal_replication failed for {}/{} (version {:?}): {}",
|
||||
oi.bucket, oi.name, oi.version_id, err
|
||||
);
|
||||
}
|
||||
|
||||
if object_locked {
|
||||
info!(
|
||||
"apply_actions: Skipping lifecycle for {}/{} because object lock retention or legal hold is active",
|
||||
oi.bucket, oi.name
|
||||
);
|
||||
return (false, oi.size);
|
||||
}
|
||||
|
||||
let (action, _size) = self.apply_lifecycle(oi).await;
|
||||
|
||||
info!(
|
||||
@@ -174,16 +278,6 @@ impl ScannerItem {
|
||||
oi.user_defined.clone()
|
||||
);
|
||||
|
||||
// Create a mutable clone if you need to modify fields
|
||||
/*let mut oi = oi.clone();
|
||||
oi.replication_status = ReplicationStatusType::from(
|
||||
oi.user_defined
|
||||
.get("x-amz-bucket-replication-status")
|
||||
.unwrap_or(&"PENDING".to_string()),
|
||||
);
|
||||
info!("apply status is: {:?}", oi.replication_status);
|
||||
self.heal_replication(&oi, _size_s).await;*/
|
||||
|
||||
if action.delete_all() {
|
||||
return (true, 0);
|
||||
}
|
||||
@@ -200,7 +294,7 @@ impl ScannerItem {
|
||||
|
||||
info!("apply_lifecycle: Lifecycle config exists for object: {}", oi.name);
|
||||
|
||||
let (olcfg, rcfg) = if self.bucket != ".minio.sys" {
|
||||
let (olcfg, rcfg) = if !is_meta_bucketname(&self.bucket) {
|
||||
(
|
||||
get_object_lock_config(&self.bucket).await.ok(),
|
||||
None, // FIXME: replication config
|
||||
@@ -266,4 +360,202 @@ impl ScannerItem {
|
||||
|
||||
(lc_evt.action, new_size)
|
||||
}
|
||||
|
||||
fn is_object_lock_protected(&self, oi: &ObjectInfo) -> bool {
|
||||
enforce_retention_for_deletion(oi)
|
||||
}
|
||||
|
||||
async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> {
|
||||
warn!("heal_replication: healing replication for {}/{}", oi.bucket, oi.name);
|
||||
warn!("heal_replication: ObjectInfo oi: {:?}", oi);
|
||||
|
||||
let enriched = Self::hydrate_replication_metadata(oi);
|
||||
let pending_lagging = self.is_pending_lagging(&enriched);
|
||||
|
||||
if let Some(handle) = &self.replication_metrics {
|
||||
handle
|
||||
.record_status(&self.bucket, enriched.replication_status.clone(), pending_lagging)
|
||||
.await;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"heal_replication: evaluating {}/{} with status {:?} and internal {:?}",
|
||||
enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal
|
||||
);
|
||||
|
||||
// if !self.needs_replication_heal(&enriched, pending_lagging) {
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
// let replication_cfg = match get_replication_config(&self.bucket).await {
|
||||
// Ok((cfg, _)) => Some(cfg),
|
||||
// Err(err) => {
|
||||
// debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err);
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
|
||||
// if replication_cfg.is_none() {
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
// let bucket_targets = match get_bucket_targets_config(&self.bucket).await {
|
||||
// Ok(targets) => Some(targets),
|
||||
// Err(err) => {
|
||||
// debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err);
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
|
||||
// let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets);
|
||||
|
||||
let replication_cfg = self.replication.clone().unwrap_or_default();
|
||||
|
||||
if replication_cfg.config.is_none() && replication_cfg.remotes.is_none() {
|
||||
debug!("heal_replication: no replication config for {}/{}", enriched.bucket, enriched.name);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let replicate_info = get_heal_replicate_object_info(&enriched, &replication_cfg).await;
|
||||
let should_replicate = replicate_info.dsc.replicate_any()
|
||||
|| matches!(
|
||||
enriched.replication_status,
|
||||
ReplicationStatusType::Failed | ReplicationStatusType::Pending
|
||||
);
|
||||
if !should_replicate {
|
||||
debug!("heal_replication: no actionable targets for {}/{}", enriched.bucket, enriched.name);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_task(replicate_info).await;
|
||||
if let Some(handle) = &self.replication_metrics {
|
||||
handle.record_task_submission(&self.bucket).await;
|
||||
}
|
||||
warn!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name);
|
||||
} else {
|
||||
warn!(
|
||||
"heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}",
|
||||
enriched.bucket, enriched.name
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn needs_replication_heal(&self, oi: &ObjectInfo, pending_lagging: bool) -> bool {
|
||||
if matches!(oi.replication_status, ReplicationStatusType::Failed) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if pending_lagging && matches!(oi.replication_status, ReplicationStatusType::Pending) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(raw) = oi.replication_status_internal.as_ref() {
|
||||
let statuses = replication_statuses_map(raw);
|
||||
if statuses
|
||||
.values()
|
||||
.any(|status| matches!(status, ReplicationStatusType::Failed))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if pending_lagging
|
||||
&& statuses
|
||||
.values()
|
||||
.any(|status| matches!(status, ReplicationStatusType::Pending))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn hydrate_replication_metadata(oi: &ObjectInfo) -> ObjectInfo {
|
||||
let mut enriched = oi.clone();
|
||||
|
||||
if enriched.replication_status.is_empty() {
|
||||
if let Some(status) = enriched.user_defined.lookup(AMZ_BUCKET_REPLICATION_STATUS) {
|
||||
enriched.replication_status = ReplicationStatusType::from(status);
|
||||
}
|
||||
}
|
||||
|
||||
if enriched.replication_status_internal.is_none() {
|
||||
if let Some(raw) = enriched.user_defined.lookup(Self::INTERNAL_REPLICATION_STATUS_KEY) {
|
||||
if !raw.is_empty() {
|
||||
enriched.replication_status_internal = Some(raw.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if enriched.version_purge_status_internal.is_none() {
|
||||
if let Some(raw) = enriched.user_defined.lookup(VERSION_PURGE_STATUS_KEY) {
|
||||
if !raw.is_empty() {
|
||||
enriched.version_purge_status_internal = Some(raw.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enriched
|
||||
}
|
||||
|
||||
fn is_pending_lagging(&self, oi: &ObjectInfo) -> bool {
|
||||
if !matches!(oi.replication_status, ReplicationStatusType::Pending) {
|
||||
return false;
|
||||
}
|
||||
|
||||
let Some(mod_time) = oi.mod_time else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| {
|
||||
warn!(
|
||||
"replication_pending_grace is invalid, using default value: 0 seconds, grace: {:?}",
|
||||
self.replication_pending_grace
|
||||
);
|
||||
TimeDuration::seconds(0)
|
||||
});
|
||||
if grace.is_zero() {
|
||||
return true;
|
||||
}
|
||||
|
||||
let elapsed = OffsetDateTime::now_utc() - mod_time;
|
||||
elapsed >= grace
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn replication_metrics_handle_tracks_counts() {
|
||||
let metrics = Arc::new(MetricsCollector::new());
|
||||
let bucket_metrics = Arc::new(Mutex::new(HashMap::new()));
|
||||
let handle = ReplicationMetricsHandle::new(metrics.clone(), bucket_metrics.clone());
|
||||
|
||||
handle
|
||||
.record_status("test-bucket", ReplicationStatusType::Pending, true)
|
||||
.await;
|
||||
handle
|
||||
.record_status("test-bucket", ReplicationStatusType::Failed, false)
|
||||
.await;
|
||||
handle.record_task_submission("test-bucket").await;
|
||||
|
||||
let snapshot = metrics.get_metrics();
|
||||
assert_eq!(snapshot.replication_pending_objects, 1);
|
||||
assert_eq!(snapshot.replication_failed_objects, 1);
|
||||
assert_eq!(snapshot.replication_lagging_objects, 1);
|
||||
assert_eq!(snapshot.replication_tasks_queued, 1);
|
||||
|
||||
let guard = bucket_metrics.lock().await;
|
||||
let bucket_entry = guard.get("test-bucket").expect("bucket metrics exists");
|
||||
assert_eq!(bucket_entry.replication_pending, 1);
|
||||
assert_eq!(bucket_entry.replication_failed, 1);
|
||||
assert_eq!(bucket_entry.replication_lagging, 1);
|
||||
assert_eq!(bucket_entry.replication_tasks_queued, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ struct DiskScanResult {
|
||||
pub struct LocalObjectRecord {
|
||||
pub usage: LocalObjectUsage,
|
||||
pub object_info: Option<rustfs_ecstore::store_api::ObjectInfo>,
|
||||
pub file_info: Option<FileInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -84,9 +85,6 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
|
||||
guard.clone()
|
||||
};
|
||||
|
||||
// Use the first local online disk in the set to avoid missing stats when disk 0 is down
|
||||
let mut picked = false;
|
||||
|
||||
for (disk_index, disk_opt) in disks.into_iter().enumerate() {
|
||||
let Some(disk) = disk_opt else {
|
||||
continue;
|
||||
@@ -96,17 +94,11 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalSc
|
||||
continue;
|
||||
}
|
||||
|
||||
if picked {
|
||||
// Count objects once by scanning only disk index zero from each set.
|
||||
if disk_index != 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip offline disks; keep looking for an online candidate
|
||||
if !disk.is_online().await {
|
||||
continue;
|
||||
}
|
||||
|
||||
picked = true;
|
||||
|
||||
let disk_id = match disk.get_disk_id().await.map_err(Error::from)? {
|
||||
Some(id) => id.to_string(),
|
||||
None => {
|
||||
@@ -232,9 +224,11 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
|
||||
record.usage.last_modified_ns = mtime_ns;
|
||||
state.objects.insert(rel_path.clone(), record.usage.clone());
|
||||
emitted.insert(rel_path.clone());
|
||||
warn!("compute_object_usage: record: {:?}", record.clone());
|
||||
objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record);
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("compute_object_usage: None, rel_path: {:?}", rel_path);
|
||||
state.objects.remove(&rel_path);
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -249,24 +243,27 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
|
||||
warn!("Failed to read xl.meta {:?}: {}", xl_path, err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("should_parse: false, rel_path: {:?}", rel_path);
|
||||
}
|
||||
}
|
||||
|
||||
state.objects.retain(|key, _| visited.contains(key));
|
||||
state.last_scan_ns = Some(now_ns);
|
||||
|
||||
for (key, usage) in &state.objects {
|
||||
if emitted.contains(key) {
|
||||
continue;
|
||||
}
|
||||
objects_by_bucket
|
||||
.entry(usage.bucket.clone())
|
||||
.or_default()
|
||||
.push(LocalObjectRecord {
|
||||
usage: usage.clone(),
|
||||
object_info: None,
|
||||
});
|
||||
}
|
||||
// for (key, usage) in &state.objects {
|
||||
// if emitted.contains(key) {
|
||||
// continue;
|
||||
// }
|
||||
// objects_by_bucket
|
||||
// .entry(usage.bucket.clone())
|
||||
// .or_default()
|
||||
// .push(LocalObjectRecord {
|
||||
// usage: usage.clone(),
|
||||
// object_info: None,
|
||||
// file_info: None,
|
||||
// });
|
||||
// }
|
||||
|
||||
let snapshot = build_snapshot(meta, &state.objects, now);
|
||||
status.snapshot_exists = true;
|
||||
@@ -328,6 +325,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
|
||||
let versioned = fi.version_id.is_some();
|
||||
ObjectInfo::from_file_info(fi, bucket, object, versioned)
|
||||
});
|
||||
let file_info = latest_file_info.clone();
|
||||
|
||||
Ok(Some(LocalObjectRecord {
|
||||
usage: LocalObjectUsage {
|
||||
@@ -340,6 +338,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
|
||||
has_live_object,
|
||||
},
|
||||
object_info,
|
||||
file_info,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,14 @@ pub struct ScannerMetrics {
|
||||
pub healthy_objects: u64,
|
||||
/// Total corrupted objects found
|
||||
pub corrupted_objects: u64,
|
||||
/// Replication heal tasks queued
|
||||
pub replication_tasks_queued: u64,
|
||||
/// Objects observed with pending replication
|
||||
pub replication_pending_objects: u64,
|
||||
/// Objects observed with failed replication
|
||||
pub replication_failed_objects: u64,
|
||||
/// Objects with replication pending longer than grace period
|
||||
pub replication_lagging_objects: u64,
|
||||
/// Last scan activity time
|
||||
pub last_activity: Option<SystemTime>,
|
||||
/// Current scan cycle
|
||||
@@ -86,6 +94,14 @@ pub struct BucketMetrics {
|
||||
pub heal_tasks_completed: u64,
|
||||
/// Heal tasks failed for this bucket
|
||||
pub heal_tasks_failed: u64,
|
||||
/// Objects observed with pending replication status
|
||||
pub replication_pending: u64,
|
||||
/// Objects observed with failed replication status
|
||||
pub replication_failed: u64,
|
||||
/// Objects exceeding replication grace period
|
||||
pub replication_lagging: u64,
|
||||
/// Replication heal tasks queued for this bucket
|
||||
pub replication_tasks_queued: u64,
|
||||
}
|
||||
|
||||
/// Disk-specific metrics
|
||||
@@ -127,6 +143,10 @@ pub struct MetricsCollector {
|
||||
total_cycles: AtomicU64,
|
||||
healthy_objects: AtomicU64,
|
||||
corrupted_objects: AtomicU64,
|
||||
replication_tasks_queued: AtomicU64,
|
||||
replication_pending_objects: AtomicU64,
|
||||
replication_failed_objects: AtomicU64,
|
||||
replication_lagging_objects: AtomicU64,
|
||||
}
|
||||
|
||||
impl MetricsCollector {
|
||||
@@ -146,6 +166,10 @@ impl MetricsCollector {
|
||||
total_cycles: AtomicU64::new(0),
|
||||
healthy_objects: AtomicU64::new(0),
|
||||
corrupted_objects: AtomicU64::new(0),
|
||||
replication_tasks_queued: AtomicU64::new(0),
|
||||
replication_pending_objects: AtomicU64::new(0),
|
||||
replication_failed_objects: AtomicU64::new(0),
|
||||
replication_lagging_objects: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,6 +218,26 @@ impl MetricsCollector {
|
||||
self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication tasks queued
|
||||
pub fn increment_replication_tasks_queued(&self, count: u64) {
|
||||
self.replication_tasks_queued.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication pending objects
|
||||
pub fn increment_replication_pending_objects(&self, count: u64) {
|
||||
self.replication_pending_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication failed objects
|
||||
pub fn increment_replication_failed_objects(&self, count: u64) {
|
||||
self.replication_failed_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication lagging objects
|
||||
pub fn increment_replication_lagging_objects(&self, count: u64) {
|
||||
self.replication_lagging_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Set current cycle
|
||||
pub fn set_current_cycle(&self, cycle: u64) {
|
||||
self.current_cycle.store(cycle, Ordering::Relaxed);
|
||||
@@ -228,6 +272,10 @@ impl MetricsCollector {
|
||||
heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed),
|
||||
healthy_objects: self.healthy_objects.load(Ordering::Relaxed),
|
||||
corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed),
|
||||
replication_tasks_queued: self.replication_tasks_queued.load(Ordering::Relaxed),
|
||||
replication_pending_objects: self.replication_pending_objects.load(Ordering::Relaxed),
|
||||
replication_failed_objects: self.replication_failed_objects.load(Ordering::Relaxed),
|
||||
replication_lagging_objects: self.replication_lagging_objects.load(Ordering::Relaxed),
|
||||
last_activity: Some(SystemTime::now()),
|
||||
current_cycle: self.current_cycle.load(Ordering::Relaxed),
|
||||
total_cycles: self.total_cycles.load(Ordering::Relaxed),
|
||||
@@ -255,6 +303,10 @@ impl MetricsCollector {
|
||||
self.total_cycles.store(0, Ordering::Relaxed);
|
||||
self.healthy_objects.store(0, Ordering::Relaxed);
|
||||
self.corrupted_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_tasks_queued.store(0, Ordering::Relaxed);
|
||||
self.replication_pending_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_failed_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_lagging_objects.store(0, Ordering::Relaxed);
|
||||
|
||||
info!("Scanner metrics reset");
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::scanner::{
|
||||
};
|
||||
use rustfs_common::data_usage::DataUsageInfo;
|
||||
use rustfs_ecstore::StorageAPI;
|
||||
use rustfs_ecstore::bucket::utils::is_meta_bucketname;
|
||||
use rustfs_ecstore::disk::{DiskAPI, DiskStore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
@@ -879,7 +880,7 @@ impl NodeScanner {
|
||||
let bucket_name = &bucket_info.name;
|
||||
|
||||
// skip system internal buckets
|
||||
if bucket_name == ".minio.sys" {
|
||||
if is_meta_bucketname(bucket_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -347,8 +347,7 @@ impl DecentralizedStatsAggregator {
|
||||
|
||||
// update cache
|
||||
*self.cached_stats.write().await = Some(aggregated.clone());
|
||||
// Use the time when aggregation completes as cache timestamp to avoid premature expiry during long runs
|
||||
*self.cache_timestamp.write().await = SystemTime::now();
|
||||
*self.cache_timestamp.write().await = aggregation_timestamp;
|
||||
|
||||
Ok(aggregated)
|
||||
}
|
||||
@@ -360,8 +359,7 @@ impl DecentralizedStatsAggregator {
|
||||
|
||||
// update cache
|
||||
*self.cached_stats.write().await = Some(aggregated.clone());
|
||||
// Cache timestamp should reflect completion time rather than aggregation start
|
||||
*self.cache_timestamp.write().await = SystemTime::now();
|
||||
*self.cache_timestamp.write().await = now;
|
||||
|
||||
Ok(aggregated)
|
||||
}
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![cfg(test)]
|
||||
|
||||
use rustfs_ahm::scanner::data_scanner::Scanner;
|
||||
use rustfs_common::data_usage::DataUsageInfo;
|
||||
use rustfs_ecstore::GLOBAL_Endpoints;
|
||||
use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetadataSys};
|
||||
use rustfs_ecstore::endpoints::EndpointServerPools;
|
||||
use rustfs_ecstore::store::ECStore;
|
||||
use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Build a minimal single-node ECStore over a temp directory and populate objects.
|
||||
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let root = temp_dir.path().to_string_lossy().to_string();
|
||||
|
||||
// Create endpoints from the temp dir
|
||||
let (endpoint_pools, _setup) = EndpointServerPools::from_volumes("127.0.0.1:0", vec![root])
|
||||
.await
|
||||
.expect("endpoint pools");
|
||||
|
||||
// Seed globals required by metadata sys if not already set
|
||||
if GLOBAL_Endpoints.get().is_none() {
|
||||
let _ = GLOBAL_Endpoints.set(endpoint_pools.clone());
|
||||
}
|
||||
|
||||
let store = ECStore::new("127.0.0.1:0".parse().unwrap(), endpoint_pools, CancellationToken::new())
|
||||
.await
|
||||
.expect("create store");
|
||||
|
||||
if rustfs_ecstore::global::new_object_layer_fn().is_none() {
|
||||
rustfs_ecstore::global::set_object_layer(store.clone()).await;
|
||||
}
|
||||
|
||||
// Initialize metadata system before bucket operations
|
||||
if GLOBAL_BucketMetadataSys.get().is_none() {
|
||||
let mut sys = BucketMetadataSys::new(store.clone());
|
||||
sys.init(Vec::new()).await;
|
||||
let _ = GLOBAL_BucketMetadataSys.set(Arc::new(RwLock::new(sys)));
|
||||
}
|
||||
|
||||
store
|
||||
.make_bucket("fallback-bucket", &rustfs_ecstore::store_api::MakeBucketOptions::default())
|
||||
.await
|
||||
.expect("make bucket");
|
||||
|
||||
for i in 0..count {
|
||||
let key = format!("obj-{i:04}");
|
||||
let data = format!("payload-{i}");
|
||||
let mut reader = PutObjReader::from_vec(data.into_bytes());
|
||||
store
|
||||
.put_object("fallback-bucket", &key, &mut reader, &rustfs_ecstore::store_api::ObjectOptions::default())
|
||||
.await
|
||||
.expect("put object");
|
||||
}
|
||||
|
||||
(temp_dir, store)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fallback_builds_full_counts_over_100_objects() {
|
||||
let (_tmp, store) = create_store_with_objects(1000).await;
|
||||
let scanner = Scanner::new(None, None);
|
||||
|
||||
// Directly call the fallback builder to ensure pagination works.
|
||||
let usage: DataUsageInfo = scanner.build_data_usage_from_ecstore(&store).await.expect("fallback usage");
|
||||
|
||||
let bucket = usage.buckets_usage.get("fallback-bucket").expect("bucket usage present");
|
||||
|
||||
assert!(
|
||||
usage.objects_total_count >= 1000,
|
||||
"total objects should be >=1000, got {}",
|
||||
usage.objects_total_count
|
||||
);
|
||||
assert!(
|
||||
bucket.objects_count >= 1000,
|
||||
"bucket objects should be >=1000, got {}",
|
||||
bucket.objects_count
|
||||
);
|
||||
}
|
||||
@@ -12,31 +12,52 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use rustfs_ahm::heal::{
|
||||
manager::{HealConfig, HealManager},
|
||||
storage::{ECStoreHealStorage, HealStorageAPI},
|
||||
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
|
||||
use async_trait::async_trait;
|
||||
use rustfs_ahm::{
|
||||
heal::{
|
||||
manager::{HealConfig, HealManager},
|
||||
storage::{ECStoreHealStorage, HealStorageAPI},
|
||||
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
|
||||
},
|
||||
scanner::{ScanMode, Scanner},
|
||||
};
|
||||
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
|
||||
use rustfs_ecstore::bucket::metadata_sys::{self, set_bucket_metadata};
|
||||
use rustfs_ecstore::bucket::replication::{
|
||||
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
|
||||
};
|
||||
use rustfs_ecstore::bucket::target::{BucketTarget, BucketTargetType, BucketTargets};
|
||||
use rustfs_ecstore::bucket::utils::serialize;
|
||||
use rustfs_ecstore::error::Error as EcstoreError;
|
||||
use rustfs_ecstore::{
|
||||
disk::endpoint::Endpoint,
|
||||
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
|
||||
store::ECStore,
|
||||
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
|
||||
};
|
||||
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use s3s::dto::{
|
||||
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
|
||||
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
os::unix::fs::PermissionsExt,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Once, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::fs;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-replication-heal-target";
|
||||
|
||||
fn init_tracing() {
|
||||
INIT.call_once(|| {
|
||||
@@ -145,6 +166,225 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
|
||||
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
|
||||
}
|
||||
|
||||
fn delete_first_part_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
|
||||
for disk_path in disk_paths {
|
||||
let obj_dir = disk_path.join(bucket).join(object);
|
||||
if !obj_dir.exists() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(part_path) = WalkDir::new(&obj_dir)
|
||||
.min_depth(2)
|
||||
.max_depth(2)
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.find(|entry| {
|
||||
entry.file_type().is_file()
|
||||
&& entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.map(|name| name.starts_with("part."))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|entry| entry.into_path())
|
||||
{
|
||||
std::fs::remove_file(&part_path).expect("Failed to delete part file");
|
||||
return part_path;
|
||||
}
|
||||
}
|
||||
|
||||
panic!("Failed to locate part file for {}/{}", bucket, object);
|
||||
}
|
||||
|
||||
fn delete_xl_meta_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
|
||||
for disk_path in disk_paths {
|
||||
let xl_meta_path = disk_path.join(bucket).join(object).join("xl.meta");
|
||||
if xl_meta_path.exists() {
|
||||
std::fs::remove_file(&xl_meta_path).expect("Failed to delete xl.meta file");
|
||||
return xl_meta_path;
|
||||
}
|
||||
}
|
||||
|
||||
panic!("Failed to locate xl.meta for {}/{}", bucket, object);
|
||||
}
|
||||
|
||||
struct FormatPathGuard {
|
||||
original: PathBuf,
|
||||
backup: PathBuf,
|
||||
}
|
||||
|
||||
impl FormatPathGuard {
|
||||
fn new(original: PathBuf) -> std::io::Result<Self> {
|
||||
let backup = original.with_extension("bak");
|
||||
if backup.exists() {
|
||||
std::fs::remove_file(&backup)?;
|
||||
}
|
||||
std::fs::rename(&original, &backup)?;
|
||||
Ok(Self { original, backup })
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FormatPathGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.backup.exists() {
|
||||
let _ = std::fs::rename(&self.backup, &self.original);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PermissionGuard {
|
||||
path: PathBuf,
|
||||
original_mode: u32,
|
||||
}
|
||||
|
||||
impl PermissionGuard {
|
||||
fn new(path: PathBuf, new_mode: u32) -> std::io::Result<Self> {
|
||||
let metadata = std::fs::metadata(&path)?;
|
||||
let original_mode = metadata.permissions().mode();
|
||||
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(new_mode))?;
|
||||
Ok(Self { path, original_mode })
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PermissionGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.path.exists() {
|
||||
let _ = std::fs::set_permissions(&self.path, std::fs::Permissions::from_mode(self.original_mode));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecordingReplicationPool {
|
||||
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
|
||||
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
|
||||
}
|
||||
|
||||
impl RecordingReplicationPool {
|
||||
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
|
||||
let mut guard = self.replica_tasks.lock().await;
|
||||
guard.drain(..).collect()
|
||||
}
|
||||
|
||||
async fn clear(&self) {
|
||||
self.replica_tasks.lock().await.clear();
|
||||
self.delete_tasks.lock().await.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReplicationPoolTrait for RecordingReplicationPool {
|
||||
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
|
||||
self.replica_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
|
||||
self.delete_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
|
||||
|
||||
async fn init_resync(
|
||||
self: Arc<Self>,
|
||||
_cancellation_token: CancellationToken,
|
||||
_buckets: Vec<String>,
|
||||
) -> Result<(), EcstoreError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
|
||||
static TEST_POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
|
||||
|
||||
if let Some(pool) = TEST_POOL.get() {
|
||||
pool.clear().await;
|
||||
return pool.clone();
|
||||
}
|
||||
|
||||
let pool = Arc::new(RecordingReplicationPool::default());
|
||||
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
|
||||
let global_pool = GLOBAL_REPLICATION_POOL
|
||||
.get_or_init(|| {
|
||||
let pool_clone = dyn_pool.clone();
|
||||
async move { pool_clone }
|
||||
})
|
||||
.await
|
||||
.clone();
|
||||
|
||||
assert!(
|
||||
Arc::ptr_eq(&dyn_pool, &global_pool),
|
||||
"GLOBAL_REPLICATION_POOL initialized before test replication pool"
|
||||
);
|
||||
|
||||
let _ = TEST_POOL.set(pool.clone());
|
||||
pool.clear().await;
|
||||
pool
|
||||
}
|
||||
|
||||
async fn configure_bucket_replication(bucket: &str, target_arn: &str) {
|
||||
let meta = metadata_sys::get(bucket)
|
||||
.await
|
||||
.expect("bucket metadata should exist for replication configuration");
|
||||
let mut metadata = (*meta).clone();
|
||||
|
||||
let replication_rule = ReplicationRule {
|
||||
delete_marker_replication: None,
|
||||
delete_replication: None,
|
||||
destination: Destination {
|
||||
access_control_translation: None,
|
||||
account: None,
|
||||
bucket: target_arn.to_string(),
|
||||
encryption_configuration: None,
|
||||
metrics: None,
|
||||
replication_time: None,
|
||||
storage_class: None,
|
||||
},
|
||||
existing_object_replication: Some(ExistingObjectReplication {
|
||||
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
|
||||
}),
|
||||
filter: None,
|
||||
id: Some("heal-replication-rule".to_string()),
|
||||
prefix: Some(String::new()),
|
||||
priority: Some(1),
|
||||
source_selection_criteria: None,
|
||||
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
|
||||
};
|
||||
|
||||
let replication_cfg = ReplicationConfiguration {
|
||||
role: target_arn.to_string(),
|
||||
rules: vec![replication_rule],
|
||||
};
|
||||
|
||||
let bucket_targets = BucketTargets {
|
||||
targets: vec![BucketTarget {
|
||||
source_bucket: bucket.to_string(),
|
||||
endpoint: "replication.invalid".to_string(),
|
||||
target_bucket: "replication-target".to_string(),
|
||||
arn: target_arn.to_string(),
|
||||
target_type: BucketTargetType::ReplicationService,
|
||||
..Default::default()
|
||||
}],
|
||||
};
|
||||
|
||||
metadata.replication_config = Some(replication_cfg.clone());
|
||||
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
|
||||
metadata.replication_config_updated_at = OffsetDateTime::now_utc();
|
||||
metadata.bucket_target_config = Some(bucket_targets.clone());
|
||||
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
|
||||
metadata.bucket_targets_config_updated_at = OffsetDateTime::now_utc();
|
||||
let versioning_cfg = VersioningConfiguration {
|
||||
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
|
||||
..Default::default()
|
||||
};
|
||||
metadata.versioning_config = Some(versioning_cfg.clone());
|
||||
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
|
||||
metadata.versioning_config_updated_at = OffsetDateTime::now_utc();
|
||||
|
||||
set_bucket_metadata(bucket.to_string(), metadata)
|
||||
.await
|
||||
.expect("failed to update bucket metadata for replication");
|
||||
}
|
||||
|
||||
mod serial_tests {
|
||||
use super::*;
|
||||
|
||||
@@ -430,4 +670,380 @@ mod serial_tests {
|
||||
|
||||
info!("Direct heal storage API test passed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_heal_task_when_part_missing() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-heal-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-heal-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner auto-heal data").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before simulating failures");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let deleted_part_path = delete_first_part_file(&disk_paths, &bucket_name, object_name);
|
||||
assert!(!deleted_part_path.exists(), "Deleted part file should not exist before healing");
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan after part deletion should finish and enqueue heal task");
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit heal tasks when data parts go missing"
|
||||
);
|
||||
|
||||
// Allow heal manager to restore the missing part
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert!(
|
||||
deleted_part_path.exists(),
|
||||
"Missing part should be restored after heal: {:?}",
|
||||
deleted_part_path
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_metadata_heal_when_xl_meta_missing() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-meta-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-meta-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner metadata heal data").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before metadata deletion");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let deleted_meta_path = delete_xl_meta_file(&disk_paths, &bucket_name, object_name);
|
||||
assert!(!deleted_meta_path.exists(), "Deleted xl.meta should not exist before healing");
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan after metadata deletion should finish and enqueue heal task");
|
||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit metadata heal tasks when xl.meta is missing"
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert!(
|
||||
deleted_meta_path.exists(),
|
||||
"xl.meta should be restored after heal: {:?}",
|
||||
deleted_meta_path
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_triggers_replication_heal_when_status_failed() {
|
||||
let (_disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-replication-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-replication-heal-object";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
configure_bucket_replication(&bucket_name, TEST_REPLICATION_TARGET_ARN).await;
|
||||
|
||||
let replication_pool = ensure_test_replication_pool().await;
|
||||
replication_pool.clear().await;
|
||||
|
||||
let mut opts = ObjectOptions::default();
|
||||
opts.user_defined.insert(
|
||||
AMZ_BUCKET_REPLICATION_STATUS.to_string(),
|
||||
ReplicationStatusType::Failed.as_str().to_string(),
|
||||
);
|
||||
let replication_status_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
|
||||
opts.user_defined.insert(
|
||||
replication_status_key.clone(),
|
||||
format!("{}={};", TEST_REPLICATION_TARGET_ARN, ReplicationStatusType::Failed.as_str()),
|
||||
);
|
||||
let mut reader = PutObjReader::from_vec(b"replication heal data".to_vec());
|
||||
ecstore
|
||||
.put_object(&bucket_name, object_name, &mut reader, &opts)
|
||||
.await
|
||||
.expect("Failed to upload replication test object");
|
||||
|
||||
let object_info = ecstore
|
||||
.get_object_info(&bucket_name, object_name, &ObjectOptions::default())
|
||||
.await
|
||||
.expect("Failed to read object info for replication test");
|
||||
assert_eq!(
|
||||
object_info
|
||||
.user_defined
|
||||
.get(AMZ_BUCKET_REPLICATION_STATUS)
|
||||
.map(|s| s.as_str()),
|
||||
Some(ReplicationStatusType::Failed.as_str()),
|
||||
"Uploaded object should contain replication status metadata"
|
||||
);
|
||||
assert!(
|
||||
object_info
|
||||
.user_defined
|
||||
.get(&replication_status_key)
|
||||
.map(|s| s.contains(ReplicationStatusType::Failed.as_str()))
|
||||
.unwrap_or(false),
|
||||
"Uploaded object should preserve internal replication status metadata"
|
||||
);
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan cycle should succeed and evaluate replication state");
|
||||
|
||||
let replica_tasks = replication_pool.take_replica_tasks().await;
|
||||
assert!(
|
||||
replica_tasks
|
||||
.iter()
|
||||
.any(|info| info.bucket == bucket_name && info.name == object_name),
|
||||
"Scanner should enqueue replication heal task when replication status is FAILED (recorded tasks: {:?})",
|
||||
replica_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_disk_offline() {
|
||||
let (disk_paths, _ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");
|
||||
assert!(format_path.exists(), "format.json should exist before simulating offline disk");
|
||||
let _format_guard = FormatPathGuard::new(format_path.clone()).expect("failed to move format.json");
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Normal).await;
|
||||
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan cycle should complete even when a disk is offline");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when disk is offline (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_listing_volumes_fails() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-list-volumes-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-list-volumes-object";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"disk list volumes failure").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before simulating disk permission issues");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let disk_root = disk_paths[0].clone();
|
||||
assert!(disk_root.exists(), "Disk root should exist so we can simulate permission failures");
|
||||
|
||||
{
|
||||
let _root_perm_guard =
|
||||
PermissionGuard::new(disk_root.clone(), 0o000).expect("Failed to change disk root permissions");
|
||||
|
||||
let scan_result = scanner.scan_cycle().await;
|
||||
assert!(
|
||||
scan_result.is_ok(),
|
||||
"Scan cycle should continue even if disk volumes cannot be listed: {:?}",
|
||||
scan_result
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when listing volumes fails (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_disk_access_fails() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-access-error-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-access-error-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"disk access failure").await;
|
||||
|
||||
let bucket_path = disk_paths[0].join(&bucket_name);
|
||||
assert!(bucket_path.exists(), "Bucket path should exist on disk for access test");
|
||||
let _perm_guard = PermissionGuard::new(bucket_path.clone(), 0o000).expect("Failed to change permissions");
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
let scan_result = scanner.scan_cycle().await;
|
||||
assert!(
|
||||
scan_result.is_ok(),
|
||||
"Scan cycle should complete even if a disk volume has access errors: {:?}",
|
||||
scan_result
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when disk access fails (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_detects_missing_bucket_directory_and_queues_bucket_heal() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-missing-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, "seed-object", b"bucket heal data").await;
|
||||
|
||||
let scanner_heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let scanner_heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(scanner_heal_cfg)));
|
||||
scanner_heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(scanner_heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Normal).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before deleting bucket directory");
|
||||
let baseline_stats = scanner_heal_manager.get_statistics().await;
|
||||
|
||||
let missing_dir = disk_paths[0].join(&bucket_name);
|
||||
assert!(missing_dir.exists());
|
||||
std::fs::remove_dir_all(&missing_dir).expect("Failed to remove bucket directory for heal simulation");
|
||||
assert!(!missing_dir.exists(), "Bucket directory should be removed on disk to trigger heal");
|
||||
|
||||
scanner
|
||||
.run_volume_consistency_check()
|
||||
.await
|
||||
.expect("Volume consistency check should run after bucket removal");
|
||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||
|
||||
let updated_stats = scanner_heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit bucket heal tasks when a bucket directory is missing"
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
assert!(missing_dir.exists(), "Bucket directory should be restored after heal");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,18 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
|
||||
use rustfs_ecstore::{
|
||||
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
|
||||
bucket::metadata_sys,
|
||||
bucket::{
|
||||
metadata::BUCKET_LIFECYCLE_CONFIG,
|
||||
metadata_sys,
|
||||
replication::{
|
||||
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
|
||||
},
|
||||
target::{BucketTarget, BucketTargetType, BucketTargets},
|
||||
utils::serialize,
|
||||
},
|
||||
disk::endpoint::Endpoint,
|
||||
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
|
||||
global::GLOBAL_TierConfigMgr,
|
||||
@@ -23,18 +31,27 @@ use rustfs_ecstore::{
|
||||
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
|
||||
tier::tier_config::{TierConfig, TierMinIO, TierType},
|
||||
};
|
||||
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use s3s::dto::{
|
||||
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
|
||||
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{Arc, Once, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
use time::{Duration as TimeDuration, OffsetDateTime};
|
||||
use tokio::fs;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-lifecycle-replication-test";
|
||||
|
||||
fn init_tracing() {
|
||||
INIT.call_once(|| {
|
||||
@@ -159,6 +176,167 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
|
||||
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecordingReplicationPool {
|
||||
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
|
||||
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
|
||||
}
|
||||
|
||||
impl RecordingReplicationPool {
|
||||
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
|
||||
let mut guard = self.replica_tasks.lock().await;
|
||||
guard.drain(..).collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReplicationPoolTrait for RecordingReplicationPool {
|
||||
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
|
||||
self.replica_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
|
||||
self.delete_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
|
||||
|
||||
async fn init_resync(
|
||||
self: Arc<Self>,
|
||||
_cancellation_token: CancellationToken,
|
||||
_buckets: Vec<String>,
|
||||
) -> Result<(), rustfs_ecstore::error::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
|
||||
static POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
|
||||
if let Some(existing) = POOL.get() {
|
||||
existing.replica_tasks.lock().await.clear();
|
||||
existing.delete_tasks.lock().await.clear();
|
||||
return existing.clone();
|
||||
}
|
||||
|
||||
let pool = Arc::new(RecordingReplicationPool::default());
|
||||
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
|
||||
GLOBAL_REPLICATION_POOL
|
||||
.get_or_init(|| {
|
||||
let pool_clone = dyn_pool.clone();
|
||||
async move { pool_clone }
|
||||
})
|
||||
.await;
|
||||
let _ = POOL.set(pool.clone());
|
||||
pool
|
||||
}
|
||||
|
||||
async fn configure_bucket_replication(bucket: &str) {
|
||||
let meta = metadata_sys::get(bucket)
|
||||
.await
|
||||
.expect("bucket metadata should exist for replication configuration");
|
||||
let mut metadata = (*meta).clone();
|
||||
|
||||
let replication_rule = ReplicationRule {
|
||||
delete_marker_replication: None,
|
||||
delete_replication: None,
|
||||
destination: Destination {
|
||||
access_control_translation: None,
|
||||
account: None,
|
||||
bucket: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
encryption_configuration: None,
|
||||
metrics: None,
|
||||
replication_time: None,
|
||||
storage_class: None,
|
||||
},
|
||||
existing_object_replication: Some(ExistingObjectReplication {
|
||||
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
|
||||
}),
|
||||
filter: None,
|
||||
id: Some("lifecycle-replication-rule".to_string()),
|
||||
prefix: Some(String::new()),
|
||||
priority: Some(1),
|
||||
source_selection_criteria: None,
|
||||
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
|
||||
};
|
||||
|
||||
let replication_cfg = ReplicationConfiguration {
|
||||
role: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
rules: vec![replication_rule],
|
||||
};
|
||||
|
||||
let bucket_targets = BucketTargets {
|
||||
targets: vec![BucketTarget {
|
||||
source_bucket: bucket.to_string(),
|
||||
endpoint: "replication.invalid".to_string(),
|
||||
target_bucket: "replication-target".to_string(),
|
||||
arn: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
target_type: BucketTargetType::ReplicationService,
|
||||
..Default::default()
|
||||
}],
|
||||
};
|
||||
|
||||
metadata.replication_config = Some(replication_cfg.clone());
|
||||
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
|
||||
metadata.bucket_target_config = Some(bucket_targets.clone());
|
||||
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
|
||||
|
||||
let versioning_cfg = VersioningConfiguration {
|
||||
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
|
||||
..Default::default()
|
||||
};
|
||||
metadata.versioning_config = Some(versioning_cfg.clone());
|
||||
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
|
||||
|
||||
metadata_sys::set_bucket_metadata(bucket.to_string(), metadata)
|
||||
.await
|
||||
.expect("failed to persist bucket metadata with replication config");
|
||||
}
|
||||
|
||||
async fn upload_object_with_replication_status(
|
||||
ecstore: &Arc<ECStore>,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
status: ReplicationStatusType,
|
||||
) {
|
||||
let mut reader = PutObjReader::from_vec(b"replication-state".to_vec());
|
||||
let mut opts = ObjectOptions::default();
|
||||
opts.user_defined
|
||||
.insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), status.as_str().to_string());
|
||||
let internal_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
|
||||
opts.user_defined
|
||||
.insert(internal_key, format!("{}={};", TEST_REPLICATION_TARGET_ARN, status.as_str()));
|
||||
|
||||
(**ecstore)
|
||||
.put_object(bucket, object, &mut reader, &opts)
|
||||
.await
|
||||
.expect("failed to upload replication test object");
|
||||
}
|
||||
|
||||
async fn upload_object_with_retention(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8], retain_for: Duration) {
|
||||
use s3s::header::{X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
|
||||
let mut reader = PutObjReader::from_vec(data.to_vec());
|
||||
let mut opts = ObjectOptions::default();
|
||||
let retain_duration = TimeDuration::try_from(retain_for).unwrap_or_else(|_| TimeDuration::seconds(0));
|
||||
let retain_until = OffsetDateTime::now_utc() + retain_duration;
|
||||
let retain_until_str = retain_until.format(&Rfc3339).expect("format retain date");
|
||||
let lock_mode_key = X_AMZ_OBJECT_LOCK_MODE.as_str().to_string();
|
||||
let lock_mode_lower = lock_mode_key.to_lowercase();
|
||||
opts.user_defined.insert(lock_mode_lower, "GOVERNANCE".to_string());
|
||||
opts.user_defined.insert(lock_mode_key, "GOVERNANCE".to_string());
|
||||
|
||||
let retain_key = X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string();
|
||||
let retain_key_lower = retain_key.to_lowercase();
|
||||
opts.user_defined.insert(retain_key_lower, retain_until_str.clone());
|
||||
opts.user_defined.insert(retain_key, retain_until_str);
|
||||
|
||||
(**ecstore)
|
||||
.put_object(bucket, object, &mut reader, &opts)
|
||||
.await
|
||||
.expect("Failed to upload retained object");
|
||||
}
|
||||
|
||||
/// Test helper: Set bucket lifecycle configuration
|
||||
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
|
||||
@@ -694,4 +872,127 @@ mod serial_tests {
|
||||
|
||||
println!("Lifecycle transition basic test completed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_respects_object_lock_retention() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
let suffix = uuid::Uuid::new_v4().simple().to_string();
|
||||
let bucket_name = format!("test-lc-lock-retention-{}", &suffix[..8]);
|
||||
let object_name = "test/locked-object.txt";
|
||||
let test_data = b"retained payload";
|
||||
|
||||
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
|
||||
upload_object_with_retention(&ecstore, bucket_name.as_str(), object_name, test_data, Duration::from_secs(3600)).await;
|
||||
|
||||
assert!(
|
||||
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
|
||||
"Object should exist before lifecycle processing"
|
||||
);
|
||||
|
||||
set_bucket_lifecycle(bucket_name.as_str())
|
||||
.await
|
||||
.expect("Failed to set lifecycle configuration");
|
||||
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 1,
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
scanner.start().await.expect("Failed to start scanner");
|
||||
|
||||
for _ in 0..3 {
|
||||
scanner.scan_cycle().await.expect("scan cycle should succeed");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
|
||||
assert!(
|
||||
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
|
||||
"Object with active retention should not be deleted by lifecycle"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_triggers_replication_heal_for_lagging_and_failed_objects() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
let suffix = uuid::Uuid::new_v4().simple().to_string();
|
||||
let bucket_name = format!("lc-replication-{}", &suffix[..8]);
|
||||
create_test_bucket(&ecstore, bucket_name.as_str()).await;
|
||||
configure_bucket_replication(bucket_name.as_str()).await;
|
||||
let replication_pool = ensure_test_replication_pool().await;
|
||||
|
||||
upload_object_with_replication_status(
|
||||
&ecstore,
|
||||
bucket_name.as_str(),
|
||||
"test/lagging-pending",
|
||||
ReplicationStatusType::Pending,
|
||||
)
|
||||
.await;
|
||||
upload_object_with_replication_status(
|
||||
&ecstore,
|
||||
bucket_name.as_str(),
|
||||
"test/failed-object",
|
||||
ReplicationStatusType::Failed,
|
||||
)
|
||||
.await;
|
||||
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 2,
|
||||
replication_pending_grace: Duration::from_secs(0),
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
|
||||
scanner.scan_cycle().await.expect("scan cycle should complete");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let replica_tasks = replication_pool.take_replica_tasks().await;
|
||||
assert!(
|
||||
replica_tasks.iter().any(|t| t.name == "test/lagging-pending"),
|
||||
"Pending object should be enqueued for replication heal: {:?}",
|
||||
replica_tasks
|
||||
);
|
||||
assert!(
|
||||
replica_tasks.iter().any(|t| t.name == "test/failed-object"),
|
||||
"Failed object should be enqueued for replication heal: {:?}",
|
||||
replica_tasks
|
||||
);
|
||||
|
||||
let metrics = scanner.get_metrics().await;
|
||||
assert_eq!(
|
||||
metrics.replication_tasks_queued,
|
||||
replica_tasks.len() as u64,
|
||||
"Replication tasks queued metric should match recorded tasks"
|
||||
);
|
||||
assert!(
|
||||
metrics.replication_pending_objects >= 1,
|
||||
"Pending replication metric should be incremented"
|
||||
);
|
||||
assert!(metrics.replication_failed_objects >= 1, "Failed replication metric should be incremented");
|
||||
assert!(
|
||||
metrics.replication_lagging_objects >= 1,
|
||||
"Lagging replication metric should track pending object beyond grace"
|
||||
);
|
||||
|
||||
let bucket_metrics = metrics
|
||||
.bucket_metrics
|
||||
.get(&bucket_name)
|
||||
.expect("bucket metrics should contain replication counters");
|
||||
assert!(
|
||||
bucket_metrics.replication_pending >= 1 && bucket_metrics.replication_failed >= 1,
|
||||
"Bucket-level replication metrics should reflect observed statuses"
|
||||
);
|
||||
assert_eq!(
|
||||
bucket_metrics.replication_tasks_queued,
|
||||
replica_tasks.len() as u64,
|
||||
"Bucket-level queued counter should match enqueued tasks"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
rustfs-ecstore.workspace = true
|
||||
rustfs-common.workspace = true
|
||||
flatbuffers.workspace = true
|
||||
futures.workspace = true
|
||||
rustfs-lock.workspace = true
|
||||
@@ -50,4 +49,4 @@ uuid = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
md5 = { workspace = true }
|
||||
md5 = { workspace = true }
|
||||
@@ -1,85 +0,0 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! End-to-end test for Content-Encoding header handling
|
||||
//!
|
||||
//! Tests that the Content-Encoding header is correctly stored during PUT
|
||||
//! and returned in GET/HEAD responses. This is important for clients that
|
||||
//! upload pre-compressed content and rely on the header for decompression.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::common::{RustFSTestEnvironment, init_logging};
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use serial_test::serial;
|
||||
use tracing::info;
|
||||
|
||||
/// Verify Content-Encoding header roundtrips through PUT, GET, and HEAD operations
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_content_encoding_roundtrip() {
|
||||
init_logging();
|
||||
info!("Starting Content-Encoding roundtrip test");
|
||||
|
||||
let mut env = RustFSTestEnvironment::new().await.expect("Failed to create test environment");
|
||||
env.start_rustfs_server(vec![]).await.expect("Failed to start RustFS");
|
||||
|
||||
let client = env.create_s3_client();
|
||||
let bucket = "content-encoding-test";
|
||||
let key = "logs/app.log.zst";
|
||||
let content = b"2024-01-15 10:23:45 INFO Application started\n2024-01-15 10:23:46 DEBUG Loading config\n";
|
||||
|
||||
client
|
||||
.create_bucket()
|
||||
.bucket(bucket)
|
||||
.send()
|
||||
.await
|
||||
.expect("Failed to create bucket");
|
||||
|
||||
info!("Uploading object with Content-Encoding: zstd");
|
||||
client
|
||||
.put_object()
|
||||
.bucket(bucket)
|
||||
.key(key)
|
||||
.content_type("text/plain")
|
||||
.content_encoding("zstd")
|
||||
.body(ByteStream::from_static(content))
|
||||
.send()
|
||||
.await
|
||||
.expect("PUT failed");
|
||||
|
||||
info!("Verifying GET response includes Content-Encoding");
|
||||
let get_resp = client.get_object().bucket(bucket).key(key).send().await.expect("GET failed");
|
||||
|
||||
assert_eq!(get_resp.content_encoding(), Some("zstd"), "GET should return Content-Encoding: zstd");
|
||||
assert_eq!(get_resp.content_type(), Some("text/plain"), "GET should return correct Content-Type");
|
||||
|
||||
let body = get_resp.body.collect().await.unwrap().into_bytes();
|
||||
assert_eq!(body.as_ref(), content, "Body content mismatch");
|
||||
|
||||
info!("Verifying HEAD response includes Content-Encoding");
|
||||
let head_resp = client
|
||||
.head_object()
|
||||
.bucket(bucket)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
.expect("HEAD failed");
|
||||
|
||||
assert_eq!(head_resp.content_encoding(), Some("zstd"), "HEAD should return Content-Encoding: zstd");
|
||||
assert_eq!(head_resp.content_type(), Some("text/plain"), "HEAD should return correct Content-Type");
|
||||
|
||||
env.stop_server();
|
||||
}
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use rustfs_common::data_usage::DataUsageInfo;
|
||||
use serial_test::serial;
|
||||
|
||||
use crate::common::{RustFSTestEnvironment, TEST_BUCKET, awscurl_get, init_logging};
|
||||
|
||||
/// Regression test for data usage accuracy (issue #1012).
|
||||
/// Launches rustfs, writes 1000 objects, then asserts admin data usage reports the full count.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[serial]
|
||||
#[ignore = "Starts a rustfs server and requires awscurl; enable when running full E2E"]
|
||||
async fn data_usage_reports_all_objects() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
init_logging();
|
||||
|
||||
let mut env = RustFSTestEnvironment::new().await?;
|
||||
env.start_rustfs_server(vec![]).await?;
|
||||
|
||||
let client = env.create_s3_client();
|
||||
|
||||
// Create bucket and upload objects
|
||||
client.create_bucket().bucket(TEST_BUCKET).send().await?;
|
||||
|
||||
for i in 0..1000 {
|
||||
let key = format!("obj-{i:04}");
|
||||
client
|
||||
.put_object()
|
||||
.bucket(TEST_BUCKET)
|
||||
.key(key)
|
||||
.body(ByteStream::from_static(b"hello-world"))
|
||||
.send()
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Query admin data usage API
|
||||
let url = format!("{}/rustfs/admin/v3/datausageinfo", env.url);
|
||||
let resp = awscurl_get(&url, &env.access_key, &env.secret_key).await?;
|
||||
let usage: DataUsageInfo = serde_json::from_str(&resp)?;
|
||||
|
||||
// Assert total object count and per-bucket count are not truncated
|
||||
let bucket_usage = usage
|
||||
.buckets_usage
|
||||
.get(TEST_BUCKET)
|
||||
.cloned()
|
||||
.expect("bucket usage should exist");
|
||||
|
||||
assert!(
|
||||
usage.objects_total_count >= 1000,
|
||||
"total object count should be at least 1000, got {}",
|
||||
usage.objects_total_count
|
||||
);
|
||||
assert!(
|
||||
bucket_usage.objects_count >= 1000,
|
||||
"bucket object count should be at least 1000, got {}",
|
||||
bucket_usage.objects_count
|
||||
);
|
||||
|
||||
env.stop_server();
|
||||
Ok(())
|
||||
}
|
||||
@@ -18,10 +18,6 @@ mod reliant;
|
||||
#[cfg(test)]
|
||||
pub mod common;
|
||||
|
||||
// Data usage regression tests
|
||||
#[cfg(test)]
|
||||
mod data_usage_test;
|
||||
|
||||
// KMS-specific test modules
|
||||
#[cfg(test)]
|
||||
mod kms;
|
||||
@@ -29,7 +25,3 @@ mod kms;
|
||||
// Special characters in path test modules
|
||||
#[cfg(test)]
|
||||
mod special_chars_test;
|
||||
|
||||
// Content-Encoding header preservation test
|
||||
#[cfg(test)]
|
||||
mod content_encoding_test;
|
||||
|
||||
@@ -283,17 +283,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
|
||||
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
|
||||
);
|
||||
|
||||
// Gracefully handle missing mod_time instead of panicking
|
||||
let mod_time = match obj.mod_time {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
info!("eval_inner: mod_time is None for object={}, returning default event", obj.name);
|
||||
return Event::default();
|
||||
}
|
||||
};
|
||||
|
||||
if mod_time.unix_timestamp() == 0 {
|
||||
if obj.mod_time.expect("err").unix_timestamp() == 0 {
|
||||
info!("eval_inner: mod_time is 0, returning default event");
|
||||
return Event::default();
|
||||
}
|
||||
@@ -333,7 +323,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
}
|
||||
|
||||
if let Some(days) = expiration.days {
|
||||
let expected_expiry = expected_expiry_time(mod_time, days /*, date*/);
|
||||
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
|
||||
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
|
||||
events.push(Event {
|
||||
action: IlmAction::DeleteVersionAction,
|
||||
@@ -456,11 +446,11 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
});
|
||||
}
|
||||
} else if let Some(days) = expiration.days {
|
||||
let expected_expiry: OffsetDateTime = expected_expiry_time(mod_time, days);
|
||||
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
|
||||
info!(
|
||||
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
|
||||
days,
|
||||
mod_time,
|
||||
obj.mod_time.expect("err!"),
|
||||
expected_expiry,
|
||||
now,
|
||||
now.unix_timestamp() > expected_expiry.unix_timestamp()
|
||||
|
||||
@@ -37,7 +37,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
|
||||
let mut mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str().to_lowercase().as_str());
|
||||
if mode_str.is_none() {
|
||||
mode_str = Some(&meta[X_AMZ_OBJECT_LOCK_MODE.as_str()]);
|
||||
mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str());
|
||||
}
|
||||
let mode = if let Some(mode_str) = mode_str {
|
||||
parse_ret_mode(mode_str.as_str())
|
||||
@@ -50,7 +50,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
|
||||
let mut till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_lowercase().as_str());
|
||||
if till_str.is_none() {
|
||||
till_str = Some(&meta[X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str()]);
|
||||
till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str());
|
||||
}
|
||||
if let Some(till_str) = till_str {
|
||||
let t = OffsetDateTime::parse(till_str, &format_description::well_known::Iso8601::DEFAULT);
|
||||
@@ -67,7 +67,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
pub fn get_object_legalhold_meta(meta: HashMap<String, String>) -> ObjectLockLegalHold {
|
||||
let mut hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str().to_lowercase().as_str());
|
||||
if hold_str.is_none() {
|
||||
hold_str = Some(&meta[X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()]);
|
||||
hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str());
|
||||
}
|
||||
if let Some(hold_str) = hold_str {
|
||||
return ObjectLockLegalHold {
|
||||
|
||||
@@ -32,7 +32,6 @@ use rustfs_common::data_usage::{
|
||||
BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
|
||||
};
|
||||
use rustfs_utils::path::SLASH_SEPARATOR;
|
||||
use tokio::fs;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::error::Error;
|
||||
@@ -64,21 +63,6 @@ lazy_static::lazy_static! {
|
||||
|
||||
/// Store data usage info to backend storage
|
||||
pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
|
||||
// Prevent older data from overwriting newer persisted stats
|
||||
if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
|
||||
if let Ok(existing) = serde_json::from_slice::<DataUsageInfo>(&buf) {
|
||||
if let (Some(new_ts), Some(existing_ts)) = (data_usage_info.last_update, existing.last_update) {
|
||||
if new_ts <= existing_ts {
|
||||
info!(
|
||||
"Skip persisting data usage: incoming last_update {:?} <= existing {:?}",
|
||||
new_ts, existing_ts
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data =
|
||||
serde_json::to_vec(&data_usage_info).map_err(|e| Error::other(format!("Failed to serialize data usage info: {e}")))?;
|
||||
|
||||
@@ -176,39 +160,6 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
|
||||
}
|
||||
|
||||
/// Aggregate usage information from local disk snapshots.
|
||||
fn merge_snapshot(aggregated: &mut DataUsageInfo, mut snapshot: LocalUsageSnapshot, latest_update: &mut Option<SystemTime>) {
|
||||
if let Some(update) = snapshot.last_update {
|
||||
if latest_update.is_none_or(|current| update > current) {
|
||||
*latest_update = Some(update);
|
||||
}
|
||||
}
|
||||
|
||||
snapshot.recompute_totals();
|
||||
|
||||
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
|
||||
aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
|
||||
aggregated.delete_markers_total_count = aggregated
|
||||
.delete_markers_total_count
|
||||
.saturating_add(snapshot.delete_markers_total_count);
|
||||
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
|
||||
|
||||
for (bucket, usage) in snapshot.buckets_usage.into_iter() {
|
||||
let bucket_size = usage.size;
|
||||
match aggregated.buckets_usage.entry(bucket.clone()) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(usage.clone());
|
||||
}
|
||||
}
|
||||
|
||||
aggregated
|
||||
.bucket_sizes
|
||||
.entry(bucket)
|
||||
.and_modify(|size| *size = size.saturating_add(bucket_size))
|
||||
.or_insert(bucket_size);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
|
||||
let mut aggregated = DataUsageInfo::default();
|
||||
let mut latest_update: Option<SystemTime> = None;
|
||||
@@ -245,24 +196,7 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
|
||||
snapshot_exists: false,
|
||||
};
|
||||
|
||||
let snapshot_result = read_local_snapshot(root.as_path(), &disk_id).await;
|
||||
|
||||
// If a snapshot is corrupted or unreadable, skip it but keep processing others
|
||||
if let Err(err) = &snapshot_result {
|
||||
warn!(
|
||||
"Failed to read data usage snapshot for disk {} (pool {}, set {}, disk {}): {}",
|
||||
disk_id, pool_idx, set_disks.set_index, disk_index, err
|
||||
);
|
||||
// Best-effort cleanup so next scan can rebuild a fresh snapshot instead of repeatedly failing
|
||||
let snapshot_file = snapshot_path(root.as_path(), &disk_id);
|
||||
if let Err(remove_err) = fs::remove_file(&snapshot_file).await {
|
||||
if remove_err.kind() != std::io::ErrorKind::NotFound {
|
||||
warn!("Failed to remove corrupted snapshot {:?}: {}", snapshot_file, remove_err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(Some(mut snapshot)) = snapshot_result {
|
||||
if let Some(mut snapshot) = read_local_snapshot(root.as_path(), &disk_id).await? {
|
||||
status.last_update = snapshot.last_update;
|
||||
status.snapshot_exists = true;
|
||||
|
||||
@@ -279,7 +213,37 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskU
|
||||
snapshot.meta.disk_index = Some(disk_index);
|
||||
}
|
||||
|
||||
merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
|
||||
snapshot.recompute_totals();
|
||||
|
||||
if let Some(update) = snapshot.last_update {
|
||||
if latest_update.is_none_or(|current| update > current) {
|
||||
latest_update = Some(update);
|
||||
}
|
||||
}
|
||||
|
||||
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
|
||||
aggregated.versions_total_count =
|
||||
aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
|
||||
aggregated.delete_markers_total_count = aggregated
|
||||
.delete_markers_total_count
|
||||
.saturating_add(snapshot.delete_markers_total_count);
|
||||
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
|
||||
|
||||
for (bucket, usage) in snapshot.buckets_usage.into_iter() {
|
||||
let bucket_size = usage.size;
|
||||
match aggregated.buckets_usage.entry(bucket.clone()) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(usage.clone());
|
||||
}
|
||||
}
|
||||
|
||||
aggregated
|
||||
.bucket_sizes
|
||||
.entry(bucket)
|
||||
.and_modify(|size| *size = size.saturating_add(bucket_size))
|
||||
.or_insert(bucket_size);
|
||||
}
|
||||
}
|
||||
|
||||
statuses.push(status);
|
||||
@@ -585,94 +549,3 @@ pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate:
|
||||
save_config(store, &name, buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rustfs_common::data_usage::BucketUsageInfo;
|
||||
|
||||
fn aggregate_for_test(
|
||||
inputs: Vec<(DiskUsageStatus, Result<Option<LocalUsageSnapshot>, Error>)>,
|
||||
) -> (Vec<DiskUsageStatus>, DataUsageInfo) {
|
||||
let mut aggregated = DataUsageInfo::default();
|
||||
let mut latest_update: Option<SystemTime> = None;
|
||||
let mut statuses = Vec::new();
|
||||
|
||||
for (mut status, snapshot_result) in inputs {
|
||||
if let Ok(Some(snapshot)) = snapshot_result {
|
||||
status.snapshot_exists = true;
|
||||
status.last_update = snapshot.last_update;
|
||||
merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
|
||||
}
|
||||
statuses.push(status);
|
||||
}
|
||||
|
||||
aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
|
||||
aggregated.last_update = latest_update;
|
||||
aggregated.disk_usage_status = statuses.clone();
|
||||
|
||||
(statuses, aggregated)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn aggregate_skips_corrupted_snapshot_and_preserves_other_disks() {
|
||||
let mut good_snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
|
||||
disk_id: "good-disk".to_string(),
|
||||
pool_index: Some(0),
|
||||
set_index: Some(0),
|
||||
disk_index: Some(0),
|
||||
});
|
||||
good_snapshot.last_update = Some(SystemTime::now());
|
||||
good_snapshot.buckets_usage.insert(
|
||||
"bucket-a".to_string(),
|
||||
BucketUsageInfo {
|
||||
objects_count: 3,
|
||||
versions_count: 3,
|
||||
size: 42,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
good_snapshot.recompute_totals();
|
||||
|
||||
let bad_snapshot_err: Result<Option<LocalUsageSnapshot>, Error> = Err(Error::other("corrupted snapshot payload"));
|
||||
|
||||
let inputs = vec![
|
||||
(
|
||||
DiskUsageStatus {
|
||||
disk_id: "bad-disk".to_string(),
|
||||
pool_index: Some(0),
|
||||
set_index: Some(0),
|
||||
disk_index: Some(1),
|
||||
last_update: None,
|
||||
snapshot_exists: false,
|
||||
},
|
||||
bad_snapshot_err,
|
||||
),
|
||||
(
|
||||
DiskUsageStatus {
|
||||
disk_id: "good-disk".to_string(),
|
||||
pool_index: Some(0),
|
||||
set_index: Some(0),
|
||||
disk_index: Some(0),
|
||||
last_update: None,
|
||||
snapshot_exists: false,
|
||||
},
|
||||
Ok(Some(good_snapshot)),
|
||||
),
|
||||
];
|
||||
|
||||
let (statuses, aggregated) = aggregate_for_test(inputs);
|
||||
|
||||
// Bad disk stays non-existent, good disk is marked present
|
||||
let bad_status = statuses.iter().find(|s| s.disk_id == "bad-disk").unwrap();
|
||||
assert!(!bad_status.snapshot_exists);
|
||||
let good_status = statuses.iter().find(|s| s.disk_id == "good-disk").unwrap();
|
||||
assert!(good_status.snapshot_exists);
|
||||
|
||||
// Aggregated data is from good snapshot only
|
||||
assert_eq!(aggregated.objects_total_count, 3);
|
||||
assert_eq!(aggregated.objects_total_size, 42);
|
||||
assert_eq!(aggregated.buckets_count, 1);
|
||||
assert_eq!(aggregated.buckets_usage.get("bucket-a").map(|b| (b.objects_count, b.size)), Some((3, 42)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,22 +198,15 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_file_path(&self) -> String {
|
||||
let path: &str = self.url.path();
|
||||
let decoded: std::borrow::Cow<'_, str> = match urlencoding::decode(path) {
|
||||
Ok(decoded) => decoded,
|
||||
Err(e) => {
|
||||
debug!("Failed to decode path '{}': {}, using original path", path, e);
|
||||
std::borrow::Cow::Borrowed(path)
|
||||
}
|
||||
};
|
||||
pub fn get_file_path(&self) -> &str {
|
||||
let path = self.url.path();
|
||||
#[cfg(windows)]
|
||||
if self.url.scheme() == "file" {
|
||||
let stripped: &str = decoded.strip_prefix('/').unwrap_or(&decoded);
|
||||
let stripped = path.strip_prefix('/').unwrap_or(path);
|
||||
debug!("get_file_path windows: path={}", stripped);
|
||||
return stripped.to_string();
|
||||
return stripped;
|
||||
}
|
||||
decoded.into_owned()
|
||||
path
|
||||
}
|
||||
}
|
||||
|
||||
@@ -508,45 +501,6 @@ mod test {
|
||||
assert_eq!(endpoint.get_type(), EndpointType::Path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_endpoint_with_spaces_in_path() {
|
||||
let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
|
||||
let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
|
||||
assert_eq!(endpoint.get_file_path(), path_with_spaces);
|
||||
assert!(endpoint.is_local);
|
||||
assert_eq!(endpoint.get_type(), EndpointType::Path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_endpoint_percent_encoding_roundtrip() {
|
||||
let path_with_spaces = "/Users/test/Library/Application Support/rustfs/data";
|
||||
let endpoint = Endpoint::try_from(path_with_spaces).unwrap();
|
||||
|
||||
// Verify that the URL internally stores percent-encoded path
|
||||
assert!(
|
||||
endpoint.url.path().contains("%20"),
|
||||
"URL path should contain percent-encoded spaces: {}",
|
||||
endpoint.url.path()
|
||||
);
|
||||
|
||||
// Verify that get_file_path() decodes the percent-encoded path correctly
|
||||
assert_eq!(
|
||||
endpoint.get_file_path(),
|
||||
"/Users/test/Library/Application Support/rustfs/data",
|
||||
"get_file_path() should decode percent-encoded spaces"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_endpoint_with_various_special_characters() {
|
||||
// Test path with multiple special characters that get percent-encoded
|
||||
let path_with_special = "/tmp/test path/data[1]/file+name&more";
|
||||
let endpoint = Endpoint::try_from(path_with_special).unwrap();
|
||||
|
||||
// get_file_path() should return the original path with decoded characters
|
||||
assert_eq!(endpoint.get_file_path(), path_with_special);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_endpoint_update_is_local() {
|
||||
let mut endpoint = Endpoint::try_from("http://localhost:9000/path").unwrap();
|
||||
|
||||
@@ -232,7 +232,7 @@ impl PoolEndpointList {
|
||||
|
||||
for endpoints in pool_endpoint_list.inner.iter_mut() {
|
||||
// Check whether same path is not used in endpoints of a host on different port.
|
||||
let mut path_ip_map: HashMap<String, HashSet<IpAddr>> = HashMap::new();
|
||||
let mut path_ip_map: HashMap<&str, HashSet<IpAddr>> = HashMap::new();
|
||||
let mut host_ip_cache = HashMap::new();
|
||||
for ep in endpoints.as_ref() {
|
||||
if !ep.url.has_host() {
|
||||
@@ -275,9 +275,8 @@ impl PoolEndpointList {
|
||||
match path_ip_map.entry(path) {
|
||||
Entry::Occupied(mut e) => {
|
||||
if e.get().intersection(host_ip_set).count() > 0 {
|
||||
let path_key = e.key().clone();
|
||||
return Err(Error::other(format!(
|
||||
"same path '{path_key}' can not be served by different port on same address"
|
||||
"same path '{path}' can not be served by different port on same address"
|
||||
)));
|
||||
}
|
||||
e.get_mut().extend(host_ip_set.iter());
|
||||
@@ -296,7 +295,7 @@ impl PoolEndpointList {
|
||||
}
|
||||
|
||||
let path = ep.get_file_path();
|
||||
if local_path_set.contains(&path) {
|
||||
if local_path_set.contains(path) {
|
||||
return Err(Error::other(format!(
|
||||
"path '{path}' cannot be served by different address on same server"
|
||||
)));
|
||||
|
||||
@@ -34,8 +34,8 @@ use rustfs_madmin::heal_commands::HealResultItem;
|
||||
use rustfs_rio::Checksum;
|
||||
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::http::AMZ_STORAGE_CLASS;
|
||||
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_STORAGE_CLASS};
|
||||
use rustfs_utils::path::decode_dir_object;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
@@ -745,7 +745,22 @@ impl ObjectInfo {
|
||||
let inlined = fi.inline_data();
|
||||
|
||||
// TODO:expires
|
||||
// TODO:ReplicationState
|
||||
|
||||
let mut replication_status_internal = None;
|
||||
let mut version_purge_status_internal = None;
|
||||
if let Some(replication_state) = fi.replication_state_internal.as_ref() {
|
||||
replication_status_internal = replication_state.replication_status_internal.clone();
|
||||
version_purge_status_internal = replication_state.version_purge_status_internal.clone();
|
||||
}
|
||||
let mut replication_status = fi.replication_status();
|
||||
if replication_status.is_empty()
|
||||
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS)
|
||||
&& status == ReplicationStatusType::Replica.as_str()
|
||||
{
|
||||
replication_status = ReplicationStatusType::Replica;
|
||||
}
|
||||
|
||||
let version_purge_status = fi.version_purge_status();
|
||||
|
||||
let transitioned_object = TransitionedObject {
|
||||
name: fi.transitioned_objname.clone(),
|
||||
@@ -810,6 +825,10 @@ impl ObjectInfo {
|
||||
transitioned_object,
|
||||
checksum: fi.checksum.clone(),
|
||||
storage_class,
|
||||
replication_status_internal,
|
||||
version_purge_status_internal,
|
||||
replication_status,
|
||||
version_purge_status,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -827,12 +846,7 @@ impl ObjectInfo {
|
||||
for entry in entries.entries() {
|
||||
if entry.is_object() {
|
||||
if let Some(delimiter) = &delimiter {
|
||||
let remaining = if entry.name.starts_with(prefix) {
|
||||
&entry.name[prefix.len()..]
|
||||
} else {
|
||||
entry.name.as_str()
|
||||
};
|
||||
if let Some(idx) = remaining.find(delimiter.as_str()) {
|
||||
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
|
||||
let idx = prefix.len() + idx + delimiter.len();
|
||||
if let Some(curr_prefix) = entry.name.get(0..idx) {
|
||||
if curr_prefix == prev_prefix {
|
||||
@@ -883,14 +897,7 @@ impl ObjectInfo {
|
||||
|
||||
if entry.is_dir() {
|
||||
if let Some(delimiter) = &delimiter {
|
||||
if let Some(idx) = {
|
||||
let remaining = if entry.name.starts_with(prefix) {
|
||||
&entry.name[prefix.len()..]
|
||||
} else {
|
||||
entry.name.as_str()
|
||||
};
|
||||
remaining.find(delimiter.as_str())
|
||||
} {
|
||||
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
|
||||
let idx = prefix.len() + idx + delimiter.len();
|
||||
if let Some(curr_prefix) = entry.name.get(0..idx) {
|
||||
if curr_prefix == prev_prefix {
|
||||
@@ -926,12 +933,7 @@ impl ObjectInfo {
|
||||
for entry in entries.entries() {
|
||||
if entry.is_object() {
|
||||
if let Some(delimiter) = &delimiter {
|
||||
let remaining = if entry.name.starts_with(prefix) {
|
||||
&entry.name[prefix.len()..]
|
||||
} else {
|
||||
entry.name.as_str()
|
||||
};
|
||||
if let Some(idx) = remaining.find(delimiter.as_str()) {
|
||||
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
|
||||
let idx = prefix.len() + idx + delimiter.len();
|
||||
if let Some(curr_prefix) = entry.name.get(0..idx) {
|
||||
if curr_prefix == prev_prefix {
|
||||
@@ -968,14 +970,7 @@ impl ObjectInfo {
|
||||
|
||||
if entry.is_dir() {
|
||||
if let Some(delimiter) = &delimiter {
|
||||
if let Some(idx) = {
|
||||
let remaining = if entry.name.starts_with(prefix) {
|
||||
&entry.name[prefix.len()..]
|
||||
} else {
|
||||
entry.name.as_str()
|
||||
};
|
||||
remaining.find(delimiter.as_str())
|
||||
} {
|
||||
if let Some(idx) = entry.name.trim_start_matches(prefix).find(delimiter) {
|
||||
let idx = prefix.len() + idx + delimiter.len();
|
||||
if let Some(curr_prefix) = entry.name.get(0..idx) {
|
||||
if curr_prefix == prev_prefix {
|
||||
|
||||
@@ -314,13 +314,15 @@ impl ECStore {
|
||||
|
||||
// contextCanceled
|
||||
|
||||
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
|
||||
&list_result.entries.unwrap_or_default(),
|
||||
bucket,
|
||||
prefix,
|
||||
delimiter.clone(),
|
||||
)
|
||||
.await;
|
||||
let entries = list_result.entries.unwrap_or_default();
|
||||
for entry in entries.entries() {
|
||||
if entry.is_object() {
|
||||
let fi = entry.to_fileinfo(bucket).unwrap();
|
||||
tracing::warn!("list_objects_generic file_info: {:?}", fi);
|
||||
}
|
||||
}
|
||||
|
||||
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(&entries, bucket, prefix, delimiter.clone()).await;
|
||||
|
||||
let is_truncated = {
|
||||
if max_keys > 0 && get_objects.len() > max_keys as usize {
|
||||
|
||||
@@ -85,7 +85,6 @@ services:
|
||||
- RUSTFS_ACCESS_KEY=devadmin
|
||||
- RUSTFS_SECRET_KEY=devadmin
|
||||
- RUSTFS_OBS_LOGGER_LEVEL=debug
|
||||
- RUSTFS_OBS_LOG_DIRECTORY=/logs
|
||||
volumes:
|
||||
- .:/app # Mount source code to /app for development
|
||||
- deploy/data/dev:/data
|
||||
@@ -181,6 +180,20 @@ services:
|
||||
profiles:
|
||||
- observability
|
||||
|
||||
# Redis for caching (optional)
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
container_name: redis
|
||||
ports:
|
||||
- "6379:6379"
|
||||
volumes:
|
||||
- redis_data:/data
|
||||
networks:
|
||||
- rustfs-network
|
||||
restart: unless-stopped
|
||||
profiles:
|
||||
- cache
|
||||
|
||||
# NGINX reverse proxy (optional)
|
||||
nginx:
|
||||
security_opt:
|
||||
@@ -228,5 +241,7 @@ volumes:
|
||||
driver: local
|
||||
grafana_data:
|
||||
driver: local
|
||||
redis_data:
|
||||
driver: local
|
||||
logs:
|
||||
driver: local
|
||||
|
||||
@@ -29,7 +29,6 @@ x-node-template: &node-template
|
||||
- RUSTFS_ACCESS_KEY=rustfsadmin
|
||||
- RUSTFS_SECRET_KEY=rustfsadmin
|
||||
- RUSTFS_CMD=rustfs
|
||||
- RUSTFS_OBS_LOG_DIRECTORY=/logs
|
||||
command: [ "sh", "-c", "sleep 3 && rustfs" ]
|
||||
healthcheck:
|
||||
test:
|
||||
|
||||
@@ -55,20 +55,7 @@ process_data_volumes() {
|
||||
|
||||
# 3) Process log directory (separate from data volumes)
|
||||
process_log_directory() {
|
||||
# Output logs to stdout
|
||||
if [ -z "$RUSTFS_OBS_LOG_DIRECTORY" ]; then
|
||||
echo "OBS log directory not configured and logs outputs to stdout"
|
||||
return
|
||||
fi
|
||||
|
||||
# Output logs to remote endpoint
|
||||
if [ "${RUSTFS_OBS_LOG_DIRECTORY}" != "${RUSTFS_OBS_LOG_DIRECTORY#*://}" ]; then
|
||||
echo "Output logs to remote endpoint"
|
||||
return
|
||||
fi
|
||||
|
||||
# Outputs logs to local directory
|
||||
LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY}"
|
||||
LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY:-/logs}"
|
||||
|
||||
echo "Initializing log directory: $LOG_DIR"
|
||||
if [ ! -d "$LOG_DIR" ]; then
|
||||
|
||||
@@ -90,6 +90,7 @@ pub mod trace;
|
||||
pub mod user;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub struct IsAdminResponse {
|
||||
pub is_admin: bool,
|
||||
pub access_key: String,
|
||||
@@ -158,15 +159,14 @@ impl Operation for IsAdminHandler {
|
||||
return Err(s3_error!(InvalidRequest, "get cred failed"));
|
||||
};
|
||||
|
||||
let (cred, _owner) =
|
||||
let (_cred, _owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
|
||||
|
||||
let access_key_to_check = input_cred.access_key.clone();
|
||||
|
||||
// Check if the user is admin by comparing with global credentials
|
||||
let is_admin = if let Some(sys_cred) = get_global_action_cred() {
|
||||
crate::auth::constant_time_eq(&access_key_to_check, &sys_cred.access_key)
|
||||
|| crate::auth::constant_time_eq(&cred.parent_user, &sys_cred.access_key)
|
||||
sys_cred.access_key == access_key_to_check
|
||||
} else {
|
||||
false
|
||||
};
|
||||
@@ -174,7 +174,7 @@ impl Operation for IsAdminHandler {
|
||||
let response = IsAdminResponse {
|
||||
is_admin,
|
||||
access_key: access_key_to_check,
|
||||
message: format!("User is {}an administrator", if is_admin { "" } else { "not " }),
|
||||
message: format!("User is {} an administrator", if is_admin { "" } else { "not" }),
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&response)
|
||||
|
||||
@@ -2272,7 +2272,6 @@ impl S3 for FS {
|
||||
content_length: Some(response_content_length),
|
||||
last_modified,
|
||||
content_type,
|
||||
content_encoding: info.content_encoding.clone(),
|
||||
accept_ranges: Some("bytes".to_string()),
|
||||
content_range,
|
||||
e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
|
||||
@@ -2488,7 +2487,6 @@ impl S3 for FS {
|
||||
let output = HeadObjectOutput {
|
||||
content_length: Some(content_length),
|
||||
content_type,
|
||||
content_encoding: info.content_encoding.clone(),
|
||||
last_modified,
|
||||
e_tag: info.etag.map(|etag| to_s3s_etag(&etag)),
|
||||
metadata: filter_object_metadata(&metadata_map),
|
||||
@@ -4520,16 +4518,18 @@ impl S3 for FS {
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let rules = match metadata_sys::get_lifecycle_config(&bucket).await {
|
||||
Ok((cfg, _)) => cfg.rules,
|
||||
Ok((cfg, _)) => Some(cfg.rules),
|
||||
Err(_err) => {
|
||||
// Return NoSuchLifecycleConfiguration error as expected by S3 clients
|
||||
// This fixes issue #990 where Ansible S3 roles fail with KeyError: 'Rules'
|
||||
return Err(s3_error!(NoSuchLifecycleConfiguration));
|
||||
// if BucketMetadataError::BucketLifecycleNotFound.is(&err) {
|
||||
// return Err(s3_error!(NoSuchLifecycleConfiguration));
|
||||
// }
|
||||
// warn!("get_lifecycle_config err {:?}", err);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
|
||||
rules: Some(rules),
|
||||
rules,
|
||||
..Default::default()
|
||||
}))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user