feat(lock): enhance lock management with timeout and ownership tracking (#589)

- Add lock timeout support and track acquisition time in lock state
- Improve lock conflict handling with detailed error messages
- Optimize lock reuse when already held by same owner
- Refactor lock state to store owner info and timeout duration
- Update all lock operations to handle new state structure

Signed-off-by: junxiang Mu <1948535941@qq.com>

commit 9b029d18b2
parent 9b7f4d477a
Author: guojidan
Date:   2025-09-26 11:21:53 +08:00
Committed by: GitHub
5 changed files with 223 additions and 99 deletions


@@ -71,7 +71,7 @@ use rustfs_filemeta::{
headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS},
merge_file_meta_versions,
};
use rustfs_lock::NamespaceLockManager;
use rustfs_lock::fast_lock::types::LockResult;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader};
use rustfs_utils::{
@@ -147,6 +147,21 @@ impl SetDisks {
set_endpoints,
})
}
fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
match err {
LockResult::Timeout => {
format!("{mode} lock acquisition timed out on {bucket}/{object} (owner={})", self.locker_owner)
}
LockResult::Conflict {
current_owner,
current_mode,
} => format!(
"{mode} lock conflicted on {bucket}/{object}: held by {current_owner} as {:?}",
current_mode
),
LockResult::Acquired => format!("unexpected lock state while acquiring {mode} lock on {bucket}/{object}"),
}
}
async fn get_disks_internal(&self) -> Vec<Option<DiskStore>> {
let rl = self.disks.read().await;
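
For reference, here are the message shapes the new format_lock_error helper produces, as a self-contained sketch. The LockResult enum below is a simplified stand-in for the rustfs_lock type, not its real definition:

// Standalone sketch of the messages format_lock_error produces.
// `LockResult` here is a simplified stand-in for the rustfs_lock type.
#[derive(Debug)]
enum LockResult {
    Acquired,
    Timeout,
    Conflict { current_owner: String, current_mode: String },
}

fn format_lock_error(owner: &str, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
    match err {
        LockResult::Timeout => {
            format!("{mode} lock acquisition timed out on {bucket}/{object} (owner={owner})")
        }
        LockResult::Conflict { current_owner, current_mode } => {
            format!("{mode} lock conflicted on {bucket}/{object}: held by {current_owner} as {current_mode:?}")
        }
        LockResult::Acquired => {
            format!("unexpected lock state while acquiring {mode} lock on {bucket}/{object}")
        }
    }
}

fn main() {
    let err = LockResult::Conflict {
        current_owner: "node-2".into(),
        current_mode: "Exclusive".into(),
    };
    // Prints: write lock conflicted on photos/cat.jpg: held by node-2 as "Exclusive"
    println!("{}", format_lock_error("node-1", "photos", "cat.jpg", "write", &err));
}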
@@ -2461,25 +2476,38 @@ impl SetDisks {
// Check if lock is already held
let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object);
let mut reuse_existing_lock = false;
if let Some(lock_info) = self.fast_lock_manager.get_lock_info(&key) {
warn!("Lock already exists for object {}: {:?}", object, lock_info);
if lock_info.owner.as_ref() == self.locker_owner.as_str()
&& matches!(lock_info.mode, rustfs_lock::fast_lock::types::LockMode::Exclusive)
{
reuse_existing_lock = true;
debug!("Reusing existing exclusive lock for object {} held by {}", object, self.locker_owner);
} else {
warn!("Lock already exists for object {}: {:?}", object, lock_info);
}
} else {
info!("No existing lock found for object {}", object);
}
let start_time = std::time::Instant::now();
let lock_result = self
.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| {
let elapsed = start_time.elapsed();
error!("Failed to acquire write lock for heal operation after {:?}: {:?}", elapsed, e);
DiskError::other(format!("Failed to acquire write lock for heal operation: {e:?}"))
})?;
let elapsed = start_time.elapsed();
info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed);
Some(lock_result)
if reuse_existing_lock {
None
} else {
let start_time = std::time::Instant::now();
let lock_result = self
.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| {
let elapsed = start_time.elapsed();
let message = self.format_lock_error(bucket, object, "write", &e);
error!("Failed to acquire write lock for heal operation after {:?}: {}", elapsed, message);
DiskError::other(message)
})?;
let elapsed = start_time.elapsed();
info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed);
Some(lock_result)
}
} else {
info!("Skipping lock acquisition (no_lock=true)");
None
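
The reuse branch above boils down to one predicate: skip re-acquisition only when the existing lock is exclusive and held under our own owner id. A minimal sketch with hypothetical simplified types (the real check goes through rustfs_lock::fast_lock::types::LockMode and get_lock_info):

// Hypothetical helper distilling the reuse check: only skip re-acquisition
// when we already hold an *exclusive* lock under our own owner id.
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum LockMode { Shared, Exclusive }

struct LockInfo { owner: Arc<str>, mode: LockMode }

fn can_reuse(existing: Option<&LockInfo>, self_owner: &str) -> bool {
    matches!(
        existing,
        Some(info) if info.owner.as_ref() == self_owner && info.mode == LockMode::Exclusive
    )
}

fn main() {
    let held = LockInfo { owner: Arc::from("node-1"), mode: LockMode::Exclusive };
    assert!(can_reuse(Some(&held), "node-1"));   // same owner, exclusive: reuse
    assert!(!can_reuse(Some(&held), "node-2"));  // different owner: must wait
    assert!(!can_reuse(None, "node-1"));         // nothing held: acquire normally
}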
@@ -3079,19 +3107,14 @@ impl SetDisks {
}
}
async fn heal_object_dir(
/// Heal directory metadata assuming caller already holds the write lock for `(bucket, object)`.
async fn heal_object_dir_locked(
&self,
bucket: &str,
object: &str,
dry_run: bool,
remove: bool,
) -> Result<(HealResultItem, Option<DiskError>)> {
let _write_lock_guard = self
.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {e:?}")))?;
let disks = {
let disks = self.disks.read().await;
disks.clone()
@@ -3186,6 +3209,27 @@ impl SetDisks {
Ok((result, None))
}
#[allow(dead_code)]
/// Heal directory metadata after acquiring the necessary write lock.
async fn heal_object_dir(
&self,
bucket: &str,
object: &str,
dry_run: bool,
remove: bool,
) -> Result<(HealResultItem, Option<DiskError>)> {
let _write_lock_guard = self
.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| {
let message = self.format_lock_error(bucket, object, "write", &e);
DiskError::other(message)
})?;
self.heal_object_dir_locked(bucket, object, dry_run, remove).await
}
async fn default_heal_result(
&self,
lfi: FileInfo,
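
The rename plus the re-added wrapper form a common lock-splitting pattern: heal_object_dir acquires the write lock and delegates, while heal_object_dir_locked assumes the caller (here heal_object) already holds it, avoiding self-deadlock on re-entry. A minimal sketch of the shape, using a std Mutex in place of the async fast-lock guard:

// The *_locked split in a nutshell: the public entry point takes the lock,
// the _locked variant assumes the caller holds it. Sketch only; the real
// code uses an async fast-lock guard, not std::sync::Mutex.
use std::sync::Mutex;

struct Healer {
    lock: Mutex<()>,
}

impl Healer {
    /// Public path: acquire, then delegate.
    fn heal_dir(&self, dry_run: bool) -> &'static str {
        let _guard = self.lock.lock().unwrap();
        self.heal_dir_locked(dry_run)
    }

    /// Caller already holds the lock (e.g. heal_object took it for the whole heal).
    fn heal_dir_locked(&self, dry_run: bool) -> &'static str {
        if dry_run { "planned" } else { "healed" }
    }
}

fn main() {
    let h = Healer { lock: Mutex::new(()) };
    assert_eq!(h.heal_dir(true), "planned");
}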
@@ -3450,9 +3494,9 @@ impl ObjectIO for SetDisks {
let _read_lock_guard = if !opts.no_lock {
Some(
self.fast_lock_manager
.acquire_read_lock("", object, self.locker_owner.as_str())
.acquire_read_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?,
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "read", &e)))?,
)
} else {
None
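
This hunk is the first of several that replace the empty-string bucket with the real bucket name when acquiring locks. Assuming lock keys are (bucket, object) pairs, as ObjectKey::new(bucket, object) suggests, the old calls collapsed same-named objects in different buckets onto one key. A sketch of the difference, with a re-declared stand-in for ObjectKey:

// Why passing the real bucket matters: if the lock key is (bucket, object),
// an empty bucket collapses distinct objects onto one key. This ObjectKey is
// a hypothetical mirror of rustfs_lock::fast_lock::types::ObjectKey.
#[derive(PartialEq, Eq, Hash, Debug)]
struct ObjectKey {
    bucket: String,
    object: String,
}

impl ObjectKey {
    fn new(bucket: &str, object: &str) -> Self {
        Self { bucket: bucket.into(), object: object.into() }
    }
}

fn main() {
    // With the old "" bucket, two distinct objects contended on the same lock:
    assert_eq!(ObjectKey::new("", "a.txt"), ObjectKey::new("", "a.txt"));
    // With the fix, the same object name in different buckets gets independent keys:
    assert_ne!(ObjectKey::new("bucket-a", "a.txt"), ObjectKey::new("bucket-b", "a.txt"));
}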
@@ -3539,9 +3583,9 @@ impl ObjectIO for SetDisks {
let _object_lock_guard = if !opts.no_lock {
Some(
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?,
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?,
)
} else {
None
@@ -3835,9 +3879,9 @@ impl StorageAPI for SetDisks {
// Guard lock for source object metadata update
let _lock_guard = self
.fast_lock_manager
.acquire_write_lock("", src_object, self.locker_owner.as_str())
.acquire_write_lock(src_bucket, src_object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?;
.map_err(|e| Error::other(self.format_lock_error(src_bucket, src_object, "write", &e)))?;
let disks = self.get_disks_internal().await;
@@ -3935,9 +3979,9 @@ impl StorageAPI for SetDisks {
// Guard lock for single object delete-version
let _lock_guard = self
.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?;
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?;
let disks = self.get_disks(0, 0).await?;
let write_quorum = disks.len() / 2 + 1;
@@ -4007,17 +4051,18 @@ impl StorageAPI for SetDisks {
for object_name in unique_objects {
match self
.fast_lock_manager
.acquire_write_lock("", object_name.as_str(), self.locker_owner.as_str())
.acquire_write_lock(bucket, object_name.as_str(), self.locker_owner.as_str())
.await
{
Ok(guard) => {
_guards.insert(object_name, guard);
}
Err(_) => {
Err(err) => {
let message = self.format_lock_error(bucket, object_name.as_str(), "write", &err);
// Mark all operations on this object as failed
for (i, dobj) in objects.iter().enumerate() {
if dobj.object_name == object_name {
del_errs[i] = Some(Error::other("can not get lock. please retry"));
del_errs[i] = Some(Error::other(message.clone()));
}
}
}
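
The delete-objects path takes one write lock per unique object name and, on failure, fails only the batch entries targeting that object rather than aborting the whole batch. A condensed sketch of that bookkeeping, with a hypothetical try_lock closure standing in for acquire_write_lock:

// Sketch of the per-object locking pattern in the batch delete: take one
// write lock per unique object name; on failure, record an error for that
// object instead of aborting the whole batch.
use std::collections::HashMap;

fn lock_or_fail(
    unique_objects: &[String],
    try_lock: impl Fn(&str) -> Result<(), String>,
) -> (HashMap<String, ()>, HashMap<String, String>) {
    let mut guards = HashMap::new();
    let mut errors = HashMap::new();
    for name in unique_objects {
        match try_lock(name) {
            Ok(()) => { guards.insert(name.clone(), ()); }
            Err(msg) => { errors.insert(name.clone(), msg); }
        }
    }
    (guards, errors)
}

fn main() {
    let objs = vec!["a.txt".to_string(), "b.txt".to_string()];
    // Pretend "b.txt" is locked by someone else.
    let (guards, errors) = lock_or_fail(&objs, |name| {
        if name == "b.txt" { Err("write lock conflicted".into()) } else { Ok(()) }
    });
    assert!(guards.contains_key("a.txt"));
    assert_eq!(errors["b.txt"], "write lock conflicted");
}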
@@ -4141,9 +4186,9 @@ impl StorageAPI for SetDisks {
let _lock_guard = if !opts.delete_prefix {
Some(
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?,
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?,
)
} else {
None
@@ -4329,9 +4374,9 @@ impl StorageAPI for SetDisks {
let _read_lock_guard = if !opts.no_lock {
Some(
self.fast_lock_manager
.acquire_read_lock("", object, self.locker_owner.as_str())
.acquire_read_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?,
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "read", &e)))?,
)
} else {
None
@@ -4371,9 +4416,9 @@ impl StorageAPI for SetDisks {
let _lock_guard = if !opts.no_lock {
Some(
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|_| Error::other("can not get lock. please retry".to_string()))?,
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?,
)
} else {
None
@@ -5640,18 +5685,36 @@ impl StorageAPI for SetDisks {
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)> {
let _write_lock_guard = if !opts.no_lock {
Some(
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {e:?}")))?,
)
let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object);
let mut skip_lock = false;
if let Some(lock_info) = self.fast_lock_manager.get_lock_info(&key) {
if lock_info.owner.as_ref() == self.locker_owner.as_str()
&& matches!(lock_info.mode, rustfs_lock::fast_lock::types::LockMode::Exclusive)
{
debug!(
"Reusing existing exclusive lock for heal operation on {}/{} held by {}",
bucket, object, self.locker_owner
);
skip_lock = true;
}
}
if skip_lock {
None
} else {
Some(
self.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))?,
)
}
} else {
None
};
if has_suffix(object, SLASH_SEPARATOR) {
let (result, err) = self.heal_object_dir(bucket, object, opts.dry_run, opts.remove).await?;
let (result, err) = self.heal_object_dir_locked(bucket, object, opts.dry_run, opts.remove).await?;
return Ok((result, err.map(|e| e.into())));
}


@@ -111,6 +111,9 @@ impl ObjectLockState {
#[cfg(test)]
mod tests {
use super::*;
use crate::fast_lock::state::{ExclusiveOwnerInfo, SharedOwnerEntry};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[test]
fn test_object_pool() {
@@ -142,8 +145,17 @@ mod tests {
let mut state = ObjectLockState::new();
// Modify state
*state.current_owner.write() = Some("test_owner".into());
state.shared_owners.write().push("shared_owner".into());
*state.current_owner.write() = Some(ExclusiveOwnerInfo {
owner: Arc::from("test_owner"),
acquired_at: SystemTime::now(),
lock_timeout: Duration::from_secs(30),
});
state.shared_owners.write().push(SharedOwnerEntry {
owner: Arc::from("shared_owner"),
count: 1,
acquired_at: SystemTime::now(),
lock_timeout: Duration::from_secs(30),
});
// Reset
state.reset_for_reuse();
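
For readers following along outside the crate, the owner-tracking values the test populates can be built like this. The two structs are re-declared with the field shapes from this diff so the snippet stands alone:

// Constructing the new owner-tracking values in isolation; field shapes
// follow this diff, re-declared here so the snippet compiles standalone.
use std::sync::Arc;
use std::time::{Duration, SystemTime};

#[derive(Clone, Debug)]
struct ExclusiveOwnerInfo { owner: Arc<str>, acquired_at: SystemTime, lock_timeout: Duration }

#[derive(Clone, Debug)]
struct SharedOwnerEntry { owner: Arc<str>, count: u32, acquired_at: SystemTime, lock_timeout: Duration }

fn main() {
    let excl = ExclusiveOwnerInfo {
        owner: Arc::from("test_owner"),
        acquired_at: SystemTime::now(),
        lock_timeout: Duration::from_secs(30),
    };
    let shared = SharedOwnerEntry {
        owner: Arc::from("shared_owner"),
        count: 1,
        acquired_at: SystemTime::now(),
        lock_timeout: Duration::from_secs(30),
    };
    println!("{excl:?}\n{shared:?}");
}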


@@ -88,8 +88,8 @@ impl LockShard {
// Try atomic acquisition
let success = match request.mode {
LockMode::Shared => state.try_acquire_shared_fast(&request.owner),
LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner),
LockMode::Shared => state.try_acquire_shared_fast(&request.owner, request.lock_timeout),
LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout),
};
if success {
@@ -108,14 +108,14 @@ impl LockShard {
let state = state.clone();
drop(objects);
if state.try_acquire_exclusive_fast(&request.owner) {
if state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout) {
return Some(state);
}
} else {
// Create new state from pool and acquire immediately
let state_box = self.object_pool.acquire();
let state = Arc::new(*state_box);
if state.try_acquire_exclusive_fast(&request.owner) {
if state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout) {
objects.insert(request.key.clone(), state.clone());
return Some(state);
}
@@ -151,8 +151,8 @@ impl LockShard {
// Try acquisition again
let success = match request.mode {
LockMode::Shared => state.try_acquire_shared_fast(&request.owner),
LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner),
LockMode::Shared => state.try_acquire_shared_fast(&request.owner, request.lock_timeout),
LockMode::Exclusive => state.try_acquire_exclusive_fast(&request.owner, request.lock_timeout),
};
if success {
@@ -443,22 +443,24 @@ impl LockShard {
let objects = self.objects.read();
if let Some(state) = objects.get(key) {
if let Some(mode) = state.current_mode() {
let owner = match mode {
let (owner, acquired_at, lock_timeout) = match mode {
LockMode::Exclusive => {
let current_owner = state.current_owner.read();
current_owner.clone()?
let info = current_owner.clone()?;
(info.owner, info.acquired_at, info.lock_timeout)
}
LockMode::Shared => {
let shared_owners = state.shared_owners.read();
shared_owners.first()?.clone()
let entry = shared_owners.first()?.clone();
(entry.owner, entry.acquired_at, entry.lock_timeout)
}
};
let priority = *state.priority.read();
// Estimate acquisition time (approximate)
let acquired_at = SystemTime::now() - Duration::from_secs(60);
let expires_at = acquired_at + Duration::from_secs(300);
let expires_at = acquired_at
.checked_add(lock_timeout)
.unwrap_or_else(|| acquired_at + crate::fast_lock::DEFAULT_LOCK_TIMEOUT);
return Some(crate::fast_lock::types::ObjectLockInfo {
key: key.clone(),
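
The expiry time now derives from the stored acquisition time plus the per-lock timeout, with checked_add guarding against SystemTime overflow. A standalone check of that arithmetic; the DEFAULT_LOCK_TIMEOUT value below is an assumption (the crate defines its own):

// expires_at = real acquisition time + per-lock timeout, falling back to a
// default when checked_add would overflow SystemTime.
use std::time::{Duration, SystemTime};

const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(30); // assumed fallback value

fn expires_at(acquired_at: SystemTime, lock_timeout: Duration) -> SystemTime {
    acquired_at
        .checked_add(lock_timeout)
        .unwrap_or_else(|| acquired_at + DEFAULT_LOCK_TIMEOUT)
}

fn main() {
    let now = SystemTime::now();
    let exp = expires_at(now, Duration::from_secs(30));
    assert_eq!(exp.duration_since(now).unwrap(), Duration::from_secs(30));
    // A pathological timeout that would overflow falls back to the default:
    let exp2 = expires_at(now, Duration::from_secs(u64::MAX));
    assert_eq!(exp2.duration_since(now).unwrap(), DEFAULT_LOCK_TIMEOUT);
}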


@@ -308,13 +308,28 @@ pub struct ObjectLockState {
// Third cache line: Less frequently accessed data
/// Current owner of exclusive lock (if any)
pub current_owner: parking_lot::RwLock<Option<Arc<str>>>,
pub current_owner: parking_lot::RwLock<Option<ExclusiveOwnerInfo>>,
/// Shared owners - optimized for small number of readers
pub shared_owners: parking_lot::RwLock<smallvec::SmallVec<[Arc<str>; 4]>>,
pub shared_owners: parking_lot::RwLock<smallvec::SmallVec<[SharedOwnerEntry; 4]>>,
/// Lock priority for conflict resolution
pub priority: parking_lot::RwLock<LockPriority>,
}
#[derive(Clone, Debug)]
pub struct ExclusiveOwnerInfo {
pub owner: Arc<str>,
pub acquired_at: SystemTime,
pub lock_timeout: Duration,
}
#[derive(Clone, Debug)]
pub struct SharedOwnerEntry {
pub owner: Arc<str>,
pub count: u32,
pub acquired_at: SystemTime,
pub lock_timeout: Duration,
}
impl Default for ObjectLockState {
fn default() -> Self {
Self::new()
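
Storing acquired_at and lock_timeout per owner makes staleness a local computation. The is_expired helper below is hypothetical, not part of this diff, but shows what the new fields enable:

// Hypothetical staleness check over the new ExclusiveOwnerInfo shape
// (struct re-declared so the snippet stands alone).
use std::sync::Arc;
use std::time::{Duration, SystemTime};

struct ExclusiveOwnerInfo {
    owner: Arc<str>,
    acquired_at: SystemTime,
    lock_timeout: Duration,
}

impl ExclusiveOwnerInfo {
    /// True once the holder has exceeded its own timeout budget.
    fn is_expired(&self, now: SystemTime) -> bool {
        now.duration_since(self.acquired_at)
            .map(|held_for| held_for > self.lock_timeout)
            .unwrap_or(false) // clock went backwards: treat as not expired
    }
}

fn main() {
    let info = ExclusiveOwnerInfo {
        owner: Arc::from("node-1"),
        acquired_at: SystemTime::now() - Duration::from_secs(60),
        lock_timeout: Duration::from_secs(30),
    };
    assert!(info.is_expired(SystemTime::now()));
    println!("{} holds a stale lock", info.owner);
}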
@@ -335,60 +350,87 @@ impl ObjectLockState {
}
/// Try fast path shared lock acquisition
pub fn try_acquire_shared_fast(&self, owner: &Arc<str>) -> bool {
if self.atomic_state.try_acquire_shared() {
self.atomic_state.update_access_time();
let mut shared = self.shared_owners.write();
if !shared.contains(owner) {
shared.push(owner.clone());
}
true
} else {
false
pub fn try_acquire_shared_fast(&self, owner: &Arc<str>, lock_timeout: Duration) -> bool {
if !self.atomic_state.try_acquire_shared() {
return false;
}
self.atomic_state.update_access_time();
let mut shared = self.shared_owners.write();
if let Some(entry) = shared.iter_mut().find(|entry| entry.owner.as_ref() == owner.as_ref()) {
entry.count = entry.count.saturating_add(1);
entry.acquired_at = SystemTime::now();
entry.lock_timeout = lock_timeout;
} else {
shared.push(SharedOwnerEntry {
owner: owner.clone(),
count: 1,
acquired_at: SystemTime::now(),
lock_timeout,
});
}
true
}
/// Try fast path exclusive lock acquisition
pub fn try_acquire_exclusive_fast(&self, owner: &Arc<str>) -> bool {
if self.atomic_state.try_acquire_exclusive() {
self.atomic_state.update_access_time();
let mut current = self.current_owner.write();
*current = Some(owner.clone());
true
} else {
false
pub fn try_acquire_exclusive_fast(&self, owner: &Arc<str>, lock_timeout: Duration) -> bool {
if !self.atomic_state.try_acquire_exclusive() {
return false;
}
self.atomic_state.update_access_time();
let mut current = self.current_owner.write();
*current = Some(ExclusiveOwnerInfo {
owner: owner.clone(),
acquired_at: SystemTime::now(),
lock_timeout,
});
true
}
/// Release shared lock
pub fn release_shared(&self, owner: &Arc<str>) -> bool {
let mut shared = self.shared_owners.write();
if let Some(pos) = shared.iter().position(|x| x.as_ref() == owner.as_ref()) {
shared.remove(pos);
if let Some(pos) = shared.iter().position(|entry| entry.owner.as_ref() == owner.as_ref()) {
let original_entry = shared[pos].clone();
let removed_entry = if shared[pos].count > 1 {
shared[pos].count -= 1;
None
} else {
Some(shared.remove(pos))
};
if self.atomic_state.release_shared() {
// Notify waiting writers if no more readers
if shared.is_empty() {
drop(shared);
self.optimized_notify.notify_writer();
}
true
} else {
// Inconsistency detected - atomic state shows no shared lock but owner was found
tracing::warn!(
"Atomic state inconsistency during shared lock release: owner={}, remaining_owners={}",
"Atomic state inconsistency during shared lock release: owner={}, remaining_entries={}",
owner,
shared.len()
);
// Re-add owner to maintain consistency
shared.push(owner.clone());
// Re-add owner entry to maintain consistency when release failed
match removed_entry {
Some(entry) => {
shared.push(entry);
}
None => {
if let Some(existing) = shared.iter_mut().find(|existing| existing.owner.as_ref() == owner.as_ref()) {
existing.count = existing.count.saturating_add(1);
} else {
shared.push(original_entry);
}
}
}
false
}
} else {
// Owner not found in shared owners list
tracing::debug!(
"Shared lock release failed - owner not found: owner={}, current_owners={:?}",
"Shared lock release failed - owner not found: owner={}, current_entries={:?}",
owner,
shared.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
shared.iter().map(|s| s.owner.as_ref()).collect::<Vec<_>>()
);
false
}
@@ -397,7 +439,7 @@ impl ObjectLockState {
/// Release exclusive lock
pub fn release_exclusive(&self, owner: &Arc<str>) -> bool {
let mut current = self.current_owner.write();
if current.as_ref() == Some(owner) {
if current.as_ref().is_some_and(|info| info.owner.as_ref() == owner.as_ref()) {
if self.atomic_state.release_exclusive() {
*current = None;
drop(current);
@@ -426,7 +468,7 @@ impl ObjectLockState {
tracing::debug!(
"Exclusive lock release failed - owner mismatch: expected_owner={}, actual_owner={:?}",
owner,
current.as_ref().map(|s| s.as_ref())
current.as_ref().map(|s| s.owner.as_ref())
);
false
}
@@ -483,16 +525,18 @@ mod tests {
let owner2 = Arc::from("owner2");
// Test shared locks
assert!(state.try_acquire_shared_fast(&owner1));
assert!(state.try_acquire_shared_fast(&owner2));
assert!(!state.try_acquire_exclusive_fast(&owner1));
let timeout = Duration::from_secs(30);
assert!(state.try_acquire_shared_fast(&owner1, timeout));
assert!(state.try_acquire_shared_fast(&owner2, timeout));
assert!(!state.try_acquire_exclusive_fast(&owner1, timeout));
assert!(state.release_shared(&owner1));
assert!(state.release_shared(&owner2));
// Test exclusive lock
assert!(state.try_acquire_exclusive_fast(&owner1));
assert!(!state.try_acquire_shared_fast(&owner2));
assert!(state.try_acquire_exclusive_fast(&owner1, timeout));
assert!(!state.try_acquire_shared_fast(&owner2, timeout));
assert!(state.release_exclusive(&owner1));
}
}
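
Because shared owners now carry a count, the same owner can take the shared lock repeatedly and must release it the same number of times. A minimal re-implementation of just that counting logic, mirroring the saturating_add/remove behavior above:

// Counting logic for shared owner entries: re-acquire bumps the count,
// release decrements, and the entry is removed only at zero.
use std::sync::Arc;

struct Entry { owner: Arc<str>, count: u32 }

fn acquire(entries: &mut Vec<Entry>, owner: &Arc<str>) {
    if let Some(e) = entries.iter_mut().find(|e| e.owner.as_ref() == owner.as_ref()) {
        e.count = e.count.saturating_add(1);
    } else {
        entries.push(Entry { owner: owner.clone(), count: 1 });
    }
}

fn release(entries: &mut Vec<Entry>, owner: &Arc<str>) -> bool {
    match entries.iter().position(|e| e.owner.as_ref() == owner.as_ref()) {
        Some(pos) if entries[pos].count > 1 => { entries[pos].count -= 1; true }
        Some(pos) => { entries.remove(pos); true }
        None => false,
    }
}

fn main() {
    let mut entries = Vec::new();
    let owner: Arc<str> = Arc::from("reader-1");
    acquire(&mut entries, &owner);
    acquire(&mut entries, &owner);   // re-entrant: count bumps to 2
    assert_eq!(entries[0].count, 2);
    assert!(release(&mut entries, &owner));
    assert!(!entries.is_empty());    // still held once
    assert!(release(&mut entries, &owner));
    assert!(entries.is_empty());     // fully released
}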


@@ -73,8 +73,11 @@ impl Stream for RetryTimer {
//println!("\njitter: {:?}", jitter);
//println!("sleep: {sleep:?}");
//println!("0000: {:?}", self.random as f64 * jitter / 100_f64);
let sleep_ms = sleep.as_millis() as u64;
sleep = Duration::from_millis(sleep_ms - (sleep_ms as f64 * (self.random as f64 * jitter / 100_f64)) as u64);
let sleep_ms = sleep.as_millis();
let reduction = ((sleep_ms as f64) * (self.random as f64 * jitter / 100_f64)).round() as u128;
let jittered_ms = sleep_ms.saturating_sub(reduction);
let clamped_ms = std::cmp::min(jittered_ms.max(1), u128::from(u64::MAX));
sleep = Duration::from_millis(clamped_ms as u64);
}
//println!("sleep: {sleep:?}");