mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
Fix/resolve pr 1710 (#1743)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -7882,6 +7882,8 @@ dependencies = [
|
||||
"rustfs-utils",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serial_test",
|
||||
"temp-env",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tower",
|
||||
|
||||
@@ -90,6 +90,8 @@ pub fn to_unformatted_disk_error(io_err: std::io::Error) -> std::io::Error {
|
||||
match io_err.kind() {
|
||||
std::io::ErrorKind::NotFound => DiskError::UnformattedDisk.into(),
|
||||
std::io::ErrorKind::PermissionDenied => DiskError::DiskAccessDenied.into(),
|
||||
std::io::ErrorKind::UnexpectedEof => DiskError::UnformattedDisk.into(),
|
||||
std::io::ErrorKind::InvalidData => DiskError::UnformattedDisk.into(),
|
||||
std::io::ErrorKind::Other => match io_err.downcast::<DiskError>() {
|
||||
Ok(err) => match err {
|
||||
DiskError::FileNotFound => DiskError::UnformattedDisk.into(),
|
||||
@@ -97,11 +99,11 @@ pub fn to_unformatted_disk_error(io_err: std::io::Error) -> std::io::Error {
|
||||
DiskError::VolumeNotFound => DiskError::UnformattedDisk.into(),
|
||||
DiskError::FileAccessDenied => DiskError::DiskAccessDenied.into(),
|
||||
DiskError::DiskAccessDenied => DiskError::DiskAccessDenied.into(),
|
||||
_ => DiskError::CorruptedBackend.into(),
|
||||
_ => DiskError::UnformattedDisk.into(),
|
||||
},
|
||||
Err(_err) => DiskError::CorruptedBackend.into(),
|
||||
Err(_err) => DiskError::UnformattedDisk.into(),
|
||||
},
|
||||
_ => DiskError::CorruptedBackend.into(),
|
||||
_ => DiskError::UnformattedDisk.into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,18 +371,18 @@ mod tests {
|
||||
let result = to_unformatted_disk_error(io_error);
|
||||
assert!(contains_disk_error(result, DiskError::DiskAccessDenied));
|
||||
|
||||
// Test Other error kind with other DiskError -> CorruptedBackend
|
||||
// Test Other error kind with other DiskError -> UnformattedDisk
|
||||
let io_error = create_io_error_with_disk_error(DiskError::DiskFull);
|
||||
let result = to_unformatted_disk_error(io_error);
|
||||
assert!(contains_disk_error(result, DiskError::CorruptedBackend));
|
||||
assert!(contains_disk_error(result, DiskError::UnformattedDisk));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_unformatted_disk_error_recursive_behavior() {
|
||||
// Test with non-Other error kind that should be handled without infinite recursion
|
||||
let result = to_unformatted_disk_error(create_io_error(ErrorKind::Interrupted));
|
||||
// This should not cause infinite recursion and should produce CorruptedBackend
|
||||
assert!(contains_disk_error(result, DiskError::CorruptedBackend));
|
||||
// This should not cause infinite recursion and should produce UnformattedDisk
|
||||
assert!(contains_disk_error(result, DiskError::UnformattedDisk));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -339,7 +339,9 @@ impl LocalDisk {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn check_format_json(&self) -> Result<Metadata> {
|
||||
let md = std::fs::metadata(&self.format_path).map_err(to_unformatted_disk_error)?;
|
||||
let md = tokio::fs::metadata(&self.format_path)
|
||||
.await
|
||||
.map_err(to_unformatted_disk_error)?;
|
||||
Ok(md)
|
||||
}
|
||||
async fn make_meta_volumes(&self) -> Result<()> {
|
||||
@@ -1365,36 +1367,43 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
|
||||
let format_info = {
|
||||
let (id, last_check, file_info) = {
|
||||
let format_info = self.format_info.read().await;
|
||||
format_info.clone()
|
||||
(format_info.id, format_info.last_check, format_info.file_info.clone())
|
||||
};
|
||||
|
||||
let id = format_info.id;
|
||||
|
||||
// if format_info.last_check_valid() {
|
||||
// return Ok(id);
|
||||
// }
|
||||
|
||||
if format_info.file_info.is_some() && id.is_some() {
|
||||
// check last check time
|
||||
if let Some(last_check) = format_info.last_check
|
||||
&& last_check.unix_timestamp() + 1 < OffsetDateTime::now_utc().unix_timestamp()
|
||||
{
|
||||
return Ok(id);
|
||||
}
|
||||
// Check if we can use cached value without doing any I/O
|
||||
// If we checked recently (within 1 second) and have valid cache, return immediately
|
||||
if let (Some(id), Some(last_check)) = (id, last_check)
|
||||
&& last_check.unix_timestamp() + 1 >= OffsetDateTime::now_utc().unix_timestamp()
|
||||
{
|
||||
return Ok(Some(id));
|
||||
}
|
||||
|
||||
let file_meta = self.check_format_json().await?;
|
||||
// Get current file metadata (async I/O)
|
||||
let file_meta = match self.check_format_json().await {
|
||||
Ok(meta) => meta,
|
||||
Err(e) => {
|
||||
// file does not exist or cannot be accessed, clear cached format info
|
||||
if matches!(e, DiskError::UnformattedDisk | DiskError::DiskNotFound) {
|
||||
let mut format_info = self.format_info.write().await;
|
||||
format_info.id = None;
|
||||
format_info.file_info = None;
|
||||
format_info.data = Bytes::new();
|
||||
format_info.last_check = None;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(file_info) = &format_info.file_info
|
||||
&& super::fs::same_file(&file_meta, file_info)
|
||||
// Validate cache against current file metadata
|
||||
if let (Some(cached_file_info), Some(id)) = (&file_info, id)
|
||||
&& super::fs::same_file(&file_meta, cached_file_info)
|
||||
{
|
||||
// Cache is still valid, update last_check and return
|
||||
let mut format_info = self.format_info.write().await;
|
||||
format_info.last_check = Some(OffsetDateTime::now_utc());
|
||||
drop(format_info);
|
||||
|
||||
return Ok(id);
|
||||
return Ok(Some(id));
|
||||
}
|
||||
|
||||
debug!("get_disk_id: read format.json");
|
||||
@@ -1403,7 +1412,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
let fm = FormatV3::try_from(b.as_slice()).map_err(|e| {
|
||||
warn!("decode format.json err {:?}", e);
|
||||
DiskError::CorruptedBackend
|
||||
DiskError::UnformattedDisk
|
||||
})?;
|
||||
|
||||
let (m, n) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
|
||||
|
||||
@@ -492,11 +492,19 @@ impl HealManager {
|
||||
for (_, disk_opt) in GLOBAL_LOCAL_DISK_MAP.read().await.iter() {
|
||||
if let Some(disk) = disk_opt {
|
||||
// detect unformatted disk via get_disk_id()
|
||||
if let Err(err) = disk.get_disk_id().await
|
||||
&& err == DiskError::UnformattedDisk {
|
||||
match disk.get_disk_id().await {
|
||||
Err(DiskError::UnformattedDisk) => {
|
||||
info!("start_auto_disk_scanner: Detected unformatted disk: {}", disk.endpoint());
|
||||
endpoints.push(disk.endpoint());
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
// Log other errors for debugging
|
||||
tracing::warn!("start_auto_disk_scanner: Disk {} check failed: {:?}", disk.endpoint(), e);
|
||||
}
|
||||
Ok(_) => {
|
||||
// Disk is formatted, no action needed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::{Error, Result};
|
||||
use rustfs_ecstore::disk::error::DiskError;
|
||||
use rustfs_ecstore::disk::{BUCKET_META_PREFIX, DiskAPI, DiskStore, RUSTFS_META_BUCKET};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
@@ -180,7 +181,9 @@ impl ResumeManager {
|
||||
};
|
||||
|
||||
// save initial state
|
||||
manager.save_state().await?;
|
||||
if let Err(e) = manager.save_state().await {
|
||||
warn!("Failed to save initial resume state: {}", e);
|
||||
}
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
@@ -297,12 +300,15 @@ impl ResumeManager {
|
||||
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", state.task_id, RESUME_STATE_FILE));
|
||||
|
||||
let path_str = path_to_str(&file_path)?;
|
||||
self.disk
|
||||
.write_all(RUSTFS_META_BUCKET, path_str, state_data.into())
|
||||
.await
|
||||
.map_err(|e| Error::TaskExecutionFailed {
|
||||
if let Err(e) = self.disk.write_all(RUSTFS_META_BUCKET, path_str, state_data.into()).await {
|
||||
if matches!(e, DiskError::UnformattedDisk) {
|
||||
warn!("Cannot save resume state: unformatted disk");
|
||||
return Ok(());
|
||||
}
|
||||
return Err(Error::TaskExecutionFailed {
|
||||
message: format!("Failed to save resume state: {e}"),
|
||||
})?;
|
||||
});
|
||||
}
|
||||
|
||||
debug!("Saved resume state for task: {}", state.task_id);
|
||||
Ok(())
|
||||
@@ -395,7 +401,9 @@ impl CheckpointManager {
|
||||
};
|
||||
|
||||
// save initial checkpoint
|
||||
manager.save_checkpoint().await?;
|
||||
if let Err(e) = manager.save_checkpoint().await {
|
||||
warn!("Failed to save initial checkpoint: {}", e);
|
||||
}
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use rustfs_heal::heal::{
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
path::PathBuf,
|
||||
sync::{Arc, Once, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
@@ -49,17 +49,6 @@ pub fn init_tracing() {
|
||||
});
|
||||
}
|
||||
|
||||
async fn wait_for_path_exists(path: &Path, timeout: Duration) -> bool {
|
||||
let deadline = std::time::Instant::now() + timeout;
|
||||
while std::time::Instant::now() < deadline {
|
||||
if path.exists() {
|
||||
return true;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
path.exists()
|
||||
}
|
||||
|
||||
/// Test helper: Create test environment with ECStore
|
||||
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>) {
|
||||
init_tracing();
|
||||
@@ -340,11 +329,25 @@ mod serial_tests {
|
||||
let (_result, error) = heal_storage.heal_format(false).await.expect("Failed to heal format");
|
||||
assert!(error.is_none(), "Heal format returned error: {error:?}");
|
||||
|
||||
// Wait for task completion
|
||||
let restored = wait_for_path_exists(&format_path, Duration::from_secs(20)).await;
|
||||
// ─── 2️⃣ wait for format.json to be restored with polling + timeout ───────
|
||||
// The minimal scanner interval is clamped to 10s in manager.rs, so we set timeout to 20s
|
||||
let timeout_duration = Duration::from_secs(20);
|
||||
let poll_interval = Duration::from_millis(200);
|
||||
|
||||
// ─── 2️⃣ verify format.json is restored ───────
|
||||
assert!(restored, "format.json does not exist on disk after heal");
|
||||
let result = tokio::time::timeout(timeout_duration, async {
|
||||
loop {
|
||||
if format_path.exists() {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok(), "format.json was not restored within timeout period");
|
||||
|
||||
// ─── 3️⃣ verify format.json is restored ───────
|
||||
assert!(format_path.exists(), "format.json does not exist on disk after heal");
|
||||
|
||||
info!("Heal format basic test passed");
|
||||
}
|
||||
@@ -362,37 +365,51 @@ mod serial_tests {
|
||||
create_test_bucket(&ecstore, bucket_name).await;
|
||||
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
|
||||
|
||||
let obj_dir = disk_paths[0].join(bucket_name).join(object_name);
|
||||
let target_part = WalkDir::new(&obj_dir)
|
||||
.min_depth(2)
|
||||
.max_depth(2)
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false))
|
||||
.map(|e| e.into_path())
|
||||
.expect("Failed to locate part file to delete");
|
||||
|
||||
// ─── 1️⃣ delete format.json on one disk ──────────────
|
||||
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");
|
||||
std::fs::remove_dir_all(&disk_paths[0]).expect("failed to delete all contents under disk_paths[0]");
|
||||
std::fs::create_dir_all(&disk_paths[0]).expect("failed to recreate disk_paths[0] directory");
|
||||
println!("✅ Deleted format.json on disk: {:?}", disk_paths[0]);
|
||||
|
||||
let (_result, error) = heal_storage.heal_format(false).await.expect("Failed to heal format");
|
||||
assert!(error.is_none(), "Heal format returned error: {error:?}");
|
||||
|
||||
// Wait for task completion
|
||||
let restored = wait_for_path_exists(&format_path, Duration::from_secs(20)).await;
|
||||
|
||||
// ─── 2️⃣ verify format.json is restored ───────
|
||||
assert!(restored, "format.json does not exist on disk after heal");
|
||||
// ─── 3 verify each part file is restored ───────
|
||||
let heal_opts = HealOpts {
|
||||
recursive: false,
|
||||
dry_run: false,
|
||||
remove: false,
|
||||
recreate: true,
|
||||
scan_mode: HealScanMode::Normal,
|
||||
update_parity: true,
|
||||
no_lock: false,
|
||||
pool: None,
|
||||
set: None,
|
||||
// Create heal manager with faster interval
|
||||
let cfg = HealConfig {
|
||||
heal_interval: Duration::from_secs(1),
|
||||
..Default::default()
|
||||
};
|
||||
let (_result, error) = heal_storage
|
||||
.heal_object(bucket_name, object_name, None, &heal_opts)
|
||||
.await
|
||||
.expect("Failed to heal object");
|
||||
assert!(error.is_none(), "Heal object returned error: {error:?}");
|
||||
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
// ─── 2️⃣ wait for format.json and part file to be restored with polling + timeout ───────
|
||||
// The minimal scanner interval is clamped to 10s in manager.rs, so we set timeout to 20s
|
||||
let timeout_duration = Duration::from_secs(20);
|
||||
let poll_interval = Duration::from_millis(200);
|
||||
|
||||
let result = tokio::time::timeout(timeout_duration, async {
|
||||
loop {
|
||||
if format_path.exists() && target_part.exists() {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok(), "format.json or part file was not restored within timeout period");
|
||||
|
||||
// ─── 3️⃣ verify format.json is restored ───────
|
||||
assert!(format_path.exists(), "format.json does not exist on disk after heal");
|
||||
// ─── 4️⃣ verify each part file is restored ───────
|
||||
assert!(target_part.exists());
|
||||
|
||||
// Verify object metadata is accessible
|
||||
let obj_info = ecstore
|
||||
|
||||
@@ -744,7 +744,7 @@ impl<T: Store> IamSys<T> {
|
||||
}
|
||||
|
||||
pub async fn is_allowed_sts(&self, args: &Args<'_>, parent_user: &str) -> bool {
|
||||
let is_owner = parent_user == get_global_action_cred().unwrap().access_key;
|
||||
let is_owner = matches!(get_global_action_cred(), Some(cred) if cred.access_key == parent_user);
|
||||
let role_arn = args.get_role_arn();
|
||||
let policies = {
|
||||
if is_owner {
|
||||
@@ -794,7 +794,7 @@ impl<T: Store> IamSys<T> {
|
||||
return false;
|
||||
}
|
||||
|
||||
let is_owner = parent_user == get_global_action_cred().unwrap().access_key;
|
||||
let is_owner = matches!(get_global_action_cred(), Some(cred) if cred.access_key == parent_user);
|
||||
|
||||
let role_arn = args.get_role_arn();
|
||||
|
||||
|
||||
@@ -45,6 +45,8 @@ regex = { workspace = true }
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full", "test-util"] }
|
||||
tower = { workspace = true, features = ["util"] }
|
||||
serial_test = { workspace = true }
|
||||
temp-env = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -13,75 +13,55 @@
|
||||
// limitations under the License.
|
||||
|
||||
use rustfs_config::{
|
||||
DEFAULT_TRUSTED_PROXY_PROXIES, ENV_TRUSTED_PROXY_ENABLE_RFC7239, ENV_TRUSTED_PROXY_MAX_HOPS, ENV_TRUSTED_PROXY_PROXIES,
|
||||
ENV_TRUSTED_PROXY_VALIDATION_MODE,
|
||||
DEFAULT_TRUSTED_PROXY_PROXIES, ENV_TRUSTED_PROXY_MAX_HOPS, ENV_TRUSTED_PROXY_PROXIES, ENV_TRUSTED_PROXY_VALIDATION_MODE,
|
||||
};
|
||||
use rustfs_trusted_proxies::{ConfigLoader, TrustedProxy, TrustedProxyConfig, ValidationMode};
|
||||
use serial_test::serial;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Mutex;
|
||||
|
||||
static ENV_MUTEX: Mutex<()> = Mutex::new(());
|
||||
|
||||
#[test]
|
||||
#[allow(unsafe_code)]
|
||||
#[serial]
|
||||
fn test_config_loader_default() {
|
||||
let _guard = ENV_MUTEX.lock().unwrap();
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_PROXIES);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_VALIDATION_MODE);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_MAX_HOPS);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_ENABLE_RFC7239);
|
||||
}
|
||||
let config = ConfigLoader::from_env_or_default();
|
||||
assert_eq!(config.server_addr.port(), 9000);
|
||||
assert!(!config.proxy.proxies.is_empty());
|
||||
assert_eq!(config.proxy.validation_mode, ValidationMode::HopByHop);
|
||||
assert!(config.proxy.enable_rfc7239);
|
||||
assert_eq!(config.proxy.max_hops, 10);
|
||||
// Clean up environment variables explicitly to ensure clean state
|
||||
temp_env::with_vars_unset(
|
||||
vec![
|
||||
ENV_TRUSTED_PROXY_PROXIES,
|
||||
ENV_TRUSTED_PROXY_VALIDATION_MODE,
|
||||
ENV_TRUSTED_PROXY_MAX_HOPS,
|
||||
],
|
||||
|| {
|
||||
let config = ConfigLoader::from_env_or_default();
|
||||
assert_eq!(config.server_addr.port(), 9000);
|
||||
assert!(!config.proxy.proxies.is_empty());
|
||||
assert_eq!(config.proxy.validation_mode, ValidationMode::HopByHop);
|
||||
assert!(config.proxy.enable_rfc7239);
|
||||
assert_eq!(config.proxy.max_hops, 10);
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[allow(unsafe_code)]
|
||||
#[serial]
|
||||
fn test_config_loader_env_vars() {
|
||||
let _guard = ENV_MUTEX.lock().unwrap();
|
||||
unsafe {
|
||||
std::env::set_var(ENV_TRUSTED_PROXY_PROXIES, "192.168.1.0/24,10.0.0.0/8");
|
||||
}
|
||||
unsafe {
|
||||
std::env::set_var(ENV_TRUSTED_PROXY_VALIDATION_MODE, "strict");
|
||||
}
|
||||
unsafe {
|
||||
std::env::set_var(ENV_TRUSTED_PROXY_MAX_HOPS, "5");
|
||||
}
|
||||
// Use temp_env to ensure environment variables are cleaned up even if test fails
|
||||
temp_env::with_vars(
|
||||
vec![
|
||||
(ENV_TRUSTED_PROXY_PROXIES, Some("192.168.1.0/24,10.0.0.0/8")),
|
||||
(ENV_TRUSTED_PROXY_VALIDATION_MODE, Some("strict")),
|
||||
(ENV_TRUSTED_PROXY_MAX_HOPS, Some("5")),
|
||||
],
|
||||
|| {
|
||||
let config = ConfigLoader::from_env();
|
||||
|
||||
let config = ConfigLoader::from_env();
|
||||
|
||||
if let Ok(config) = config {
|
||||
assert_eq!(config.server_addr.port(), 9000);
|
||||
assert_eq!(config.proxy.validation_mode, ValidationMode::Strict);
|
||||
assert_eq!(config.proxy.max_hops, 5);
|
||||
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_PROXIES);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_VALIDATION_MODE);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var(ENV_TRUSTED_PROXY_MAX_HOPS);
|
||||
}
|
||||
unsafe {
|
||||
std::env::remove_var("SERVER_PORT");
|
||||
}
|
||||
} else {
|
||||
panic!("Failed to load configuration from environment variables");
|
||||
}
|
||||
if let Ok(config) = config {
|
||||
assert_eq!(config.server_addr.port(), 9000);
|
||||
assert_eq!(config.proxy.validation_mode, ValidationMode::Strict);
|
||||
assert_eq!(config.proxy.max_hops, 5);
|
||||
} else {
|
||||
panic!("Failed to load configuration from environment variables");
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -375,6 +375,7 @@ static CONCURRENCY_MANAGER: LazyLock<ConcurrencyManager> = LazyLock::new(Concurr
|
||||
pub struct GetObjectGuard {
|
||||
/// Track when the request started for metrics collection.
|
||||
/// Used to calculate end-to-end request latency in the Drop implementation.
|
||||
#[allow(dead_code)]
|
||||
start_time: Instant,
|
||||
/// Reference to the concurrency manager for cleanup operations.
|
||||
/// The underscore prefix indicates this is used implicitly (for type safety).
|
||||
@@ -399,6 +400,7 @@ impl GetObjectGuard {
|
||||
///
|
||||
/// Useful for logging or metrics collection during request processing.
|
||||
/// Called automatically in the Drop implementation for duration tracking.
|
||||
#[allow(dead_code)]
|
||||
pub fn elapsed(&self) -> Duration {
|
||||
self.start_time.elapsed()
|
||||
}
|
||||
@@ -441,8 +443,11 @@ impl Drop for GetObjectGuard {
|
||||
}
|
||||
|
||||
// Record Prometheus metrics for monitoring and alerting
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
// We strictly disable metrics recording in unit tests (cfg(test)) to prevent
|
||||
// Thread Local Storage (TLS) destruction order issues causing panics.
|
||||
// In production (not(test)), we use a panic check as a safety guard.
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
if !std::thread::panicking() {
|
||||
use metrics::{counter, histogram};
|
||||
// Track total completed requests for throughput calculation
|
||||
counter!("rustfs.get.object.requests.completed").increment(1);
|
||||
@@ -475,7 +480,7 @@ pub fn get_concurrency_aware_buffer_size(file_size: i64, base_buffer_size: usize
|
||||
let concurrent_requests = ACTIVE_GET_REQUESTS.load(Ordering::Relaxed);
|
||||
|
||||
// Record concurrent request metrics
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::gauge;
|
||||
gauge!("rustfs.concurrent.get.requests").set(concurrent_requests as f64);
|
||||
@@ -885,7 +890,7 @@ impl HotObjectCache {
|
||||
cached.access_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.hit_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::counter;
|
||||
counter!("rustfs.object.cache.hits").increment(1);
|
||||
@@ -897,7 +902,7 @@ impl HotObjectCache {
|
||||
None => {
|
||||
self.miss_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::counter;
|
||||
counter!("rustfs.object.cache.misses").increment(1);
|
||||
@@ -929,7 +934,7 @@ impl HotObjectCache {
|
||||
|
||||
self.cache.insert(key.clone(), cached_obj).await;
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::{counter, gauge};
|
||||
counter!("rustfs.object.cache.insertions").increment(1);
|
||||
@@ -1086,7 +1091,7 @@ impl HotObjectCache {
|
||||
cached.data.increment_access();
|
||||
self.hit_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::counter;
|
||||
counter!("rustfs_object_response_cache_hits").increment(1);
|
||||
@@ -1098,7 +1103,7 @@ impl HotObjectCache {
|
||||
None => {
|
||||
self.miss_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::counter;
|
||||
counter!("rustfs_object_response_cache_misses").increment(1);
|
||||
@@ -1135,7 +1140,7 @@ impl HotObjectCache {
|
||||
|
||||
self.response_cache.insert(key.clone(), cached_internal).await;
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::{counter, gauge};
|
||||
counter!("rustfs_object_response_cache_insertions").increment(1);
|
||||
@@ -1158,7 +1163,7 @@ impl HotObjectCache {
|
||||
self.cache.invalidate(key).await;
|
||||
self.response_cache.invalidate(key).await;
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::counter;
|
||||
counter!("rustfs_object_cache_invalidations").increment(1);
|
||||
@@ -1340,7 +1345,7 @@ impl ConcurrencyManager {
|
||||
}
|
||||
|
||||
// Record histogram metric for Prometheus
|
||||
#[cfg(feature = "metrics")]
|
||||
#[cfg(all(feature = "metrics", not(test)))]
|
||||
{
|
||||
use metrics::histogram;
|
||||
histogram!("rustfs.disk.permit.wait.duration.seconds").record(wait_duration.as_secs_f64());
|
||||
@@ -1675,8 +1680,10 @@ pub(crate) fn reset_active_get_requests() {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serial_test::serial;
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_concurrent_request_tracking() {
|
||||
reset_active_get_requests();
|
||||
assert_eq!(GetObjectGuard::concurrent_requests(), 0);
|
||||
@@ -1722,6 +1729,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_hot_object_cache() {
|
||||
let cache = HotObjectCache::new();
|
||||
let test_data = vec![1u8; 1024];
|
||||
@@ -1738,6 +1746,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_cache_eviction() {
|
||||
let cache = HotObjectCache::new();
|
||||
|
||||
@@ -1757,6 +1766,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_cache_reject_large_objects() {
|
||||
let cache = HotObjectCache::new();
|
||||
let large_data = vec![0u8; 11 * MI_B]; // Larger than max_object_size
|
||||
@@ -1777,6 +1787,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_disk_read_permits() {
|
||||
let manager = ConcurrencyManager::new();
|
||||
|
||||
@@ -1814,6 +1825,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_hot_keys_tracking() {
|
||||
let manager = ConcurrencyManager::new();
|
||||
|
||||
@@ -1834,6 +1846,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_batch_operations() {
|
||||
let manager = ConcurrencyManager::new();
|
||||
|
||||
@@ -1851,6 +1864,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_cache_clear() {
|
||||
let manager = ConcurrencyManager::new();
|
||||
|
||||
@@ -1868,6 +1882,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_warm_cache() {
|
||||
let manager = ConcurrencyManager::new();
|
||||
|
||||
|
||||
@@ -93,6 +93,7 @@ mod tests {
|
||||
CachedGetObject, ConcurrencyManager, GetObjectGuard, get_advanced_buffer_size, get_concurrency_aware_buffer_size,
|
||||
};
|
||||
use rustfs_config::{KI_B, MI_B};
|
||||
use serial_test::serial;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::{Instant, sleep};
|
||||
@@ -117,6 +118,7 @@ mod tests {
|
||||
/// uses `ACTIVE_GET_REQUESTS` to determine optimal buffer sizes. A leaked
|
||||
/// counter would cause permanently reduced buffer sizes, degrading performance.
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_concurrent_request_tracking() {
|
||||
// Start with current baseline (may not be zero if other tests are running)
|
||||
let initial = GetObjectGuard::concurrent_requests();
|
||||
@@ -191,6 +193,7 @@ mod tests {
|
||||
/// ACTIVE_GET_REQUESTS is a global atomic counter. The test uses widened
|
||||
/// tolerances to account for this.
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_adaptive_buffer_sizing() {
|
||||
let file_size = 32 * MI_B as i64; // 32MB file (matches issue #911 test case)
|
||||
let base_buffer = 256 * KI_B; // 256KB base buffer (typical for S3-like workloads)
|
||||
@@ -527,6 +530,7 @@ mod tests {
|
||||
|
||||
/// Test advanced buffer sizing with file patterns
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_advanced_buffer_sizing() {
|
||||
crate::storage::concurrency::reset_active_get_requests();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user