From 9b029d18b2534350a34ca386062707fd94a9d3fb Mon Sep 17 00:00:00 2001 From: guojidan <63799833+guojidan@users.noreply.github.com> Date: Fri, 26 Sep 2025 11:21:53 +0800 Subject: [PATCH] feat(lock): enhance lock management with timeout and ownership tracking (#589) - Add lock timeout support and track acquisition time in lock state - Improve lock conflict handling with detailed error messages - Optimize lock reuse when already held by same owner - Refactor lock state to store owner info and timeout duration - Update all lock operations to handle new state structure Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/ecstore/src/set_disk.rs | 155 ++++++++++++++++------- crates/lock/src/fast_lock/object_pool.rs | 16 ++- crates/lock/src/fast_lock/shard.rs | 26 ++-- crates/lock/src/fast_lock/state.rs | 118 +++++++++++------ crates/utils/src/retry.rs | 7 +- 5 files changed, 223 insertions(+), 99 deletions(-) diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index d963a208..4674906f 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -71,7 +71,7 @@ use rustfs_filemeta::{ headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, }; -use rustfs_lock::NamespaceLockManager; +use rustfs_lock::fast_lock::types::LockResult; use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem}; use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader}; use rustfs_utils::{ @@ -147,6 +147,21 @@ impl SetDisks { set_endpoints, }) } + fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String { + match err { + LockResult::Timeout => { + format!("{mode} lock acquisition timed out on {bucket}/{object} (owner={})", self.locker_owner) + } + LockResult::Conflict { + current_owner, + current_mode, + } => format!( + "{mode} lock conflicted on {bucket}/{object}: held by {current_owner} as {:?}", + current_mode + ), + LockResult::Acquired => format!("unexpected lock state while acquiring {mode} lock on {bucket}/{object}"), + } + } async fn get_disks_internal(&self) -> Vec> { let rl = self.disks.read().await; @@ -2461,25 +2476,38 @@ impl SetDisks { // Check if lock is already held let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object); + let mut reuse_existing_lock = false; if let Some(lock_info) = self.fast_lock_manager.get_lock_info(&key) { - warn!("Lock already exists for object {}: {:?}", object, lock_info); + if lock_info.owner.as_ref() == self.locker_owner.as_str() + && matches!(lock_info.mode, rustfs_lock::fast_lock::types::LockMode::Exclusive) + { + reuse_existing_lock = true; + debug!("Reusing existing exclusive lock for object {} held by {}", object, self.locker_owner); + } else { + warn!("Lock already exists for object {}: {:?}", object, lock_info); + } } else { info!("No existing lock found for object {}", object); } - let start_time = std::time::Instant::now(); - let lock_result = self - .fast_lock_manager - .acquire_write_lock(bucket, object, self.locker_owner.as_str()) - .await - .map_err(|e| { - let elapsed = start_time.elapsed(); - error!("Failed to acquire write lock for heal operation after {:?}: {:?}", elapsed, e); - DiskError::other(format!("Failed to acquire write lock for heal operation: {e:?}")) - })?; - let elapsed = start_time.elapsed(); - info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed); - Some(lock_result) + if reuse_existing_lock { + None + } else { + let start_time = std::time::Instant::now(); + let lock_result = self + .fast_lock_manager + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) + .await + .map_err(|e| { + let elapsed = start_time.elapsed(); + let message = self.format_lock_error(bucket, object, "write", &e); + error!("Failed to acquire write lock for heal operation after {:?}: {}", elapsed, message); + DiskError::other(message) + })?; + let elapsed = start_time.elapsed(); + info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed); + Some(lock_result) + } } else { info!("Skipping lock acquisition (no_lock=true)"); None @@ -3079,19 +3107,14 @@ impl SetDisks { } } - async fn heal_object_dir( + /// Heal directory metadata assuming caller already holds the write lock for `(bucket, object)`. + async fn heal_object_dir_locked( &self, bucket: &str, object: &str, dry_run: bool, remove: bool, ) -> Result<(HealResultItem, Option)> { - let _write_lock_guard = self - .fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) - .await - .map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {e:?}")))?; - let disks = { let disks = self.disks.read().await; disks.clone() @@ -3186,6 +3209,27 @@ impl SetDisks { Ok((result, None)) } + #[allow(dead_code)] + /// Heal directory metadata after acquiring the necessary write lock. + async fn heal_object_dir( + &self, + bucket: &str, + object: &str, + dry_run: bool, + remove: bool, + ) -> Result<(HealResultItem, Option)> { + let _write_lock_guard = self + .fast_lock_manager + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) + .await + .map_err(|e| { + let message = self.format_lock_error(bucket, object, "write", &e); + DiskError::other(message) + })?; + + self.heal_object_dir_locked(bucket, object, dry_run, remove).await + } + async fn default_heal_result( &self, lfi: FileInfo, @@ -3450,9 +3494,9 @@ impl ObjectIO for SetDisks { let _read_lock_guard = if !opts.no_lock { Some( self.fast_lock_manager - .acquire_read_lock("", object, self.locker_owner.as_str()) + .acquire_read_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?, + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "read", &e)))?, ) } else { None @@ -3539,9 +3583,9 @@ impl ObjectIO for SetDisks { let _object_lock_guard = if !opts.no_lock { Some( self.fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?, + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?, ) } else { None @@ -3835,9 +3879,9 @@ impl StorageAPI for SetDisks { // Guard lock for source object metadata update let _lock_guard = self .fast_lock_manager - .acquire_write_lock("", src_object, self.locker_owner.as_str()) + .acquire_write_lock(src_bucket, src_object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?; + .map_err(|e| Error::other(self.format_lock_error(src_bucket, src_object, "write", &e)))?; let disks = self.get_disks_internal().await; @@ -3935,9 +3979,9 @@ impl StorageAPI for SetDisks { // Guard lock for single object delete-version let _lock_guard = self .fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?; + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?; let disks = self.get_disks(0, 0).await?; let write_quorum = disks.len() / 2 + 1; @@ -4007,17 +4051,18 @@ impl StorageAPI for SetDisks { for object_name in unique_objects { match self .fast_lock_manager - .acquire_write_lock("", object_name.as_str(), self.locker_owner.as_str()) + .acquire_write_lock(bucket, object_name.as_str(), self.locker_owner.as_str()) .await { Ok(guard) => { _guards.insert(object_name, guard); } - Err(_) => { + Err(err) => { + let message = self.format_lock_error(bucket, object_name.as_str(), "write", &err); // Mark all operations on this object as failed for (i, dobj) in objects.iter().enumerate() { if dobj.object_name == object_name { - del_errs[i] = Some(Error::other("can not get lock. please retry")); + del_errs[i] = Some(Error::other(message.clone())); } } } @@ -4141,9 +4186,9 @@ impl StorageAPI for SetDisks { let _lock_guard = if !opts.delete_prefix { Some( self.fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?, + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?, ) } else { None @@ -4329,9 +4374,9 @@ impl StorageAPI for SetDisks { let _read_lock_guard = if !opts.no_lock { Some( self.fast_lock_manager - .acquire_read_lock("", object, self.locker_owner.as_str()) + .acquire_read_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?, + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "read", &e)))?, ) } else { None @@ -4371,9 +4416,9 @@ impl StorageAPI for SetDisks { let _lock_guard = if !opts.no_lock { Some( self.fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) .await - .map_err(|_| Error::other("can not get lock. please retry".to_string()))?, + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?, ) } else { None @@ -5640,18 +5685,36 @@ impl StorageAPI for SetDisks { opts: &HealOpts, ) -> Result<(HealResultItem, Option)> { let _write_lock_guard = if !opts.no_lock { - Some( - self.fast_lock_manager - .acquire_write_lock("", object, self.locker_owner.as_str()) - .await - .map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {e:?}")))?, - ) + let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object); + let mut skip_lock = false; + if let Some(lock_info) = self.fast_lock_manager.get_lock_info(&key) { + if lock_info.owner.as_ref() == self.locker_owner.as_str() + && matches!(lock_info.mode, rustfs_lock::fast_lock::types::LockMode::Exclusive) + { + debug!( + "Reusing existing exclusive lock for heal operation on {}/{} held by {}", + bucket, object, self.locker_owner + ); + skip_lock = true; + } + } + + if skip_lock { + None + } else { + Some( + self.fast_lock_manager + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) + .await + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?, + ) + } } else { None }; if has_suffix(object, SLASH_SEPARATOR) { - let (result, err) = self.heal_object_dir(bucket, object, opts.dry_run, opts.remove).await?; + let (result, err) = self.heal_object_dir_locked(bucket, object, opts.dry_run, opts.remove).await?; return Ok((result, err.map(|e| e.into()))); } diff --git a/crates/lock/src/fast_lock/object_pool.rs b/crates/lock/src/fast_lock/object_pool.rs index 9040a127..779061e1 100644 --- a/crates/lock/src/fast_lock/object_pool.rs +++ b/crates/lock/src/fast_lock/object_pool.rs @@ -111,6 +111,9 @@ impl ObjectLockState { #[cfg(test)] mod tests { use super::*; + use crate::fast_lock::state::{ExclusiveOwnerInfo, SharedOwnerEntry}; + use std::sync::Arc; + use std::time::{Duration, SystemTime}; #[test] fn test_object_pool() { @@ -142,8 +145,17 @@ mod tests { let mut state = ObjectLockState::new(); // Modify state - *state.current_owner.write() = Some("test_owner".into()); - state.shared_owners.write().push("shared_owner".into()); + *state.current_owner.write() = Some(ExclusiveOwnerInfo { + owner: Arc::from("test_owner"), + acquired_at: SystemTime::now(), + lock_timeout: Duration::from_secs(30), + }); + state.shared_owners.write().push(SharedOwnerEntry { + owner: Arc::from("shared_owner"), + count: 1, + acquired_at: SystemTime::now(), + lock_timeout: Duration::from_secs(30), + }); // Reset state.reset_for_reuse(); diff --git a/crates/lock/src/fast_lock/shard.rs b/crates/lock/src/fast_lock/shard.rs index d0f67e26..6ca24409 100644 --- a/crates/lock/src/fast_lock/shard.rs +++ b/crates/lock/src/fast_lock/shard.rs @@ -88,8 +88,8 @@ impl LockShard { // Try atomic acquisition let success = match request.mode { - LockMode::Shared => state.try_acquire_shared_fast(&request.owner), - LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner), + LockMode::Shared => state.try_acquire_shared_fast(&request.owner, request.lock_timeout), + LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout), }; if success { @@ -108,14 +108,14 @@ impl LockShard { let state = state.clone(); drop(objects); - if state.try_acquire_exclusive_fast(&request.owner) { + if state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout) { return Some(state); } } else { // Create new state from pool and acquire immediately let state_box = self.object_pool.acquire(); let state = Arc::new(*state_box); - if state.try_acquire_exclusive_fast(&request.owner) { + if state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout) { objects.insert(request.key.clone(), state.clone()); return Some(state); } @@ -151,8 +151,8 @@ impl LockShard { // Try acquisition again let success = match request.mode { - LockMode::Shared => state.try_acquire_shared_fast(&request.owner), - LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner), + LockMode::Shared => state.try_acquire_shared_fast(&request.owner, request.lock_timeout), + LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout), }; if success { @@ -443,22 +443,24 @@ impl LockShard { let objects = self.objects.read(); if let Some(state) = objects.get(key) { if let Some(mode) = state.current_mode() { - let owner = match mode { + let (owner, acquired_at, lock_timeout) = match mode { LockMode::Exclusive => { let current_owner = state.current_owner.read(); - current_owner.clone()? + let info = current_owner.clone()?; + (info.owner, info.acquired_at, info.lock_timeout) } LockMode::Shared => { let shared_owners = state.shared_owners.read(); - shared_owners.first()?.clone() + let entry = shared_owners.first()?.clone(); + (entry.owner, entry.acquired_at, entry.lock_timeout) } }; let priority = *state.priority.read(); - // Estimate acquisition time (approximate) - let acquired_at = SystemTime::now() - Duration::from_secs(60); - let expires_at = acquired_at + Duration::from_secs(300); + let expires_at = acquired_at + .checked_add(lock_timeout) + .unwrap_or_else(|| acquired_at + crate::fast_lock::DEFAULT_LOCK_TIMEOUT); return Some(crate::fast_lock::types::ObjectLockInfo { key: key.clone(), diff --git a/crates/lock/src/fast_lock/state.rs b/crates/lock/src/fast_lock/state.rs index 1f075c8e..2896ead0 100644 --- a/crates/lock/src/fast_lock/state.rs +++ b/crates/lock/src/fast_lock/state.rs @@ -308,13 +308,28 @@ pub struct ObjectLockState { // Third cache line: Less frequently accessed data /// Current owner of exclusive lock (if any) - pub current_owner: parking_lot::RwLock>>, + pub current_owner: parking_lot::RwLock>, /// Shared owners - optimized for small number of readers - pub shared_owners: parking_lot::RwLock; 4]>>, + pub shared_owners: parking_lot::RwLock>, /// Lock priority for conflict resolution pub priority: parking_lot::RwLock, } +#[derive(Clone, Debug)] +pub struct ExclusiveOwnerInfo { + pub owner: Arc, + pub acquired_at: SystemTime, + pub lock_timeout: Duration, +} + +#[derive(Clone, Debug)] +pub struct SharedOwnerEntry { + pub owner: Arc, + pub count: u32, + pub acquired_at: SystemTime, + pub lock_timeout: Duration, +} + impl Default for ObjectLockState { fn default() -> Self { Self::new() @@ -335,60 +350,87 @@ impl ObjectLockState { } /// Try fast path shared lock acquisition - pub fn try_acquire_shared_fast(&self, owner: &Arc) -> bool { - if self.atomic_state.try_acquire_shared() { - self.atomic_state.update_access_time(); - let mut shared = self.shared_owners.write(); - if !shared.contains(owner) { - shared.push(owner.clone()); - } - true - } else { - false + pub fn try_acquire_shared_fast(&self, owner: &Arc, lock_timeout: Duration) -> bool { + if !self.atomic_state.try_acquire_shared() { + return false; } + + self.atomic_state.update_access_time(); + let mut shared = self.shared_owners.write(); + if let Some(entry) = shared.iter_mut().find(|entry| entry.owner.as_ref() == owner.as_ref()) { + entry.count = entry.count.saturating_add(1); + entry.acquired_at = SystemTime::now(); + entry.lock_timeout = lock_timeout; + } else { + shared.push(SharedOwnerEntry { + owner: owner.clone(), + count: 1, + acquired_at: SystemTime::now(), + lock_timeout, + }); + } + true } /// Try fast path exclusive lock acquisition - pub fn try_acquire_exclusive_fast(&self, owner: &Arc) -> bool { - if self.atomic_state.try_acquire_exclusive() { - self.atomic_state.update_access_time(); - let mut current = self.current_owner.write(); - *current = Some(owner.clone()); - true - } else { - false + pub fn try_acquire_exclusive_fast(&self, owner: &Arc, lock_timeout: Duration) -> bool { + if !self.atomic_state.try_acquire_exclusive() { + return false; } + + self.atomic_state.update_access_time(); + let mut current = self.current_owner.write(); + *current = Some(ExclusiveOwnerInfo { + owner: owner.clone(), + acquired_at: SystemTime::now(), + lock_timeout, + }); + true } /// Release shared lock pub fn release_shared(&self, owner: &Arc) -> bool { let mut shared = self.shared_owners.write(); - if let Some(pos) = shared.iter().position(|x| x.as_ref() == owner.as_ref()) { - shared.remove(pos); + if let Some(pos) = shared.iter().position(|entry| entry.owner.as_ref() == owner.as_ref()) { + let original_entry = shared[pos].clone(); + let removed_entry = if shared[pos].count > 1 { + shared[pos].count -= 1; + None + } else { + Some(shared.remove(pos)) + }; if self.atomic_state.release_shared() { - // Notify waiting writers if no more readers if shared.is_empty() { drop(shared); self.optimized_notify.notify_writer(); } true } else { - // Inconsistency detected - atomic state shows no shared lock but owner was found tracing::warn!( - "Atomic state inconsistency during shared lock release: owner={}, remaining_owners={}", + "Atomic state inconsistency during shared lock release: owner={}, remaining_entries={}", owner, shared.len() ); - // Re-add owner to maintain consistency - shared.push(owner.clone()); + // Re-add owner entry to maintain consistency when release failed + match removed_entry { + Some(entry) => { + shared.push(entry); + } + None => { + if let Some(existing) = shared.iter_mut().find(|existing| existing.owner.as_ref() == owner.as_ref()) { + existing.count = existing.count.saturating_add(1); + } else { + shared.push(original_entry); + } + } + } false } } else { - // Owner not found in shared owners list tracing::debug!( - "Shared lock release failed - owner not found: owner={}, current_owners={:?}", + "Shared lock release failed - owner not found: owner={}, current_entries={:?}", owner, - shared.iter().map(|s| s.as_ref()).collect::>() + shared.iter().map(|s| s.owner.as_ref()).collect::>() ); false } @@ -397,7 +439,7 @@ impl ObjectLockState { /// Release exclusive lock pub fn release_exclusive(&self, owner: &Arc) -> bool { let mut current = self.current_owner.write(); - if current.as_ref() == Some(owner) { + if current.as_ref().is_some_and(|info| info.owner.as_ref() == owner.as_ref()) { if self.atomic_state.release_exclusive() { *current = None; drop(current); @@ -426,7 +468,7 @@ impl ObjectLockState { tracing::debug!( "Exclusive lock release failed - owner mismatch: expected_owner={}, actual_owner={:?}", owner, - current.as_ref().map(|s| s.as_ref()) + current.as_ref().map(|s| s.owner.as_ref()) ); false } @@ -483,16 +525,18 @@ mod tests { let owner2 = Arc::from("owner2"); // Test shared locks - assert!(state.try_acquire_shared_fast(&owner1)); - assert!(state.try_acquire_shared_fast(&owner2)); - assert!(!state.try_acquire_exclusive_fast(&owner1)); + let timeout = Duration::from_secs(30); + + assert!(state.try_acquire_shared_fast(&owner1, timeout)); + assert!(state.try_acquire_shared_fast(&owner2, timeout)); + assert!(!state.try_acquire_exclusive_fast(&owner1, timeout)); assert!(state.release_shared(&owner1)); assert!(state.release_shared(&owner2)); // Test exclusive lock - assert!(state.try_acquire_exclusive_fast(&owner1)); - assert!(!state.try_acquire_shared_fast(&owner2)); + assert!(state.try_acquire_exclusive_fast(&owner1, timeout)); + assert!(!state.try_acquire_shared_fast(&owner2, timeout)); assert!(state.release_exclusive(&owner1)); } } diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index cd653582..04c5f2ad 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -73,8 +73,11 @@ impl Stream for RetryTimer { //println!("\njitter: {:?}", jitter); //println!("sleep: {sleep:?}"); //println!("0000: {:?}", self.random as f64 * jitter / 100_f64); - let sleep_ms = sleep.as_millis() as u64; - sleep = Duration::from_millis(sleep_ms - (sleep_ms as f64 * (self.random as f64 * jitter / 100_f64)) as u64); + let sleep_ms = sleep.as_millis(); + let reduction = ((sleep_ms as f64) * (self.random as f64 * jitter / 100_f64)).round() as u128; + let jittered_ms = sleep_ms.saturating_sub(reduction); + let clamped_ms = std::cmp::min(jittered_ms.max(1), u128::from(u64::MAX)); + sleep = Duration::from_millis(clamped_ms as u64); } //println!("sleep: {sleep:?}");