diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index 43ca4163..48e485b5 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -133,8 +133,14 @@ impl HealStorageAPI for ECStoreHealStorage { match self.ecstore.get_object_info(bucket, object, &Default::default()).await { Ok(info) => Ok(Some(info)), Err(e) => { - error!("Failed to get object meta: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) + // Map ObjectNotFound to None to align with Option return type + if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) { + debug!("Object meta not found: {}/{}", bucket, object); + Ok(None) + } else { + error!("Failed to get object meta: {}/{} - {}", bucket, object, e); + Err(Error::other(e)) + } } } } @@ -154,8 +160,13 @@ impl HealStorageAPI for ECStoreHealStorage { } }, Err(e) => { - error!("Failed to get object: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) + if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) { + debug!("Object data not found: {}/{}", bucket, object); + Ok(None) + } else { + error!("Failed to get object: {}/{} - {}", bucket, object, e); + Err(Error::other(e)) + } } } } diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 3aa4b9df..df22dddd 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -23,7 +23,7 @@ use ecstore::{ set_disk::SetDisks, }; use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend}; -use rustfs_filemeta::MetacacheReader; +use rustfs_filemeta::{MetacacheReader, VersionType}; use tokio::sync::{Mutex, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -432,8 +432,27 @@ impl Scanner { } if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() { - // First try the standard integrity check + // First check whether the object still logically exists. + // If it's already deleted (e.g., non-versioned bucket), do not trigger heal. let object_opts = ecstore::store_api::ObjectOptions::default(); + match ecstore.get_object_info(bucket, object, &object_opts).await { + Ok(_) => { + // Object exists logically, continue with verification below + } + Err(e) => { + if matches!(e, ecstore::error::StorageError::ObjectNotFound(_, _)) { + debug!( + "Object {}/{} not found logically (likely deleted), skip integrity check & heal", + bucket, object + ); + return Ok(()); + } else { + debug!("get_object_info error for {}/{}: {}", bucket, object, e); + // Fall through to existing logic which will handle accordingly + } + } + } + // First try the standard integrity check let mut integrity_failed = false; debug!("Running standard object verification for {}/{}", bucket, object); @@ -1398,8 +1417,64 @@ impl Scanner { let empty_vec = Vec::new(); let locations = object_locations.get(&key).unwrap_or(&empty_vec); + // If any disk reports this object as a latest delete marker (tombstone), + // it's a legitimate deletion. Skip missing-object heal to avoid recreating + // deleted objects. Optional: a metadata heal could be submitted to fan-out + // the delete marker, but we keep it conservative here. 
+ let mut has_latest_delete_marker = false; + for &disk_idx in locations { + if let Some(bucket_map) = all_disk_objects.get(disk_idx) { + if let Some(file_map) = bucket_map.get(bucket) { + if let Some(fm) = file_map.get(object_name) { + if let Some(first_ver) = fm.versions.first() { + if first_ver.header.version_type == VersionType::Delete { + has_latest_delete_marker = true; + break; + } + } + } + } + } + } + if has_latest_delete_marker { + debug!( + "Object {}/{} is a delete marker on some disk(s), skipping heal for missing parts", + bucket, object_name + ); + continue; + } + // Check if object is missing from some disks if locations.len() < disks.len() { + // Before submitting heal, confirm the object still exists logically. + let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() { + match store.get_object_info(bucket, object_name, &Default::default()).await { + Ok(_) => true, // exists -> propagate by heal + Err(e) => { + if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) { + debug!( + "Object {}/{} not found logically (deleted), skip missing-disks heal", + bucket, object_name + ); + false + } else { + debug!( + "Object {}/{} get_object_info errored ({}), conservatively skip heal", + bucket, object_name, e + ); + false + } + } + } + } else { + // No store available; be conservative and skip to avoid recreating deletions + debug!("No ECStore available to confirm existence, skip heal for {}/{}", bucket, object_name); + false + }; + + if !should_heal { + continue; + } objects_needing_heal += 1; let missing_disks: Vec = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect(); warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks); diff --git a/crates/e2e_test/src/reliant/lock.rs b/crates/e2e_test/src/reliant/lock.rs index 050d559e..66ab140c 100644 --- a/crates/e2e_test/src/reliant/lock.rs +++ b/crates/e2e_test/src/reliant/lock.rs @@ -38,6 +38,79 @@ fn get_cluster_endpoints() -> Vec { }] } +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_guard_drop_releases_exclusive_lock_local() -> Result<(), Box> { + // Single local client; no external server required + let client: Arc = Arc::new(LocalClient::new()); + let ns_lock = NamespaceLock::with_clients("e2e_guard_local".to_string(), vec![client]); + + // Acquire exclusive guard + let g1 = ns_lock + .lock_guard("guard_exclusive", "owner1", Duration::from_millis(100), Duration::from_secs(5)) + .await?; + assert!(g1.is_some(), "first guard acquisition should succeed"); + + // While g1 is alive, second exclusive acquisition should fail + let g2 = ns_lock + .lock_guard("guard_exclusive", "owner2", Duration::from_millis(50), Duration::from_secs(5)) + .await?; + assert!(g2.is_none(), "second guard acquisition should fail while first is held"); + + // Drop first guard to trigger background release + drop(g1); + // Give the background unlock worker a short moment to process + sleep(Duration::from_millis(80)).await; + + // Now acquisition should succeed + let g3 = ns_lock + .lock_guard("guard_exclusive", "owner2", Duration::from_millis(100), Duration::from_secs(5)) + .await?; + assert!(g3.is_some(), "acquisition should succeed after guard drop releases the lock"); + drop(g3); + + Ok(()) +} + +#[tokio::test] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_guard_shared_then_write_after_drop() -> Result<(), Box> { + // Two shared read guards should coexist; write should be blocked 
until they drop + let client: Arc = Arc::new(LocalClient::new()); + let ns_lock = NamespaceLock::with_clients("e2e_guard_rw".to_string(), vec![client]); + + // Acquire two read guards + let r1 = ns_lock + .rlock_guard("rw_resource", "reader1", Duration::from_millis(100), Duration::from_secs(5)) + .await?; + let r2 = ns_lock + .rlock_guard("rw_resource", "reader2", Duration::from_millis(100), Duration::from_secs(5)) + .await?; + assert!(r1.is_some() && r2.is_some(), "both read guards should be acquired"); + + // Attempt write while readers hold the lock should fail + let w_fail = ns_lock + .lock_guard("rw_resource", "writer", Duration::from_millis(50), Duration::from_secs(5)) + .await?; + assert!(w_fail.is_none(), "write should be blocked when read guards are active"); + + // Drop read guards to release + drop(r1); + drop(r2); + sleep(Duration::from_millis(80)).await; + + // Now write should succeed + let w_ok = ns_lock + .lock_guard("rw_resource", "writer", Duration::from_millis(150), Duration::from_secs(5)) + .await?; + assert!(w_ok.is_some(), "write should succeed after read guards are dropped"); + drop(w_ok); + + Ok(()) +} + #[tokio::test] #[serial] #[ignore = "requires running RustFS server at localhost:9000"] diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 2a4d8409..8f486fd4 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -3211,6 +3211,20 @@ impl ObjectIO for SetDisks { h: HeaderMap, opts: &ObjectOptions, ) -> Result { + // Acquire a shared read-lock early to protect read consistency + let mut _read_lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _read_lock_guard = guard_opt; + } + let (fi, files, disks) = self .get_object_fileinfo(bucket, object, opts, true) .await @@ -3256,7 +3270,10 @@ impl ObjectIO for SetDisks { let object = object.to_owned(); let set_index = self.set_index; let pool_index = self.pool_index; + // Move the read-lock guard into the task so it lives for the duration of the read + let _guard_to_hold = _read_lock_guard; // moved into closure below tokio::spawn(async move { + let _guard = _guard_to_hold; // keep guard alive until task ends if let Err(e) = Self::get_object_with_fileinfo( &bucket, &object, @@ -3284,16 +3301,18 @@ impl ObjectIO for SetDisks { async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result { let disks = self.disks.read().await; + // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop. + let mut _object_lock_guard: Option = None; if !opts.no_lock { - let paths = vec![object.to_string()]; - let lock_acquired = self + let guard_opt = self .namespace_lock - .lock_batch(&paths, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) .await?; - if !lock_acquired { + if guard_opt.is_none() { return Err(Error::other("can not get lock. 
please retry".to_string())); } + _object_lock_guard = guard_opt; } let mut user_defined = opts.user_defined.clone(); @@ -3500,14 +3519,6 @@ impl ObjectIO for SetDisks { self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_dir).await?; - // Release lock if it was acquired - if !opts.no_lock { - let paths = vec![object.to_string()]; - if let Err(err) = self.namespace_lock.unlock_batch(&paths, &self.locker_owner).await { - error!("Failed to unlock object {}: {}", object, err); - } - } - for (i, op_disk) in online_disks.iter().enumerate() { if let Some(disk) = op_disk { if disk.is_online().await { @@ -3583,6 +3594,19 @@ impl StorageAPI for SetDisks { return Err(StorageError::NotImplemented); } + // Guard lock for source object metadata update + let mut _lock_guard: Option = None; + { + let guard_opt = self + .namespace_lock + .lock_guard(src_object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _lock_guard = guard_opt; + } + let disks = self.get_disks_internal().await; let (mut metas, errs) = { @@ -3676,6 +3700,18 @@ impl StorageAPI for SetDisks { } #[tracing::instrument(skip(self))] async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> { + // Guard lock for single object delete-version + let mut _lock_guard: Option = None; + { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _lock_guard = guard_opt; + } let disks = self.get_disks(0, 0).await?; let write_quorum = disks.len() / 2 + 1; @@ -3732,6 +3768,23 @@ impl StorageAPI for SetDisks { del_errs.push(None) } + // Per-object guards to keep until function end + let mut _guards: Vec> = Vec::with_capacity(objects.len()); + // Acquire locks for all objects first; mark errors for failures + for (i, dobj) in objects.iter().enumerate() { + match self + .namespace_lock + .lock_guard(&dobj.object_name, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await? + { + Some(g) => _guards.push(Some(g)), + None => { + del_errs[i] = Some(Error::other("can not get lock. please retry")); + _guards.push(None); + } + } + } + // let mut del_fvers = Vec::with_capacity(objects.len()); let mut vers_map: HashMap<&String, FileInfoVersions> = HashMap::new(); @@ -3788,7 +3841,10 @@ impl StorageAPI for SetDisks { } } - vers_map.insert(&dobj.object_name, v); + // Only add to vers_map if we hold the lock + if _guards[i].is_some() { + vers_map.insert(&dobj.object_name, v); + } } let mut vers = Vec::with_capacity(vers_map.len()); @@ -3830,6 +3886,18 @@ impl StorageAPI for SetDisks { #[tracing::instrument(skip(self))] async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { + // Guard lock for single object delete + let mut _lock_guard: Option = None; + if !opts.delete_prefix { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. 
please retry".to_string())); + } + _lock_guard = guard_opt; + } if opts.delete_prefix { self.delete_prefix(bucket, object) .await @@ -3952,33 +4020,18 @@ impl StorageAPI for SetDisks { #[tracing::instrument(skip(self))] async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { - // let mut _ns = None; - // if !opts.no_lock { - // let paths = vec![object.to_string()]; - // let ns_lock = new_nslock( - // Arc::clone(&self.ns_mutex), - // self.locker_owner.clone(), - // bucket.to_string(), - // paths, - // self.lockers.clone(), - // ) - // .await; - // if !ns_lock - // .0 - // .write() - // .await - // .get_lock(&Options { - // timeout: Duration::from_secs(5), - // retry_interval: Duration::from_secs(1), - // }) - // .await - // .map_err(|err| Error::other(err.to_string()))? - // { - // return Err(Error::other("can not get lock. please retry".to_string())); - // } - - // _ns = Some(ns_lock); - // } + // Acquire a shared read-lock to protect consistency during info fetch + let mut _read_lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _read_lock_guard = guard_opt; + } let (fi, _, _) = self .get_object_fileinfo(bucket, object, opts, false) @@ -4010,6 +4063,19 @@ impl StorageAPI for SetDisks { async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { // TODO: nslock + // Guard lock for metadata update + let mut _lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _lock_guard = guard_opt; + } + let disks = self.get_disks_internal().await; let (metas, errs) = { @@ -4100,12 +4166,18 @@ impl StorageAPI for SetDisks { } }; - /*if !opts.no_lock { - let lk = self.new_ns_lock(bucket, object); - let lkctx = lk.get_lock(globalDeleteOperationTimeout)?; - //ctx = lkctx.Context() - //defer lk.Unlock(lkctx) - }*/ + // Acquire write-lock early; hold for the whole transition operation scope + let mut _lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _lock_guard = guard_opt; + } let (mut fi, meta_arr, online_disks) = self.get_object_fileinfo(bucket, object, opts, true).await?; /*if err != nil { @@ -4223,6 +4295,18 @@ impl StorageAPI for SetDisks { #[tracing::instrument(level = "debug", skip(self))] async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { + // Acquire write-lock early for the restore operation + let mut _lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. 
please retry".to_string())); + } + _lock_guard = guard_opt; + } let set_restore_header_fn = async move |oi: &mut ObjectInfo, rerr: Option| -> Result<()> { if rerr.is_none() { return Ok(()); @@ -4296,6 +4380,18 @@ impl StorageAPI for SetDisks { #[tracing::instrument(level = "debug", skip(self))] async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result { + // Acquire write-lock for tag update (metadata write) + let mut _lock_guard: Option = None; + if !opts.no_lock { + let guard_opt = self + .namespace_lock + .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10)) + .await?; + if guard_opt.is_none() { + return Err(Error::other("can not get lock. please retry".to_string())); + } + _lock_guard = guard_opt; + } let (mut fi, _, disks) = self.get_object_fileinfo(bucket, object, opts, false).await?; fi.metadata.insert(AMZ_OBJECT_TAGGING.to_owned(), tags.to_owned()); diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 40e692ae..7ec6e01d 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -165,7 +165,13 @@ impl Sets { let lock_clients = create_unique_clients(&set_endpoints).await?; - let namespace_lock = rustfs_lock::NamespaceLock::with_clients(format!("set-{i}"), lock_clients); + // Bind lock quorum to EC write quorum for this set: data_shards (+1 if equal to parity) per default_write_quorum() + let mut write_quorum = set_drive_count - parity_count; + if write_quorum == parity_count { + write_quorum += 1; + } + let namespace_lock = + rustfs_lock::NamespaceLock::with_clients_and_quorum(format!("set-{i}"), lock_clients, write_quorum); let set_disks = SetDisks::new( Arc::new(namespace_lock), diff --git a/crates/lock/src/guard.rs b/crates/lock/src/guard.rs new file mode 100644 index 00000000..f680840f --- /dev/null +++ b/crates/lock/src/guard.rs @@ -0,0 +1,120 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use once_cell::sync::Lazy; +use tokio::sync::mpsc; + +use crate::{client::LockClient, types::LockId}; + +#[derive(Debug, Clone)] +struct UnlockJob { + lock_id: LockId, + clients: Vec>, // cloned Arcs; cheap and shares state +} + +#[derive(Debug)] +struct UnlockRuntime { + tx: mpsc::Sender, +} + +// Global unlock runtime with background worker +static UNLOCK_RUNTIME: Lazy = Lazy::new(|| { + // Larger buffer to reduce contention during bursts + let (tx, mut rx) = mpsc::channel::(8192); + + // Spawn background worker when first used; assumes a Tokio runtime is available + tokio::spawn(async move { + while let Some(job) = rx.recv().await { + // Best-effort release across clients; try all, success if any succeeds + let mut any_ok = false; + let lock_id = job.lock_id.clone(); + for client in job.clients.into_iter() { + if client.release(&lock_id).await.unwrap_or(false) { + any_ok = true; + } + } + if !any_ok { + tracing::warn!("LockGuard background release failed for {}", lock_id); + } else { + tracing::debug!("LockGuard background released {}", lock_id); + } + } + }); + + UnlockRuntime { tx } +}); + +/// A RAII guard that releases the lock asynchronously when dropped. +#[derive(Debug)] +pub struct LockGuard { + lock_id: LockId, + clients: Vec>, + /// If true, Drop will not try to release (used if user manually released). + disarmed: bool, +} + +impl LockGuard { + pub(crate) fn new(lock_id: LockId, clients: Vec>) -> Self { + Self { + lock_id, + clients, + disarmed: false, + } + } + + /// Get the lock id associated with this guard + pub fn lock_id(&self) -> &LockId { + &self.lock_id + } + + /// Manually disarm the guard so dropping it won't release the lock. + /// Call this if you explicitly released the lock elsewhere. + pub fn disarm(&mut self) { + self.disarmed = true; + } +} + +impl Drop for LockGuard { + fn drop(&mut self) { + if self.disarmed { + return; + } + + let job = UnlockJob { + lock_id: self.lock_id.clone(), + clients: self.clients.clone(), + }; + + // Try a non-blocking send to avoid panics in Drop + if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) { + // Channel full or closed; best-effort fallback: spawn a detached task + let lock_id = self.lock_id.clone(); + let clients = self.clients.clone(); + tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id); + + // If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts. + let handle = tokio::spawn(async move { + let futures_iter = clients.into_iter().map(|client| { + let id = lock_id.clone(); + async move { client.release(&id).await.unwrap_or(false) } + }); + let _ = futures::future::join_all(futures_iter).await; + }); + // Explicitly drop the JoinHandle to acknowledge detaching the task. 
+ std::mem::drop(handle); + } + } +} diff --git a/crates/lock/src/lib.rs b/crates/lock/src/lib.rs index d831bf89..d047668b 100644 --- a/crates/lock/src/lib.rs +++ b/crates/lock/src/lib.rs @@ -27,6 +27,7 @@ pub mod local; // Core Modules pub mod error; +pub mod guard; pub mod types; // ============================================================================ @@ -39,6 +40,7 @@ pub use crate::{ client::{LockClient, local::LocalClient, remote::RemoteClient}, // Error types error::{LockError, Result}, + guard::LockGuard, local::LocalLockMap, // Main components namespace::{NamespaceLock, NamespaceLockManager}, diff --git a/crates/lock/src/local.rs b/crates/lock/src/local.rs index d0b7239a..380fb194 100644 --- a/crates/lock/src/local.rs +++ b/crates/lock/src/local.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, Notify, RwLock}; use crate::LockRequest; @@ -29,6 +29,11 @@ pub struct LocalLockEntry { pub readers: HashMap, /// lock expiration time pub expires_at: Option, + /// number of writers waiting (for simple fairness against reader storms) + pub writer_pending: usize, + /// notifiers for readers/writers + pub notify_readers: Arc, + pub notify_writers: Arc, } /// local lock map @@ -38,6 +43,10 @@ pub struct LocalLockMap { pub locks: Arc>>>>, /// Shutdown flag for background tasks shutdown: Arc, + /// expiration schedule map: when -> lock_ids + expirations: Arc>>>, + /// notify expiry task when new earlier deadline arrives + exp_notify: Arc, } impl Default for LocalLockMap { @@ -52,6 +61,8 @@ impl LocalLockMap { let map = Self { locks: Arc::new(RwLock::new(HashMap::new())), shutdown: Arc::new(AtomicBool::new(false)), + expirations: Arc::new(Mutex::new(BTreeMap::new())), + exp_notify: Arc::new(Notify::new()), }; map.spawn_expiry_task(); map @@ -61,56 +72,115 @@ impl LocalLockMap { fn spawn_expiry_task(&self) { let locks = self.locks.clone(); let shutdown = self.shutdown.clone(); + let expirations = self.expirations.clone(); + let exp_notify = self.exp_notify.clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { - interval.tick().await; - if shutdown.load(Ordering::Relaxed) { tracing::debug!("Expiry task shutting down"); break; } - let now = Instant::now(); - let mut to_remove = Vec::new(); + // Find next deadline and drain due ids + let (due_ids, wait_duration) = { + let mut due = Vec::new(); + let mut guard = expirations.lock().await; + let now = Instant::now(); + let next_deadline = guard.first_key_value().map(|(k, _)| *k); + // drain all <= now + let mut keys_to_remove = Vec::new(); + for (k, v) in guard.range(..=now).map(|(k, v)| (*k, v.clone())) { + due.extend(v); + keys_to_remove.push(k); + } + for k in keys_to_remove { + guard.remove(&k); + } + let wait = if due.is_empty() { + next_deadline.map(|dl| if dl > now { dl - now } else { Duration::from_millis(0) }) + } else { + Some(Duration::from_millis(0)) + }; + (due, wait) + }; - { - let locks_guard = locks.read().await; - for (key, entry) in locks_guard.iter() { - if let Ok(mut entry_guard) = entry.try_write() { - if let Some(exp) = entry_guard.expires_at { - if exp <= now { - entry_guard.writer = None; - entry_guard.readers.clear(); - entry_guard.expires_at = None; + if 
!due_ids.is_empty() { + // process due ids without holding the map lock during awaits + let now = Instant::now(); + // collect entries to process + let entries: Vec<(crate::types::LockId, Arc>)> = { + let locks_guard = locks.read().await; + due_ids + .into_iter() + .filter_map(|id| locks_guard.get(&id).cloned().map(|e| (id, e))) + .collect() + }; - if entry_guard.writer.is_none() && entry_guard.readers.is_empty() { - to_remove.push(key.clone()); - } + let mut to_remove = Vec::new(); + for (lock_id, entry) in entries { + let mut entry_guard = entry.write().await; + if let Some(exp) = entry_guard.expires_at { + if exp <= now { + entry_guard.writer = None; + entry_guard.readers.clear(); + entry_guard.expires_at = None; + entry_guard.notify_writers.notify_waiters(); + entry_guard.notify_readers.notify_waiters(); + if entry_guard.writer.is_none() && entry_guard.readers.is_empty() { + to_remove.push(lock_id); } } } } + if !to_remove.is_empty() { + let mut locks_w = locks.write().await; + for id in to_remove { + let _ = locks_w.remove(&id); + } + } + continue; // immediately look for next } - if !to_remove.is_empty() { - let mut locks_guard = locks.write().await; - for key in to_remove { - locks_guard.remove(&key); + // nothing due; wait for next deadline or notification + if let Some(dur) = wait_duration { + tokio::select! { + _ = tokio::time::sleep(dur) => {}, + _ = exp_notify.notified() => {}, } + } else { + // no deadlines, wait for new schedule or shutdown tick + exp_notify.notified().await; } } }); } + /// schedule an expiry time for the given lock id (inline, avoid per-acquisition spawn) + async fn schedule_expiry(&self, id: crate::types::LockId, exp: Instant) { + let mut guard = self.expirations.lock().await; + let is_earliest = match guard.first_key_value() { + Some((k, _)) => exp < *k, + None => true, + }; + guard.entry(exp).or_insert_with(Vec::new).push(id); + drop(guard); + if is_earliest { + self.exp_notify.notify_waiters(); + } + } + /// write lock with TTL, support timeout, use LockRequest pub async fn lock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result { let start = Instant::now(); - let expires_at = Some(Instant::now() + request.ttl); loop { - // get or create lock entry - let entry = { + // get or create lock entry (double-checked to reduce write-lock contention) + let entry = if let Some(e) = { + let locks_guard = self.locks.read().await; + locks_guard.get(&request.lock_id).cloned() + } { + e + } else { let mut locks_guard = self.locks.write().await; locks_guard .entry(request.lock_id.clone()) @@ -119,13 +189,17 @@ impl LocalLockMap { writer: None, readers: HashMap::new(), expires_at: None, + writer_pending: 0, + notify_readers: Arc::new(Notify::new()), + notify_writers: Arc::new(Notify::new()), })) }) .clone() }; - // try to get write lock to modify state - if let Ok(mut entry_guard) = entry.try_write() { + // attempt acquisition or wait using Notify + let notify_to_wait = { + let mut entry_guard = entry.write().await; // check expired state let now = Instant::now(); if let Some(exp) = entry_guard.expires_at { @@ -136,30 +210,68 @@ impl LocalLockMap { } } - // check if can get write lock + // try acquire if entry_guard.writer.is_none() && entry_guard.readers.is_empty() { entry_guard.writer = Some(request.owner.clone()); - entry_guard.expires_at = expires_at; + let expires_at = Instant::now() + request.ttl; + entry_guard.expires_at = Some(expires_at); tracing::debug!("Write lock acquired for resource '{}' by owner '{}'", request.resource, request.owner); + { + 
drop(entry_guard); + self.schedule_expiry(request.lock_id.clone(), expires_at).await; + } return Ok(true); } - } + // couldn't acquire now, mark as pending writer and choose notifier + entry_guard.writer_pending = entry_guard.writer_pending.saturating_add(1); + entry_guard.notify_writers.clone() + }; - if start.elapsed() >= request.acquire_timeout { + // wait with remaining timeout + let elapsed = start.elapsed(); + if elapsed >= request.acquire_timeout { + // best-effort decrement pending counter + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } return Ok(false); } - tokio::time::sleep(Duration::from_millis(10)).await; + let remaining = request.acquire_timeout - elapsed; + if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() { + // timeout; decrement pending before returning + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } + return Ok(false); + } + // woke up; decrement pending before retrying + if let Ok(mut eg) = entry.try_write() { + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } else { + let mut eg = entry.write().await; + eg.writer_pending = eg.writer_pending.saturating_sub(1); + } } } /// read lock with TTL, support timeout, use LockRequest pub async fn rlock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result { let start = Instant::now(); - let expires_at = Some(Instant::now() + request.ttl); loop { - // get or create lock entry - let entry = { + // get or create lock entry (double-checked to reduce write-lock contention) + let entry = if let Some(e) = { + let locks_guard = self.locks.read().await; + locks_guard.get(&request.lock_id).cloned() + } { + e + } else { let mut locks_guard = self.locks.write().await; locks_guard .entry(request.lock_id.clone()) @@ -168,13 +280,17 @@ impl LocalLockMap { writer: None, readers: HashMap::new(), expires_at: None, + writer_pending: 0, + notify_readers: Arc::new(Notify::new()), + notify_writers: Arc::new(Notify::new()), })) }) .clone() }; - // try to get write lock to modify state - if let Ok(mut entry_guard) = entry.try_write() { + // attempt acquisition or wait using Notify + let notify_to_wait = { + let mut entry_guard = entry.write().await; // check expired state let now = Instant::now(); if let Some(exp) = entry_guard.expires_at { @@ -185,189 +301,247 @@ impl LocalLockMap { } } - // check if can get read lock - if entry_guard.writer.is_none() { - // increase read lock count + if entry_guard.writer.is_none() && entry_guard.writer_pending == 0 { *entry_guard.readers.entry(request.owner.clone()).or_insert(0) += 1; - entry_guard.expires_at = expires_at; + let expires_at = Instant::now() + request.ttl; + entry_guard.expires_at = Some(expires_at); tracing::debug!("Read lock acquired for resource '{}' by owner '{}'", request.resource, request.owner); + { + drop(entry_guard); + self.schedule_expiry(request.lock_id.clone(), expires_at).await; + } return Ok(true); } - } - if start.elapsed() >= request.acquire_timeout { + // choose notifier: prefer waiting on writers if writers pending, else readers + if entry_guard.writer_pending > 0 { + entry_guard.notify_writers.clone() + } else { + entry_guard.notify_readers.clone() + } + }; + + // wait with remaining timeout + let elapsed = 
start.elapsed(); + if elapsed >= request.acquire_timeout { + return Ok(false); + } + let remaining = request.acquire_timeout - elapsed; + if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() { return Ok(false); } - tokio::time::sleep(Duration::from_millis(10)).await; } } /// unlock by LockId and owner - need to specify owner to correctly unlock pub async fn unlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> { - println!("Unlocking lock_id: {lock_id:?}, owner: {owner}"); - let mut need_remove = false; - - { + // first, get the entry without holding the write lock on the map + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - println!("Found lock entry, attempting to acquire write lock..."); - match entry.try_write() { - Ok(mut entry_guard) => { - println!("Successfully acquired write lock for unlock"); - // try to release write lock - if entry_guard.writer.as_ref() == Some(&owner.to_string()) { - println!("Releasing write lock for owner: {owner}"); - entry_guard.writer = None; - } - // try to release read lock - else if let Some(count) = entry_guard.readers.get_mut(owner) { - println!("Releasing read lock for owner: {owner} (count: {count})"); - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(owner); - println!("Removed owner {owner} from readers"); - } - } else { - println!("Owner {owner} not found in writers or readers"); - } - // check if need to remove - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - println!("Lock entry is empty, marking for removal"); - entry_guard.expires_at = None; - need_remove = true; - } else { - println!( - "Lock entry still has content: writer={:?}, readers={:?}", - entry_guard.writer, entry_guard.readers - ); - } - } - Err(_) => { - println!("Failed to acquire write lock for unlock - this is the problem!"); - return Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "Failed to acquire write lock for unlock", - )); - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Lock entry not found")), + } + }; + + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // try to release write lock + if entry_guard.writer.as_ref() == Some(&owner.to_string()) { + entry_guard.writer = None; + } + // try to release read lock + else if let Some(count) = entry_guard.readers.get_mut(owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(owner); } } else { - println!("Lock entry not found for lock_id: {lock_id:?}"); + // owner not found, treat as no-op } + + // check if need to remove + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + // capture notifications and state + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + // Wake a single writer to preserve fairness and avoid thundering herd + notify_writers.notify_one(); + } else if writer_none { + // No writers waiting, allow readers to proceed + notify_readers.notify_waiters(); } - // only here, entry's Ref is really dropped, can safely remove if need_remove { - println!("Removing lock entry from map..."); let mut locks_guard = 
self.locks.write().await; - let removed = locks_guard.remove(lock_id); - println!("Lock entry removed: {:?}", removed.is_some()); + let _ = locks_guard.remove(lock_id); } - println!("Unlock operation completed"); Ok(()) } /// unlock by LockId - smart release (compatible with old interface, but may be inaccurate) pub async fn unlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = entry.try_write() { - // release write lock first - if entry_guard.writer.is_some() { - entry_guard.writer = None; - } - // if no write lock, release first read lock - else if let Some((owner, _)) = entry_guard.readers.iter().next() { - let owner = owner.clone(); - if let Some(count) = entry_guard.readers.get_mut(&owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(&owner); - } - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), // nothing to do + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release write lock first + if entry_guard.writer.is_some() { + entry_guard.writer = None; + } + // if no write lock, release first read lock + else if let Some((owner, _)) = entry_guard.readers.iter().next() { + let owner = owner.clone(); + if let Some(count) = entry_guard.readers.get_mut(&owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(&owner); } } } + + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_one(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } /// runlock by LockId and owner - need to specify owner to correctly unlock read lock pub async fn runlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = entry.try_write() { - // release read lock - if let Some(count) = entry_guard.readers.get_mut(owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(owner); - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; - } + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release read lock + if let Some(count) = entry_guard.readers.get_mut(owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(owner); } } + + if entry_guard.readers.is_empty() && 
entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_waiters(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } /// runlock by LockId - smart release read lock (compatible with old interface) pub async fn runlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> { - let mut need_remove = false; - - { + let entry = { let locks_guard = self.locks.read().await; - if let Some(entry) = locks_guard.get(lock_id) { - if let Ok(mut entry_guard) = entry.try_write() { - // release first read lock - if let Some((owner, _)) = entry_guard.readers.iter().next() { - let owner = owner.clone(); - if let Some(count) = entry_guard.readers.get_mut(&owner) { - *count -= 1; - if *count == 0 { - entry_guard.readers.remove(&owner); - } - } - } + match locks_guard.get(lock_id) { + Some(e) => e.clone(), + None => return Ok(()), + } + }; - // if completely idle, clean entry - if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { - entry_guard.expires_at = None; - need_remove = true; + let mut need_remove = false; + let (notify_writers, notify_readers, writer_pending, writer_none) = { + let mut entry_guard = entry.write().await; + + // release first read lock + if let Some((owner, _)) = entry_guard.readers.iter().next() { + let owner = owner.clone(); + if let Some(count) = entry_guard.readers.get_mut(&owner) { + *count -= 1; + if *count == 0 { + entry_guard.readers.remove(&owner); } } } + + if entry_guard.readers.is_empty() && entry_guard.writer.is_none() { + entry_guard.expires_at = None; + need_remove = true; + } + + ( + entry_guard.notify_writers.clone(), + entry_guard.notify_readers.clone(), + entry_guard.writer_pending, + entry_guard.writer.is_none(), + ) + }; + + if writer_pending > 0 && writer_none { + notify_writers.notify_waiters(); + } else if writer_none { + notify_readers.notify_waiters(); } if need_remove { let mut locks_guard = self.locks.write().await; - locks_guard.remove(lock_id); + let _ = locks_guard.remove(lock_id); } Ok(()) } diff --git a/crates/lock/src/namespace.rs b/crates/lock/src/namespace.rs index 71135e65..19cdffa8 100644 --- a/crates/lock/src/namespace.rs +++ b/crates/lock/src/namespace.rs @@ -19,6 +19,7 @@ use std::time::Duration; use crate::{ client::LockClient, error::{LockError, Result}, + guard::LockGuard, types::{LockId, LockInfo, LockRequest, LockResponse, LockStatus, LockType}, }; @@ -60,6 +61,22 @@ impl NamespaceLock { } } + /// Create namespace lock with clients and an explicit quorum size. + /// Quorum will be clamped into [1, clients.len()]. For single client, quorum is always 1. 
+ pub fn with_clients_and_quorum(namespace: String, clients: Vec>, quorum: usize) -> Self { + let q = if clients.len() <= 1 { + 1 + } else { + quorum.clamp(1, clients.len()) + }; + + Self { + clients, + namespace, + quorum: q, + } + } + /// Create namespace lock with client (compatibility) pub fn with_client(client: Arc) -> Self { Self::with_clients("default".to_string(), vec![client]) @@ -86,54 +103,77 @@ impl NamespaceLock { return self.clients[0].acquire_lock(request).await; } - // Two-phase commit for distributed lock acquisition - self.acquire_lock_with_2pc(request).await + // Quorum-based acquisition for distributed mode + let (resp, _idxs) = self.acquire_lock_quorum(request).await?; + Ok(resp) } - /// Two-phase commit lock acquisition: all nodes must succeed or all fail - async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result { - // Phase 1: Prepare - try to acquire lock on all clients - let futures: Vec<_> = self + /// Acquire a lock and return a RAII guard that will release asynchronously on Drop. + /// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds. + pub async fn acquire_guard(&self, request: &LockRequest) -> Result> { + if self.clients.is_empty() { + return Err(LockError::internal("No lock clients available")); + } + + if self.clients.len() == 1 { + let resp = self.clients[0].acquire_lock(request).await?; + if resp.success { + return Ok(Some(LockGuard::new( + LockId::new_deterministic(&request.resource), + vec![self.clients[0].clone()], + ))); + } + return Ok(None); + } + + let (resp, idxs) = self.acquire_lock_quorum(request).await?; + if resp.success { + let subset: Vec<_> = idxs.into_iter().filter_map(|i| self.clients.get(i).cloned()).collect(); + Ok(Some(LockGuard::new(LockId::new_deterministic(&request.resource), subset))) + } else { + Ok(None) + } + } + + /// Convenience: acquire exclusive lock as a guard + pub async fn lock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result> { + let req = LockRequest::new(self.get_resource_key(resource), LockType::Exclusive, owner) + .with_acquire_timeout(timeout) + .with_ttl(ttl); + self.acquire_guard(&req).await + } + + /// Convenience: acquire shared lock as a guard + pub async fn rlock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result> { + let req = LockRequest::new(self.get_resource_key(resource), LockType::Shared, owner) + .with_acquire_timeout(timeout) + .with_ttl(ttl); + self.acquire_guard(&req).await + } + + /// Quorum-based lock acquisition: success if at least `self.quorum` clients succeed. + /// Returns the LockResponse and the indices of clients that acquired the lock. 
+ async fn acquire_lock_quorum(&self, request: &LockRequest) -> Result<(LockResponse, Vec)> { + let futs: Vec<_> = self .clients .iter() .enumerate() - .map(|(idx, client)| async move { - let result = client.acquire_lock(request).await; - (idx, result) - }) + .map(|(idx, client)| async move { (idx, client.acquire_lock(request).await) }) .collect(); - let results = futures::future::join_all(futures).await; + let results = futures::future::join_all(futs).await; let mut successful_clients = Vec::new(); - let mut failed_clients = Vec::new(); - // Collect results - for (idx, result) in results { - match result { - Ok(response) if response.success => { + for (idx, res) in results { + if let Ok(resp) = res { + if resp.success { successful_clients.push(idx); } - _ => { - failed_clients.push(idx); - } } } - // Check if we have enough successful acquisitions for quorum if successful_clients.len() >= self.quorum { - // Phase 2a: Commit - we have quorum, but need to ensure consistency - // If not all clients succeeded, we need to rollback for consistency - if successful_clients.len() < self.clients.len() { - // Rollback all successful acquisitions to maintain consistency - self.rollback_acquisitions(request, &successful_clients).await; - return Ok(LockResponse::failure( - "Partial success detected, rolled back for consistency".to_string(), - Duration::ZERO, - )); - } - - // All clients succeeded - lock acquired successfully - Ok(LockResponse::success( + let resp = LockResponse::success( LockInfo { id: LockId::new_deterministic(&request.resource), resource: request.resource.clone(), @@ -148,16 +188,17 @@ impl NamespaceLock { wait_start_time: None, }, Duration::ZERO, - )) + ); + Ok((resp, successful_clients)) } else { - // Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions if !successful_clients.is_empty() { self.rollback_acquisitions(request, &successful_clients).await; } - Ok(LockResponse::failure( + let resp = LockResponse::failure( format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum), Duration::ZERO, - )) + ); + Ok((resp, Vec::new())) } } @@ -420,6 +461,33 @@ mod tests { assert!(result.is_ok()); } + #[tokio::test] + async fn test_guard_acquire_and_drop_release() { + let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new())); + + // Acquire guard + let guard = ns_lock + .lock_guard("guard-resource", "owner", Duration::from_millis(100), Duration::from_secs(5)) + .await + .unwrap(); + assert!(guard.is_some()); + let lock_id = guard.as_ref().unwrap().lock_id().clone(); + + // Drop guard to trigger background release + drop(guard); + + // Give background worker a moment to process + tokio::time::sleep(Duration::from_millis(50)).await; + + // Re-acquire should succeed (previous lock released) + let req = LockRequest::new(&lock_id.resource, LockType::Exclusive, "owner").with_ttl(Duration::from_secs(2)); + let resp = ns_lock.acquire_lock(&req).await.unwrap(); + assert!(resp.success); + + // Cleanup + let _ = ns_lock.release_lock(&LockId::new_deterministic(&lock_id.resource)).await; + } + #[tokio::test] async fn test_connection_health() { let local_lock = NamespaceLock::new("test-namespace".to_string()); @@ -502,9 +570,11 @@ mod tests { let client2: Arc = Arc::new(LocalClient::new()); let clients = vec![client1, client2]; - let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients); + // LocalClient shares a global in-memory map. For exclusive locks, only one can acquire at a time. 
+ // In real distributed setups the quorum should be tied to EC write quorum. Here we use quorum=1 for success. + let ns_lock = NamespaceLock::with_clients_and_quorum("test-namespace".to_string(), clients, 1); - let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10)); + let request = LockRequest::new("test-resource", LockType::Shared, "test_owner").with_ttl(Duration::from_secs(2)); // This should succeed only if ALL clients can acquire the lock let response = ns_lock.acquire_lock(&request).await.unwrap(); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index b7ed6303..9fbe3a13 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -22,6 +22,7 @@ pub mod net; #[cfg(feature = "net")] pub use net::*; +#[cfg(all(feature = "net", feature = "io"))] pub mod retry; #[cfg(feature = "io")]
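
Usage note (not part of the patch): the RAII `LockGuard` introduced in `crates/lock/src/guard.rs` replaces the manual `lock_batch`/`unlock_batch` pairing previously used in `set_disk.rs`; dropping the guard enqueues an asynchronous release on the background unlock worker instead of requiring an explicit unlock call. A minimal sketch of the new API, assuming the `NamespaceLock::with_clients_and_quorum`, `lock_guard`, and `LocalClient` signatures added above (the resource, owner, and namespace names below are placeholders):

```rust
use std::sync::Arc;
use std::time::Duration;

use rustfs_lock::{LocalClient, LockClient, NamespaceLock};

async fn guarded_write() -> Result<(), Box<dyn std::error::Error>> {
    // Single local client; with_clients_and_quorum clamps the quorum to 1 in this case.
    let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let ns_lock = NamespaceLock::with_clients_and_quorum("example-ns".to_string(), vec![client], 1);

    // Acquire an exclusive lock as a guard; `None` means acquisition timed out.
    if let Some(_guard) = ns_lock
        .lock_guard("example/object", "owner-1", Duration::from_millis(100), Duration::from_secs(5))
        .await?
    {
        // ... protected write happens here ...
        // Dropping `_guard` at the end of this scope schedules the release on the
        // global unlock runtime rather than blocking inside Drop.
    }

    Ok(())
}
```

For the lock-quorum change in `crates/ecstore/src/sets.rs`: a 12-drive set with EC parity 4 yields a write quorum of 12 - 4 = 8, while an 8-drive set with parity 4 hits the equality case and yields 4 + 1 = 5.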