* fix: revert #443 #446

* fix
This commit is contained in:
houseme
2025-08-23 17:30:06 +08:00
committed by GitHub
parent f1c50fcb74
commit fd2aab2bd9
6 changed files with 79 additions and 344 deletions

View File

@@ -25,8 +25,6 @@ use ecstore::{
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
use rustfs_filemeta::{MetacacheReader, VersionType};
use tokio::sync::{Mutex, RwLock};
use tokio::task::yield_now;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -72,27 +70,18 @@ pub struct ScannerConfig {
pub scan_mode: ScanMode,
/// Whether to enable data usage statistics collection
pub enable_data_usage_stats: bool,
/// Skip objects modified within this window to avoid racing with active writes
pub skip_recently_modified_within: Duration,
/// Throttle: after scanning this many objects, sleep for a short delay to reduce IO contention
pub throttle_every_n_objects: u32,
/// Throttle delay duration per throttle tick
pub throttle_delay: Duration,
}
impl Default for ScannerConfig {
fn default() -> Self {
Self {
scan_interval: Duration::from_secs(3600), // 1 hour
deep_scan_interval: Duration::from_secs(86400), // 1 day
max_concurrent_scans: 10,
scan_interval: Duration::from_secs(60), // 1 minute
deep_scan_interval: Duration::from_secs(3600), // 1 hour
max_concurrent_scans: 20,
enable_healing: true,
enable_metrics: true,
scan_mode: ScanMode::Normal,
enable_data_usage_stats: true,
skip_recently_modified_within: Duration::from_secs(600), // 10 minutes
throttle_every_n_objects: 200,
throttle_delay: Duration::from_millis(2),
}
}
}
@@ -1255,11 +1244,6 @@ impl Scanner {
let mut objects_scanned = 0u64;
let mut objects_with_issues = 0u64;
let mut object_metadata = HashMap::new();
// snapshot throttling/grace config
let cfg_snapshot = self.config.read().await.clone();
let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1);
let throttle_delay = cfg_snapshot.throttle_delay;
let skip_recent = cfg_snapshot.skip_recently_modified_within;
// Process each object entry
while let Ok(Some(mut entry)) = reader.peek().await {
@@ -1270,33 +1254,11 @@ impl Scanner {
// Parse object metadata
if let Ok(file_meta) = entry.xl_meta() {
// Skip recently modified objects to avoid racing with active writes
if let Some(latest_mt) = file_meta.versions.iter().filter_map(|v| v.header.mod_time).max() {
let ts_nanos = latest_mt.unix_timestamp_nanos();
let latest_st = if ts_nanos >= 0 {
std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64)
} else {
std::time::UNIX_EPOCH
};
if let Ok(elapsed) = SystemTime::now().duration_since(latest_st) {
if elapsed < skip_recent {
debug!(
"Skipping recently modified object {}/{} (elapsed {:?} < {:?})",
bucket, entry.name, elapsed, skip_recent
);
if (objects_scanned as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
}
}
if file_meta.versions.is_empty() {
objects_with_issues += 1;
warn!("Object {} has no versions", entry.name);
// 对象元数据损坏提交元数据heal任务
// 对象元数据损坏,提交元数据 heal 任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
@@ -1334,7 +1296,7 @@ impl Scanner {
objects_with_issues += 1;
warn!("Failed to parse metadata for object {}", entry.name);
// 对象元数据解析失败提交元数据heal任务
// 对象元数据解析失败,提交元数据 heal 任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
@@ -1357,12 +1319,6 @@ impl Scanner {
}
}
}
// lightweight throttling to reduce IO contention
if (objects_scanned as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
}
// Update metrics
@@ -1450,14 +1406,6 @@ impl Scanner {
// Step 2: Identify missing objects and perform EC verification
let mut objects_needing_heal = 0u64;
let mut objects_with_ec_issues = 0u64;
// snapshot config for gating and throttling
let cfg_snapshot = self.config.read().await.clone();
let skip_recent = cfg_snapshot.skip_recently_modified_within;
let throttle_n = cfg_snapshot.throttle_every_n_objects.max(1);
let throttle_delay = cfg_snapshot.throttle_delay;
let deep_mode = cfg_snapshot.scan_mode == ScanMode::Deep;
// cfg_snapshot is a plain value clone; no guard to explicitly drop here.
let mut iter_count: u64 = 0;
for (bucket, objects) in &all_objects {
// Skip internal RustFS system bucket to avoid lengthy checks on temporary/trash objects
@@ -1465,7 +1413,6 @@ impl Scanner {
continue;
}
for object_name in objects {
iter_count += 1;
let key = (bucket.clone(), object_name.clone());
let empty_vec = Vec::new();
let locations = object_locations.get(&key).unwrap_or(&empty_vec);
@@ -1497,51 +1444,8 @@ impl Scanner {
continue;
}
// Skip recently modified objects to avoid racing with writes
let is_recent = (|| {
for &disk_idx in locations {
if let Some(bucket_map) = all_disk_objects.get(disk_idx) {
if let Some(file_map) = bucket_map.get(bucket) {
if let Some(fm) = file_map.get(object_name) {
if let Some(mt) = fm.versions.iter().filter_map(|v| v.header.mod_time).max() {
let ts_nanos = mt.unix_timestamp_nanos();
let mt_st = if ts_nanos >= 0 {
std::time::UNIX_EPOCH + Duration::from_nanos(ts_nanos as u64)
} else {
std::time::UNIX_EPOCH
};
if let Ok(elapsed) = SystemTime::now().duration_since(mt_st) {
if elapsed < skip_recent {
return true;
}
}
}
}
}
}
}
false
})();
if is_recent {
debug!("Skipping missing-objects heal check for recently modified {}/{}", bucket, object_name);
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
// Only attempt missing-object heal checks in Deep scan mode
if !deep_mode {
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
// Check if object is missing from some disks
if !locations.is_empty() && locations.len() < disks.len() {
if locations.len() < disks.len() {
// Before submitting heal, confirm the object still exists logically.
let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() {
match store.get_object_info(bucket, object_name, &Default::default()).await {
@@ -1569,10 +1473,6 @@ impl Scanner {
};
if !should_heal {
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
continue;
}
objects_needing_heal += 1;
@@ -1616,11 +1516,6 @@ impl Scanner {
warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e);
}
}
if (iter_count as u32) % throttle_n == 0 {
sleep(throttle_delay).await;
yield_now().await;
}
}
}

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::heal::{
manager::{HealConfig, HealManager},
storage::{ECStoreHealStorage, HealStorageAPI},
@@ -108,17 +122,11 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
match (**ecstore).make_bucket(bucket_name, &Default::default()).await {
Ok(_) => info!("Created test bucket: {}", bucket_name),
Err(e) => {
// If the bucket already exists from a previous test run in the shared env, ignore.
if matches!(e, rustfs_ecstore::error::StorageError::BucketExists(_)) {
info!("Bucket already exists, continuing: {}", bucket_name);
} else {
panic!("Failed to create test bucket: {e:?}");
}
}
}
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object

View File

@@ -1,3 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
@@ -151,7 +165,6 @@ async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bo
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
#[ignore = "Please run it manually."]
async fn test_lifecycle_expiry_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;

View File

@@ -590,46 +590,7 @@ impl TransitionClient {
return false;
}
// AUTO
let host = match url.host_str() {
Some(h) => h,
None => return false,
};
// If endpoint is an IP address, do not use virtual host style
let is_ip = host.parse::<std::net::IpAddr>().is_ok();
if is_ip {
return false;
}
// Basic DNS bucket validation: lowercase letters, numbers, dot and hyphen; must start/end alnum
let is_dns_compatible = {
let bytes = bucket_name.as_bytes();
let start_end_ok = bucket_name
.chars()
.next()
.map(|c| c.is_ascii_lowercase() || c.is_ascii_digit())
.unwrap_or(false)
&& bucket_name
.chars()
.last()
.map(|c| c.is_ascii_lowercase() || c.is_ascii_digit())
.unwrap_or(false);
let middle_ok = bytes
.iter()
.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || *b == b'-' || *b == b'.');
start_end_ok && middle_ok && bucket_name.len() >= 3 && bucket_name.len() <= 63
};
if !is_dns_compatible {
return false;
}
// When using TLS, avoid buckets with dots to prevent cert/SNI mismatch unless a wildcard cert is ensured.
if self.secure && bucket_name.contains('.') {
return false;
}
true
false
}
pub fn cred_context(&self) -> CredContext {
@@ -1035,56 +996,3 @@ impl tower::Service<Request<Body>> for SendRequest {
#[derive(Serialize, Deserialize)]
pub struct Document(pub String);
// Unit tests for bucket lookup style selection (`is_virtual_host_style_request`)
// and for target URL construction (`make_target_url`) on `TransitionClient`.
#[cfg(test)]
mod tests {
use super::*;
// Test helper: builds a `TransitionClient` for `endpoint` with the given TLS flag
// and bucket lookup mode, using default static credentials.
// `TransitionClient::new` is async, so a fresh Tokio runtime is created just to
// drive it to completion. Panics (via `unwrap`) if the runtime or the client
// cannot be constructed.
fn mk_client(endpoint: &str, secure: bool, lookup: BucketLookupType) -> TransitionClient {
let creds = Credentials::new(Static(Default::default()));
let opts = Options {
creds,
secure,
bucket_lookup: lookup,
..Default::default()
};
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { TransitionClient::new(endpoint, opts).await.unwrap() })
}
// Auto lookup, plain HTTP, domain-name endpoint, bucket name "test":
// expects virtual-host style to be selected.
#[test]
fn test_is_virtual_host_auto_http_domain_dns_bucket() {
let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupAuto);
assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
// Auto lookup with an IP-address endpoint: expects virtual-host style NOT to be selected.
#[test]
fn test_is_virtual_host_auto_http_ip() {
let cl = mk_client("127.0.0.1:9000", false, BucketLookupType::BucketLookupAuto);
assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
// Auto lookup with `secure = true` and a bucket name containing a dot ("te.st"):
// expects virtual-host style NOT to be selected.
#[test]
fn test_is_virtual_host_auto_https_bucket_with_dot_disallowed() {
let cl = mk_client("s3.example.com:443", true, BucketLookupType::BucketLookupAuto);
assert!(!cl.is_virtual_host_style_request(&cl.endpoint_url(), "te.st"));
}
// Lookup mode forced to DNS: expects virtual-host style to be selected.
#[test]
fn test_is_virtual_host_dns_forced() {
let cl = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS);
assert!(cl.is_virtual_host_style_request(&cl.endpoint_url(), "test"));
}
// Checks the exact URL produced by `make_target_url` in both modes:
// DNS lookup puts the bucket in the host, path lookup puts it in the path.
#[test]
fn test_target_url_vhost_and_path() {
let cl_v = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupDNS);
let url_v = cl_v.make_target_url("test", "obj.txt", "", true, &HashMap::new()).unwrap();
assert_eq!(url_v.as_str(), "http://test.s3.example.com:9000/obj.txt");
let cl_p = mk_client("s3.example.com:9000", false, BucketLookupType::BucketLookupPath);
let url_p = cl_p.make_target_url("test", "obj.txt", "", false, &HashMap::new()).unwrap();
assert_eq!(url_p.as_str(), "http://s3.example.com:9000/test/obj.txt");
}
}

View File

@@ -15,8 +15,6 @@
use std::sync::Arc;
use once_cell::sync::Lazy;
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use crate::{client::LockClient, types::LockId};
@@ -27,43 +25,36 @@ struct UnlockJob {
clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state
}
// Global unlock runtime with background worker running on a dedicated thread-bound Tokio runtime.
// This avoids depending on the application's Tokio runtime lifetimes/cancellation scopes.
static UNLOCK_TX: Lazy<mpsc::Sender<UnlockJob>> = Lazy::new(|| {
#[derive(Debug)]
struct UnlockRuntime {
tx: mpsc::Sender<UnlockJob>,
}
// Global unlock runtime with background worker
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
// Larger buffer to reduce contention during bursts
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
// Spawn a dedicated OS thread that owns its own Tokio runtime to process unlock jobs.
thread::Builder::new()
.name("rustfs-lock-unlocker".to_string())
.spawn(move || {
// A lightweight current-thread runtime is sufficient here.
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build Tokio runtime for background unlock jobs (possible causes: resource exhaustion, thread limit, Tokio misconfiguration)");
rt.block_on(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; try all, success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
tracing::debug!("LockGuard background released {}", lock_id);
}
// Spawn background worker when first used; assumes a Tokio runtime is available
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; try all, success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
});
})
.expect("failed to spawn unlock worker thread");
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
tracing::debug!("LockGuard background released {}", lock_id);
}
}
});
tx
UnlockRuntime { tx }
});
/// A RAII guard that releases the lock asynchronously when dropped.
@@ -108,32 +99,22 @@ impl Drop for LockGuard {
};
// Try a non-blocking send to avoid panics in Drop
if let Err(err) = UNLOCK_TX.try_send(job) {
// Channel full or closed; best-effort fallback using a dedicated thread runtime
if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
// Channel full or closed; best-effort fallback: spawn a detached task
let lock_id = self.lock_id.clone();
let clients = self.clients.clone();
tracing::warn!(
"LockGuard channel send failed ({}), spawning fallback unlock thread for {}",
err,
lock_id.clone()
);
tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id);
// Use a short-lived background thread to execute the async releases on its own runtime.
let _ = thread::Builder::new()
.name("rustfs-lock-unlock-fallback".to_string())
.spawn(move || {
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build fallback unlock runtime in LockGuard::drop fallback thread. This indicates resource exhaustion or misconfiguration (e.g., thread limits, Tokio runtime issues). Remediation: check system resource limits, ensure sufficient threads are available, and verify Tokio runtime configuration.");
rt.block_on(async move {
let futures_iter = clients.into_iter().map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
});
let _ = futures::future::join_all(futures_iter).await;
});
// If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts.
let handle = tokio::spawn(async move {
let futures_iter = clients.into_iter().map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
});
let _ = futures::future::join_all(futures_iter).await;
});
// Explicitly drop the JoinHandle to acknowledge detaching the task.
drop(handle);
}
}
}

