From 374a702f041f1cf7ca3654e35a12ed6651a45732 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Sat, 9 Aug 2025 21:05:46 +0800 Subject: [PATCH] improve lock Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/ecstore/src/sets.rs | 8 +- crates/lock/src/guard.rs | 23 +- crates/lock/src/local.rs | 494 ++++++++++++++++++++++++----------- crates/lock/src/namespace.rs | 98 ++++--- crates/utils/src/lib.rs | 1 + 5 files changed, 408 insertions(+), 216 deletions(-) diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 40e692ae..7ec6e01d 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -165,7 +165,13 @@ impl Sets { let lock_clients = create_unique_clients(&set_endpoints).await?; - let namespace_lock = rustfs_lock::NamespaceLock::with_clients(format!("set-{i}"), lock_clients); + // Bind lock quorum to EC write quorum for this set: data_shards (+1 if equal to parity) per default_write_quorum() + let mut write_quorum = set_drive_count - parity_count; + if write_quorum == parity_count { + write_quorum += 1; + } + let namespace_lock = + rustfs_lock::NamespaceLock::with_clients_and_quorum(format!("set-{i}"), lock_clients, write_quorum); let set_disks = SetDisks::new( Arc::new(namespace_lock), diff --git a/crates/lock/src/guard.rs b/crates/lock/src/guard.rs index 490c745b..2a286e10 100644 --- a/crates/lock/src/guard.rs +++ b/crates/lock/src/guard.rs @@ -32,29 +32,20 @@ struct UnlockRuntime { // Global unlock runtime with background worker static UNLOCK_RUNTIME: Lazy = Lazy::new(|| { - // Buffered channel to avoid blocking in Drop - let (tx, mut rx) = mpsc::channel::(1024); + // Larger buffer to reduce contention during bursts + let (tx, mut rx) = mpsc::channel::(8192); // Spawn background worker when first used; assumes a Tokio runtime is available tokio::spawn(async move { while let Some(job) = rx.recv().await { - // Best-effort release across clients; success if any succeeds + // Best-effort release across clients; try all, success if any succeeds let mut any_ok = false; let lock_id = job.lock_id.clone(); - let futures = job - .clients - .into_iter() - .map(|client| { - let id = lock_id.clone(); - async move { client.release(&id).await.unwrap_or(false) } - }) - .collect::>(); - - let results = futures::future::join_all(futures).await; - if results.into_iter().any(|s| s) { - any_ok = true; + for client in job.clients.into_iter() { + if client.release(&lock_id).await.unwrap_or(false) { + any_ok = true; + } } - if !any_ok { tracing::warn!("LockGuard background release failed for {}", lock_id); } else { diff --git a/crates/lock/src/local.rs b/crates/lock/src/local.rs index d0b7239a..c514a1c2 100644 --- a/crates/lock/src/local.rs +++ b/crates/lock/src/local.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, Notify, RwLock}; use crate::LockRequest; @@ -29,6 +29,11 @@ pub struct LocalLockEntry { pub readers: HashMap, /// lock expiration time pub expires_at: Option, + /// number of writers waiting (for simple fairness against reader storms) + pub writer_pending: usize, + /// notifiers for readers/writers + pub notify_readers: Arc, + pub notify_writers: Arc, } /// local lock map @@ -38,6 +43,10 @@ pub struct LocalLockMap { pub locks: Arc>>>>, /// Shutdown flag for background tasks shutdown: Arc, + /// expiration schedule map: when -> lock_ids + expirations: Arc>>>, + /// notify expiry task when new earlier deadline arrives + exp_notify: Arc, } impl Default for LocalLockMap { @@ -52,6 +61,8 @@ impl LocalLockMap { let map = Self { locks: Arc::new(RwLock::new(HashMap::new())), shutdown: Arc::new(AtomicBool::new(false)), + expirations: Arc::new(Mutex::new(BTreeMap::new())), + exp_notify: Arc::new(Notify::new()), }; map.spawn_expiry_task(); map @@ -61,56 +72,121 @@ impl LocalLockMap { fn spawn_expiry_task(&self) { let locks = self.locks.clone(); let shutdown = self.shutdown.clone(); + let expirations = self.expirations.clone(); + let exp_notify = self.exp_notify.clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { - interval.tick().await; - if shutdown.load(Ordering::Relaxed) { tracing::debug!("Expiry task shutting down"); break; } - let now = Instant::now(); - let mut to_remove = Vec::new(); + // Find next deadline and drain due ids + let (due_ids, wait_duration) = { + let mut due = Vec::new(); + let mut guard = expirations.lock().await; + let now = Instant::now(); + let next_deadline = guard.first_key_value().map(|(k, _)| *k); + // drain all <= now + let mut keys_to_remove = Vec::new(); + for (k, v) in guard.range(..=now).map(|(k, v)| (*k, v.clone())) { + due.extend(v); + keys_to_remove.push(k); + } + for k in keys_to_remove { + guard.remove(&k); + } + let wait = if due.is_empty() { + next_deadline.and_then(|dl| { + if dl > now { + Some(dl - now) + } else { + Some(Duration::from_millis(0)) + } + }) + } else { + Some(Duration::from_millis(0)) + }; + (due, wait) + }; - { - let locks_guard = locks.read().await; - for (key, entry) in locks_guard.iter() { - if let Ok(mut entry_guard) = entry.try_write() { - if let Some(exp) = entry_guard.expires_at { - if exp <= now { - entry_guard.writer = None; - entry_guard.readers.clear(); - entry_guard.expires_at = None; + if !due_ids.is_empty() { + // process due ids without holding the map lock during awaits + let now = Instant::now(); + // collect entries to process + let entries: Vec<(crate::types::LockId, Arc>)> = { + let locks_guard = locks.read().await; + due_ids + .into_iter() + .filter_map(|id| locks_guard.get(&id).cloned().map(|e| (id, e))) + .collect() + }; - if entry_guard.writer.is_none() && entry_guard.readers.is_empty() { - to_remove.push(key.clone()); - } + let mut to_remove = Vec::new(); + for (lock_id, entry) in entries { + let mut entry_guard = entry.write().await; + if let Some(exp) = entry_guard.expires_at { + if exp <= now { + entry_guard.writer = None; + entry_guard.readers.clear(); + entry_guard.expires_at = None; + entry_guard.notify_writers.notify_waiters(); + entry_guard.notify_readers.notify_waiters(); + if entry_guard.writer.is_none() && 
entry_guard.readers.is_empty() { + to_remove.push(lock_id); } } } } + if !to_remove.is_empty() { + let mut locks_w = locks.write().await; + for id in to_remove { + let _ = locks_w.remove(&id); + } + } + continue; // immediately look for next } - if !to_remove.is_empty() { - let mut locks_guard = locks.write().await; - for key in to_remove { - locks_guard.remove(&key); + // nothing due; wait for next deadline or notification + if let Some(dur) = wait_duration { + tokio::select! { + _ = tokio::time::sleep(dur) => {}, + _ = exp_notify.notified() => {}, } + } else { + // no deadlines, wait for new schedule or shutdown tick + exp_notify.notified().await; } } }); } + /// schedule an expiry time for the given lock id (inline, avoid per-acquisition spawn) + async fn schedule_expiry(&self, id: crate::types::LockId, exp: Instant) { + let mut guard = self.expirations.lock().await; + let is_earliest = match guard.first_key_value() { + Some((k, _)) => exp < *k, + None => true, + }; + guard.entry(exp).or_insert_with(Vec::new).push(id); + drop(guard); + if is_earliest { + self.exp_notify.notify_waiters(); + } + } + /// write lock with TTL, support timeout, use LockRequest pub async fn lock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result { let start = Instant::now(); - let expires_at = Some(Instant::now() + request.ttl); loop { - // get or create lock entry - let entry = { + // get or create lock entry (double-checked to reduce write-lock contention) + let entry = if let Some(e) = { + let locks_guard = self.locks.read().await; + locks_guard.get(&request.lock_id).cloned() + } { + e + } else { let mut locks_guard = self.locks.write().await; locks_guard .entry(request.lock_id.clone()) @@ -119,13 +195,17 @@ impl LocalLockMap { writer: None, readers: HashMap::new(), expires_at: None, + writer_pending: 0, + notify_readers: Arc::new(Notify::new()), + notify_writers: Arc::new(Notify::new()), })) }) .clone() }; - // try to get write lock to modify state - if let Ok(mut entry_guard) = entry.try_write() { + // attempt acquisition or wait using Notify + let notify_to_wait = { + let mut entry_guard = entry.write().await; // check expired state let now = Instant::now(); if let Some(exp) = entry_guard.expires_at { @@ -136,30 +216,68 @@ impl LocalLockMap { } } - // check if can get write lock + // try acquire if entry_guard.writer.is_none() && entry_guard.readers.is_empty() { entry_guard.writer = Some(request.owner.clone()); - entry_guard.expires_at = expires_at; + let expires_at = Instant::now() + request.ttl; + entry_guard.expires_at = Some(expires_at); tracing::debug!("Write lock acquired for resource '{}' by owner '{}'", request.resource, request.owner); + { + drop(entry_guard); + self.schedule_expiry(request.lock_id.clone(), expires_at).await; + } return Ok(true); } - } + // couldn't acquire now, mark as pending writer and choose notifier + entry_guard.writer_pending = entry_guard.writer_pending.saturating_add(1); + entry_guard.notify_writers.clone() + }; - if start.elapsed() >= request.acquire_timeout { + // wait with remaining timeout + let elapsed = start.elapsed(); + if elapsed >= request.acquire_timeout { + // best-effort decrement pending counter + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } return Ok(false); } - tokio::time::sleep(Duration::from_millis(10)).await; + let remaining = request.acquire_timeout - elapsed; + if 
tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() { + // timeout; decrement pending before returning + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } + return Ok(false); + } + // woke up; decrement pending before retrying + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } } } /// read lock with TTL, support timeout, use LockRequest pub async fn rlock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result { let start = Instant::now(); - let expires_at = Some(Instant::now() + request.ttl); loop { - // get or create lock entry - let entry = { + // get or create lock entry (double-checked to reduce write-lock contention) + let entry = if let Some(e) = { + let locks_guard = self.locks.read().await; + locks_guard.get(&request.lock_id).cloned() + } { + e + } else { let mut locks_guard = self.locks.write().await; locks_guard .entry(request.lock_id.clone()) @@ -168,13 +286,17 @@ impl LocalLockMap { writer: None, readers: HashMap::new(), expires_at: None, + writer_pending: 0, + notify_readers: Arc::new(Notify::new()), + notify_writers: Arc::new(Notify::new()), })) }) .clone() }; - // try to get write lock to modify state - if let Ok(mut entry_guard) = entry.try_write() { + // attempt acquisition or wait using Notify + let notify_to_wait = { + let mut entry_guard = entry.write().await; // check expired state let now = Instant::now(); if let Some(exp) = entry_guard.expires_at { @@ -185,189 +307,247 @@ impl LocalLockMap { } } - // check if can get read lock - if entry_guard.writer.is_none() { - // increase read lock count + if entry_guard.writer.is_none() && entry_guard.writer_pending == 0 { *entry_guard.readers.entry(request.owner.clone()).or_insert(0) += 1; - entry_guard.expires_at = expires_at; + let expires_at = Instant::now() + request.ttl; + entry_guard.expires_at = Some(expires_at); tracing::debug!("Read lock acquired for resource '{}' by owner '{}'", request.resource, request.owner); + { + drop(entry_guard); + self.schedule_expiry(request.lock_id.clone(), expires_at).await; + } return Ok(true); } - } - if start.elapsed() >= request.acquire_timeout { + // choose notifier: prefer waiting on writers if writers pending, else readers + if entry_guard.writer_pending > 0 { + entry_guard.notify_writers.clone() + } else { + entry_guard.notify_readers.clone() + } + }; + + // wait with remaining timeout + let elapsed = start.elapsed(); + if elapsed >= request.acquire_timeout { + return Ok(false); + } + let remaining = request.acquire_timeout - elapsed; + if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() { return Ok(false); } - tokio::time::sleep(Duration::from_millis(10)).await; } } /// unlock by LockId and owner - need to specify owner to correctly unlock pub async fn unlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> { - println!("Unlocking lock_id: {lock_id:?}, owner: {owner}"); - let mut need_remove = false; - - { + // first, get the entry without holding the write lock on the map + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - println!("Found lock entry, attempting to acquire write lock..."); - match entry.try_write() { 
- Ok(mut entry_guard) => { - println!("Successfully acquired write lock for unlock"); - // try to release write lock - if entry_guard.writer.as_ref() == Some(&owner.to_string()) { - println!("Releasing write lock for owner: {owner}"); - entry_guard.writer = None; - } - // try to release read lock - else if let Some(count) = entry_guard.readers.get_mut(owner) { - println!("Releasing read lock for owner: {owner} (count: {count})"); - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(owner); - println!("Removed owner {owner} from readers"); - } - } else { - println!("Owner {owner} not found in writers or readers"); - } - // check if need to remove - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - println!("Lock entry is empty, marking for removal"); - entry_guard.expires_at = None; - need_remove = true; - } else { - println!( - "Lock entry still has content: writer={:?}, readers={:?}", - entry_guard.writer, entry_guard.readers - ); - } - } - Err(_) => { - println!("Failed to acquire write lock for unlock - this is the problem!"); - return Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "Failed to acquire write lock for unlock", - )); - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Lock entry not found")), + } + }; + + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // try to release write lock + if entry_guard.writer.as_ref() == Some(&owner.to_string()) { + entry_guard.writer = None; + } + // try to release read lock + else if let Some(count) = entry_guard.readers.get_mut(owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(owner); } } else { - println!("Lock entry not found for lock_id: {lock_id:?}"); + // owner not found, treat as no-op } + + // check if need to remove + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + // capture notifications and state + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + // Wake a single writer to preserve fairness and avoid thundering herd + notify_writers.notify_one(); + } else if writer_none { + // No writers waiting, allow readers to proceed + notify_readers.notify_waiters(); } - // only here, entry's Ref is really dropped, can safely remove if need_remove { - println!("Removing lock entry from map..."); let mut locks_guard = self.locks.write().await; - let removed = locks_guard.remove(lock_id); - println!("Lock entry removed: {:?}", removed.is_some()); + let _ = locks_guard.remove(lock_id); } - println!("Unlock operation completed"); Ok(()) } /// unlock by LockId - smart release (compatible with old interface, but may be inaccurate) pub async fn unlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = entry.try_write() { - // release write lock first - if entry_guard.writer.is_some() { - entry_guard.writer = None; - } - // if no write lock, release first read lock - else if let Some((owner, _)) = entry_guard.readers.iter().next() { - let owner = owner.clone(); - if let Some(count) = 
entry_guard.readers.get_mut(&owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(&owner); - } - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), // nothing to do + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release write lock first + if entry_guard.writer.is_some() { + entry_guard.writer = None; + } + // if no write lock, release first read lock + else if let Some((owner, _)) = entry_guard.readers.iter().next() { + let owner = owner.clone(); + if let Some(count) = entry_guard.readers.get_mut(&owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(&owner); } } } + + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_one(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } /// runlock by LockId and owner - need to specify owner to correctly unlock read lock pub async fn runlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = entry.try_write() { - // release read lock - if let Some(count) = entry_guard.readers.get_mut(owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(owner); - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; - } + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release read lock + if let Some(count) = entry_guard.readers.get_mut(owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(owner); } } + + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_waiters(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } /// runlock by LockId - smart release read lock (compatible with old interface) pub async fn runlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = 
entry.try_write() { - // release first read lock - if let Some((owner, _)) = entry_guard.readers.iter().next() { - let owner = owner.clone(); - if let Some(count) = entry_guard.readers.get_mut(&owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(&owner); - } - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release first read lock + if let Some((owner, _)) = entry_guard.readers.iter().next() { + let owner = owner.clone(); + if let Some(count) = entry_guard.readers.get_mut(&owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(&owner); } } } + + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_waiters(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } diff --git a/crates/lock/src/namespace.rs b/crates/lock/src/namespace.rs index 630b22e8..19cdffa8 100644 --- a/crates/lock/src/namespace.rs +++ b/crates/lock/src/namespace.rs @@ -61,6 +61,22 @@ impl NamespaceLock { } } + /// Create namespace lock with clients and an explicit quorum size. + /// Quorum will be clamped into [1, clients.len()]. For single client, quorum is always 1. + pub fn with_clients_and_quorum(namespace: String, clients: Vec>, quorum: usize) -> Self { + let q = if clients.len() <= 1 { + 1 + } else { + quorum.clamp(1, clients.len()) + }; + + Self { + clients, + namespace, + quorum: q, + } + } + /// Create namespace lock with client (compatibility) pub fn with_client(client: Arc) -> Self { Self::with_clients("default".to_string(), vec![client]) @@ -87,17 +103,33 @@ impl NamespaceLock { return self.clients[0].acquire_lock(request).await; } - // Two-phase commit for distributed lock acquisition - self.acquire_lock_with_2pc(request).await + // Quorum-based acquisition for distributed mode + let (resp, _idxs) = self.acquire_lock_quorum(request).await?; + Ok(resp) } /// Acquire a lock and return a RAII guard that will release asynchronously on Drop. /// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds. 
pub async fn acquire_guard(&self, request: &LockRequest) -> Result> { - let resp = self.acquire_lock(request).await?; + if self.clients.is_empty() { + return Err(LockError::internal("No lock clients available")); + } + + if self.clients.len() == 1 { + let resp = self.clients[0].acquire_lock(request).await?; + if resp.success { + return Ok(Some(LockGuard::new( + LockId::new_deterministic(&request.resource), + vec![self.clients[0].clone()], + ))); + } + return Ok(None); + } + + let (resp, idxs) = self.acquire_lock_quorum(request).await?; if resp.success { - let guard = LockGuard::new(LockId::new_deterministic(&request.resource), self.clients.clone()); - Ok(Some(guard)) + let subset: Vec<_> = idxs.into_iter().filter_map(|i| self.clients.get(i).cloned()).collect(); + Ok(Some(LockGuard::new(LockId::new_deterministic(&request.resource), subset))) } else { Ok(None) } @@ -119,50 +151,29 @@ impl NamespaceLock { self.acquire_guard(&req).await } - /// Two-phase commit lock acquisition: all nodes must succeed or all fail - async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result { - // Phase 1: Prepare - try to acquire lock on all clients - let futures: Vec<_> = self + /// Quorum-based lock acquisition: success if at least `self.quorum` clients succeed. + /// Returns the LockResponse and the indices of clients that acquired the lock. + async fn acquire_lock_quorum(&self, request: &LockRequest) -> Result<(LockResponse, Vec)> { + let futs: Vec<_> = self .clients .iter() .enumerate() - .map(|(idx, client)| async move { - let result = client.acquire_lock(request).await; - (idx, result) - }) + .map(|(idx, client)| async move { (idx, client.acquire_lock(request).await) }) .collect(); - let results = futures::future::join_all(futures).await; + let results = futures::future::join_all(futs).await; let mut successful_clients = Vec::new(); - let mut failed_clients = Vec::new(); - // Collect results - for (idx, result) in results { - match result { - Ok(response) if response.success => { + for (idx, res) in results { + if let Ok(resp) = res { + if resp.success { successful_clients.push(idx); } - _ => { - failed_clients.push(idx); - } } } - // Check if we have enough successful acquisitions for quorum if successful_clients.len() >= self.quorum { - // Phase 2a: Commit - we have quorum, but need to ensure consistency - // If not all clients succeeded, we need to rollback for consistency - if successful_clients.len() < self.clients.len() { - // Rollback all successful acquisitions to maintain consistency - self.rollback_acquisitions(request, &successful_clients).await; - return Ok(LockResponse::failure( - "Partial success detected, rolled back for consistency".to_string(), - Duration::ZERO, - )); - } - - // All clients succeeded - lock acquired successfully - Ok(LockResponse::success( + let resp = LockResponse::success( LockInfo { id: LockId::new_deterministic(&request.resource), resource: request.resource.clone(), @@ -177,16 +188,17 @@ impl NamespaceLock { wait_start_time: None, }, Duration::ZERO, - )) + ); + Ok((resp, successful_clients)) } else { - // Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions if !successful_clients.is_empty() { self.rollback_acquisitions(request, &successful_clients).await; } - Ok(LockResponse::failure( + let resp = LockResponse::failure( format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum), Duration::ZERO, - )) + ); + Ok((resp, Vec::new())) } } @@ -558,9 +570,11 @@ mod tests { let client2: Arc = 
Arc::new(LocalClient::new()); let clients = vec![client1, client2]; - let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients); + // LocalClient shares a global in-memory map. For exclusive locks, only one can acquire at a time. + // In real distributed setups the quorum should be tied to EC write quorum. Here we use quorum=1 for success. + let ns_lock = NamespaceLock::with_clients_and_quorum("test-namespace".to_string(), clients, 1); - let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10)); + let request = LockRequest::new("test-resource", LockType::Shared, "test_owner").with_ttl(Duration::from_secs(2)); // This should succeed only if ALL clients can acquire the lock let response = ns_lock.acquire_lock(&request).await.unwrap(); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index b7ed6303..9fbe3a13 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -22,6 +22,7 @@ pub mod net; #[cfg(feature = "net")] pub use net::*; +#[cfg(all(feature = "net", feature = "io"))] pub mod retry; #[cfg(feature = "io")]
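
For reviewers, a minimal self-contained sketch of the two quorum rules this patch relies on: the EC-derived write quorum computed in the sets.rs hunk, and the "succeed once at least `quorum` clients acquire, otherwise roll back" decision behind `acquire_lock_quorum` in namespace.rs. The helper names `write_quorum` and `quorum_decision` below are illustrative only, not APIs of the crates touched here.

    // Sketch only: mirrors the arithmetic in the sets.rs hunk above.
    // `set_drive_count` = drives per erasure set, `parity_count` = parity shards.
    fn write_quorum(set_drive_count: usize, parity_count: usize) -> usize {
        let data_shards = set_drive_count - parity_count;
        // When data == parity a bare majority would tie, so require one extra vote.
        if data_shards == parity_count { data_shards + 1 } else { data_shards }
    }

    // Sketch only: the accept/rollback decision reduced to plain data.
    // `successful` holds the indices of clients that acquired the lock.
    fn quorum_decision(successful: &[usize], quorum: usize) -> Result<Vec<usize>, String> {
        if successful.len() >= quorum {
            // Keep only these clients so the guard later releases exactly what was acquired.
            Ok(successful.to_vec())
        } else {
            // Caller rolls back the partial acquisitions before reporting failure.
            Err(format!("Failed to acquire quorum: {}/{} required", successful.len(), quorum))
        }
    }

    fn main() {
        assert_eq!(write_quorum(4, 2), 3);  // 2 data + 2 parity -> quorum 3
        assert_eq!(write_quorum(12, 4), 8); // 8 data + 4 parity -> quorum 8
        assert!(quorum_decision(&[0, 2], 2).is_ok());
        assert!(quorum_decision(&[1], 2).is_err());
    }

The same derivation is what `with_clients_and_quorum` then clamps into [1, clients.len()], so a single-client (local) setup always degenerates to quorum = 1.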