diff --git a/Makefile b/Makefile index 363cd557..60bfce83 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,12 @@ check: .PHONY: test test: @echo "🧪 Running tests..." - cargo nextest run --all --exclude e2e_test + @if command -v cargo-nextest >/dev/null 2>&1; then \ + cargo nextest run --all --exclude e2e_test; \ + else \ + echo "ℹ️ cargo-nextest not found; falling back to 'cargo test'"; \ + cargo test --workspace --exclude e2e_test -- --nocapture; \ + fi cargo test --all --doc .PHONY: pre-commit diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index df22dddd..fd9d1d6c 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -25,6 +25,8 @@ use ecstore::{ use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend}; use rustfs_filemeta::{MetacacheReader, VersionType}; use tokio::sync::{Mutex, RwLock}; +use tokio::task::yield_now; +use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -70,18 +72,27 @@ pub struct ScannerConfig { pub scan_mode: ScanMode, /// Whether to enable data usage statistics collection pub enable_data_usage_stats: bool, + /// Skip objects modified within this window to avoid racing with active writes + pub skip_recently_modified_within: Duration, + /// Throttle: after scanning this many objects, sleep for a short delay to reduce IO contention + pub throttle_every_n_objects: u32, + /// Throttle delay duration per throttle tick + pub throttle_delay: Duration, } impl Default for ScannerConfig { fn default() -> Self { Self { - scan_interval: Duration::from_secs(60), // 1 minute - deep_scan_interval: Duration::from_secs(3600), // 1 hour - max_concurrent_scans: 20, + scan_interval: Duration::from_secs(3600), // 1 hour + deep_scan_interval: Duration::from_secs(86400), // 1 day + max_concurrent_scans: 10, enable_healing: true, enable_metrics: true, scan_mode: ScanMode::Normal, enable_data_usage_stats: true, + skip_recently_modified_within: Duration::from_secs(600), // 10 minutes + throttle_every_n_objects: 200, + throttle_delay: Duration::from_millis(2), } } } @@ -1244,6 +1255,11 @@ impl Scanner { let mut objects_scanned = 0u64; let mut objects_with_issues = 0u64; let mut object_metadata = HashMap::new(); + // snapshot throttling/grace config + let cfg_snapshot = self.config.read().await.clone(); + let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1); + let throttle_delay = cfg_snapshot.throttle_delay; + let skip_recent = cfg_snapshot.skip_recently_modified_within; // Process each object entry while let Ok(Some(mut entry)) = reader.peek().await { @@ -1254,6 +1270,28 @@ impl Scanner { // Parse object metadata if let Ok(file_meta) = entry.xl_meta() { + // Skip recently modified objects to avoid racing with active writes + if let Some(latest_mt) = file_meta.versions.iter().filter_map(|v| v.header.mod_time).max() { + let ts_nanos = latest_mt.unix_timestamp_nanos(); + let latest_st = if ts_nanos >= 0 { + std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64) + } else { + std::time::UNIX_EPOCH + }; + if let Ok(elapsed) = SystemTime::now().duration_since(latest_st) { + if elapsed < skip_recent { + debug!( + "Skipping recently modified object {}/{} (elapsed {:?} < {:?})", + bucket, entry.name, elapsed, skip_recent + ); + if (objects_scanned as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } + continue; + } + } + } if file_meta.versions.is_empty() { objects_with_issues += 1; warn!("Object {} has no versions", entry.name); @@ -1319,6 +1357,12 @@ impl Scanner { } } } + + // lightweight throttling to reduce IO contention + if (objects_scanned as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } } // Update metrics @@ -1406,6 +1450,14 @@ impl Scanner { // Step 2: Identify missing objects and perform EC verification let mut objects_needing_heal = 0u64; let mut objects_with_ec_issues = 0u64; + // snapshot config for gating and throttling + let cfg_snapshot = self.config.read().await.clone(); + let skip_recent = cfg_snapshot.skip_recently_modified_within; + let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1); + let throttle_delay = cfg_snapshot.throttle_delay; + let deep_mode = cfg_snapshot.scan_mode == ScanMode::Deep; + // cfg_snapshot is a plain value clone; no guard to explicitly drop here. + let mut iter_count: u64 = 0; for (bucket, objects) in &all_objects { // Skip internal RustFS system bucket to avoid lengthy checks on temporary/trash objects @@ -1413,6 +1465,7 @@ impl Scanner { continue; } for object_name in objects { + iter_count += 1; let key = (bucket.clone(), object_name.clone()); let empty_vec = Vec::new(); let locations = object_locations.get(&key).unwrap_or(&empty_vec); @@ -1444,8 +1497,51 @@ impl Scanner { continue; } + // Skip recently modified objects to avoid racing with writes + let is_recent = (|| { + for &disk_idx in locations { + if let Some(bucket_map) = all_disk_objects.get(disk_idx) { + if let Some(file_map) = bucket_map.get(bucket) { + if let Some(fm) = file_map.get(object_name) { + if let Some(mt) = fm.versions.iter().filter_map(|v| v.header.mod_time).max() { + let ts_nanos = mt.unix_timestamp_nanos(); + let mt_st = if ts_nanos >= 0 { + std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64) + } else { + std::time::UNIX_EPOCH + }; + if let Ok(elapsed) = SystemTime::now().duration_since(mt_st) { + if elapsed < skip_recent { + return true; + } + } + } + } + } + } + } + false + })(); + if is_recent { + debug!("Skipping missing-objects heal check for recently modified {}/{}", bucket, object_name); + if (iter_count as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } + continue; + } + + // Only attempt missing-object heal checks in Deep scan mode + if !deep_mode { + if (iter_count as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } + continue; + } + // Check if object is missing from some disks - if locations.len() < disks.len() { + if !locations.is_empty() && locations.len() < disks.len() { // Before submitting heal, confirm the object still exists logically. let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() { match store.get_object_info(bucket, object_name, &Default::default()).await { @@ -1473,6 +1569,10 @@ impl Scanner { }; if !should_heal { + if (iter_count as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } continue; } objects_needing_heal += 1; @@ -1516,6 +1616,11 @@ impl Scanner { warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e); } } + + if (iter_count as u32) % throttle_n == 0 { + sleep(throttle_delay).await; + yield_now().await; + } } } diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index edda32dc..e80400e3 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -108,11 +108,17 @@ async fn setup_test_env() -> (Vec, Arc, Arc, bucket_name: &str) { - (**ecstore) - .make_bucket(bucket_name, &Default::default()) - .await - .expect("Failed to create test bucket"); - info!("Created test bucket: {}", bucket_name); + match (**ecstore).make_bucket(bucket_name, &Default::default()).await { + Ok(_) => info!("Created test bucket: {}", bucket_name), + Err(e) => { + // If the bucket already exists from a previous test run in the shared env, ignore. + if matches!(e, rustfs_ecstore::error::StorageError::BucketExists(_)) { + info!("Bucket already exists, continuing: {}", bucket_name); + } else { + panic!("Failed to create test bucket: {e:?}"); + } + } + } } /// Test helper: Upload test object diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 88087f80..7c39333c 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -151,6 +151,7 @@ async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bo #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] +#[ignore = "Please run it manually."] async fn test_lifecycle_expiry_basic() { let (_disk_paths, ecstore) = setup_test_env().await; diff --git a/crates/ecstore/src/client/transition_api.rs b/crates/ecstore/src/client/transition_api.rs index fba6eda2..5508075c 100644 --- a/crates/ecstore/src/client/transition_api.rs +++ b/crates/ecstore/src/client/transition_api.rs @@ -569,7 +569,16 @@ impl TransitionClient { } pub fn is_virtual_host_style_request(&self, url: &Url, bucket_name: &str) -> bool { - if bucket_name == "" { + // Contract: + // - return true if we should use virtual-hosted-style addressing (bucket as subdomain) + // Heuristics (aligned with AWS S3/MinIO clients): + // - explicit DNS mode => true + // - explicit PATH mode => false + // - AUTO: + // - bucket must be non-empty and DNS compatible + // - endpoint host must be a DNS name (not an IPv4/IPv6 literal) + // - when using TLS (https), buckets with dots are avoided due to wildcard/cert issues + if bucket_name.is_empty() { return false; } @@ -581,7 +590,46 @@ impl TransitionClient { return false; } - false + // AUTO + let host = match url.host_str() { + Some(h) => h, + None => return false, + }; + + // If endpoint is an IP address, do not use virtual host style + let is_ip = host.parse::().is_ok(); + if is_ip { + return false; + } + + // Basic DNS bucket validation: lowercase letters, numbers, dot and hyphen; must start/end alnum + let is_dns_compatible = { + let bytes = bucket_name.as_bytes(); + let start_end_ok = bucket_name + .chars() + .next() + .map(|c| c.is_ascii_lowercase() || c.is_ascii_digit()) + .unwrap_or(false) + && bucket_name + .chars() + .last() + .map(|c| c.is_ascii_lowercase() || c.is_ascii_digit()) + .unwrap_or(false); + let middle_ok = bytes + .iter() + .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || *b == b'-' || *b == b'.'); + start_end_ok && middle_ok && bucket_name.len() >= 3 && bucket_name.len() <= 63 + }; + if !is_dns_compatible { + return false; + } + + // When using TLS, avoid buckets with dots to prevent cert/SNI mismatch unless a wildcard cert is ensured. + if self.secure && bucket_name.contains('.') { + return false; + } + + true } pub fn cred_context(&self) -> CredContext { @@ -987,3 +1035,56 @@ impl tower::Service> for SendRequest { #[derive(Serialize, Deserialize)] pub struct Document(pub String); + +#[cfg(test)] +mod tests { + use super::*; + + fn mk_client(endpoint: &str, secure: bool, lookup: BucketLookupType) -> TransitionClient { + let creds = Credentials::new(Static(Default::default())); + let opts = Options { + creds, + secure, + bucket_lookup: lookup, + ..Default::default() + }; + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async { TransitionClient::new(endpoint, opts).await.unwrap() }) + } + + #[test] + fn test_is_virtual_host_auto_http_domain_dns_bucket() { + let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupAuto); + assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test")); + } + + #[test] + fn test_is_virtual_host_auto_http_ip() { + let cl = mk_client("127.0.0.1:9000", false, BucketLookupType::BucketLookupAuto); + assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "test")); + } + + #[test] + fn test_is_virtual_host_auto_https_bucket_with_dot_disallowed() { + let cl = mk_client("s3.example.com:443", true, BucketLookupType::BucketLookupAuto); + assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "te.st")); + } + + #[test] + fn test_is_virtual_host_dns_forced() { + let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS); + assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test")); + } + + #[test] + fn test_target_url_vhost_and_path() { + let cl_v = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS); + let url_v = cl_v.make_target_url("test", "obj.txt", "", true, &HashMap::new()).unwrap(); + assert_eq!(url_v.as_str(), "http://test.s3.example.com:9000/obj.txt"); + + let cl_p = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupPath); + let url_p = cl_p.make_target_url("test", "obj.txt", "", false, &HashMap::new()).unwrap(); + assert_eq!(url_p.as_str(), "http://s3.example.com:9000/test/obj.txt"); + } +} diff --git a/crates/signer/src/request_signature_v2.rs b/crates/signer/src/request_signature_v2.rs index 17666705..032924fd 100644 --- a/crates/signer/src/request_signature_v2.rs +++ b/crates/signer/src/request_signature_v2.rs @@ -26,8 +26,42 @@ use s3s::Body; const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; const SIGN_V2_ALGORITHM: &str = "AWS"; -fn encode_url2path(req: &request::Request, _virtual_host: bool) -> String { - req.uri().path().to_string() +fn encode_url2path(req: &request::Request, virtual_host: bool) -> String { + // In virtual-hosted-style, the canonical resource must include "/{bucket}" prefix + // extracted from the Host header: bucket.domain.tld -> "/bucket" + let mut path = req.uri().path().to_string(); + if virtual_host { + let host = super::utils::get_host_addr(req); + // strip port if present + let host = match host.split_once(':') { + Some((h, _)) => h, + None => host.as_str(), + }; + // If host has at least 3 labels (bucket + domain + tld), take first label as bucket + if let Some((bucket, _rest)) = host.split_once('.') { + if !bucket.is_empty() { + // avoid duplicating if path already starts with /bucket/ + let expected_prefix = format!("/{bucket}"); + if !path.starts_with(&expected_prefix) { + // Only prefix for bucket-level paths; ensure a single slash separation + if path == "/" { + path = expected_prefix; + } else { + path = format!( + "{}{}", + expected_prefix, + if path.starts_with('/') { + path.clone() + } else { + format!("/{path}") + } + ); + } + } + } + } + } + path } pub fn pre_sign_v2( @@ -239,3 +273,39 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Request } } } + +#[cfg(test)] +mod tests { + use super::*; + use http::Request; + + fn mk_req(host: &str, path: &str, query: &str) -> request::Request { + let uri = if query.is_empty() { + format!("http://{host}{path}") + } else { + format!("http://{host}{path}?{query}") + }; + let mut req = Request::builder().uri(uri).method("GET").body(Body::empty()).unwrap(); + // minimal headers used by signers + let h = req.headers_mut(); + h.insert("Content-Md5", "".parse().unwrap()); + h.insert("Content-Type", "".parse().unwrap()); + h.insert("Date", "Thu, 21 Aug 2025 00:00:00 +0000".parse().unwrap()); + h.insert("host", host.parse().unwrap()); + req + } + + #[test] + fn test_encode_url2path_vhost_prefixes_bucket() { + let req = mk_req("test.example.com", "/obj.txt", ""); + let path = encode_url2path(&req, true); + assert_eq!(path, "/test/obj.txt"); + } + + #[test] + fn test_encode_url2path_path_style_unchanged() { + let req = mk_req("example.com", "/test/obj.txt", "uploads="); + let path = encode_url2path(&req, false); + assert_eq!(path, "/test/obj.txt"); + } +}