View File

@@ -26,42 +26,8 @@ use s3s::Body;
const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const SIGN_V2_ALGORITHM: &str = "AWS";
fn encode_url2path(req: &request::Request<Body>, virtual_host: bool) -> String {
// In virtual-hosted-style, the canonical resource must include "/{bucket}" prefix
// extracted from the Host header: bucket.domain.tld -> "/bucket"
let mut path = req.uri().path().to_string();
if virtual_host {
let host = super::utils::get_host_addr(req);
// strip port if present
let host = match host.split_once(':') {
Some((h, _)) => h,
None => host.as_str(),
};
// Take everything before the first '.' in the host as the bucket label.
// Note: this only requires one dot; it does not verify that the host has three labels.
if let Some((bucket, _rest)) = host.split_once('.') {
if !bucket.is_empty() {
// avoid duplicating if path already starts with /bucket/
let expected_prefix = format!("/{bucket}");
if !path.starts_with(&expected_prefix) {
// Only prefix for bucket-level paths; ensure a single slash separation
if path == "/" {
path = expected_prefix;
} else {
path = format!(
"{}{}",
expected_prefix,
if path.starts_with('/') {
path.clone()
} else {
format!("/{path}")
}
);
}
}
}
}
}
path
fn encode_url2path(req: &request::Request<Body>, _virtual_host: bool) -> String {
req.uri().path().to_string()
}
pub fn pre_sign_v2(
@@ -273,39 +239,3 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Request<Body>
}
}
}
// Unit tests for `encode_url2path`, covering virtual-hosted-style and path-style requests.
#[cfg(test)]
mod tests {
use super::*;
use http::Request;
// Test helper: builds a GET request with an empty body for `http://{host}{path}`,
// appending `?{query}` only when `query` is non-empty. Panics (via `unwrap`) if the
// request cannot be built or a header value fails to parse.
fn mk_req(host: &str, path: &str, query: &str) -> request::Request<Body> {
let uri = if query.is_empty() {
format!("http://{host}{path}")
} else {
format!("http://{host}{path}?{query}")
};
let mut req = Request::builder().uri(uri).method("GET").body(Body::empty()).unwrap();
// minimal headers used by signers
let h = req.headers_mut();
// Content-Md5 and Content-Type are present but empty; Date is a fixed timestamp.
h.insert("Content-Md5", "".parse().unwrap());
h.insert("Content-Type", "".parse().unwrap());
h.insert("Date", "Thu, 21 Aug 2025 00:00:00 +0000".parse().unwrap());
h.insert("host", host.parse().unwrap());
req
}
// With `virtual_host = true` and host "test.example.com", the expected path is the
// request path prefixed with "/test" (the first host label).
#[test]
fn test_encode_url2path_vhost_prefixes_bucket() {
let req = mk_req("test.example.com", "/obj.txt", "");
let path = encode_url2path(&req, true);
assert_eq!(path, "/test/obj.txt");
}
// With `virtual_host = false`, the expected path is the request path unchanged;
// the query string ("uploads=") is not part of the result.
#[test]
fn test_encode_url2path_path_style_unchanged() {
let req = mk_req("example.com", "/test/obj.txt", "uploads=");
let path = encode_url2path(&req, false);
assert_eq!(path, "/test/obj.txt");
}
}