improve lock: quorum-based acquisition, Notify-driven waiting, and scheduled expiry

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2025-08-09 21:05:46 +08:00
parent e369e9f481
commit 374a702f04
5 changed files with 408 additions and 216 deletions


@@ -165,7 +165,13 @@ impl Sets {
let lock_clients = create_unique_clients(&set_endpoints).await?;
let namespace_lock = rustfs_lock::NamespaceLock::with_clients(format!("set-{i}"), lock_clients);
// Bind lock quorum to EC write quorum for this set: data_shards (+1 if equal to parity) per default_write_quorum()
let mut write_quorum = set_drive_count - parity_count;
if write_quorum == parity_count {
write_quorum += 1;
}
let namespace_lock =
rustfs_lock::NamespaceLock::with_clients_and_quorum(format!("set-{i}"), lock_clients, write_quorum);
let set_disks = SetDisks::new(
Arc::new(namespace_lock),
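For context, the write-quorum rule wired in here can be read in isolation. A minimal sketch, assuming a free-function form of the `default_write_quorum()` mentioned in the comment (the standalone signature is an assumption):

```rust
// Sketch of the EC write-quorum rule (assumed free-function form of
// default_write_quorum): quorum = data shards, +1 when data == parity.
fn default_write_quorum(set_drive_count: usize, parity_count: usize) -> usize {
    let mut write_quorum = set_drive_count - parity_count; // data shards
    if write_quorum == parity_count {
        write_quorum += 1; // break the tie so two half-sets cannot both acquire
    }
    write_quorum
}

fn main() {
    assert_eq!(default_write_quorum(4, 2), 3); // 2 data + 2 parity -> quorum 3
    assert_eq!(default_write_quorum(6, 2), 4); // 4 data + 2 parity -> quorum 4
}
```

Tying the lock quorum to the EC write quorum means a writer holds the lock on at least as many nodes as it must write shards to.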


@@ -32,29 +32,20 @@ struct UnlockRuntime {
// Global unlock runtime with background worker
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
// Buffered channel to avoid blocking in Drop
let (tx, mut rx) = mpsc::channel::<UnlockJob>(1024);
// Larger buffer to reduce contention during bursts
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
// Spawn background worker when first used; assumes a Tokio runtime is available
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; success if any succeeds
// Best-effort release across clients; try all, success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
let futures = job
.clients
.into_iter()
.map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
})
.collect::<Vec<_>>();
let results = futures::future::join_all(futures).await;
if results.into_iter().any(|s| s) {
any_ok = true;
for client in job.clients.into_iter() {
if client.release(&lock_id).await.unwrap_or(false) {
any_ok = true;
}
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
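The overall shape of this change, as a self-contained sketch: a buffered mpsc channel fed from `Drop`, drained by one background task that now releases sequentially instead of building a `join_all` future set. `UnlockJob` and `LockClient` below are simplified stand-ins for the real rustfs_lock types:

```rust
// Self-contained sketch of the buffered-channel unlock worker (types simplified).
use std::sync::Arc;
use tokio::sync::mpsc;

#[async_trait::async_trait]
trait LockClient: Send + Sync {
    async fn release(&self, lock_id: &str) -> std::io::Result<bool>;
}

struct UnlockJob {
    lock_id: String,
    clients: Vec<Arc<dyn LockClient>>,
}

fn spawn_unlock_worker() -> mpsc::Sender<UnlockJob> {
    // Large buffer so Drop can enqueue without blocking during bursts.
    let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
    tokio::spawn(async move {
        while let Some(job) = rx.recv().await {
            // Sequential best-effort release: try every client, succeed if any does.
            let mut any_ok = false;
            for client in job.clients {
                if client.release(&job.lock_id).await.unwrap_or(false) {
                    any_ok = true;
                }
            }
            if !any_ok {
                tracing::warn!("background release failed for {}", job.lock_id);
            }
        }
    });
    tx
}
```

The sequential loop drops the per-client future allocation and the cloned lock IDs of the old `join_all` version, at the cost of releasing one client at a time.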


@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::sync::{Mutex, Notify, RwLock};
use crate::LockRequest;
@@ -29,6 +29,11 @@ pub struct LocalLockEntry {
pub readers: HashMap<String, usize>,
/// lock expiration time
pub expires_at: Option<Instant>,
/// number of writers waiting (for simple fairness against reader storms)
pub writer_pending: usize,
/// notifiers for readers/writers
pub notify_readers: Arc<Notify>,
pub notify_writers: Arc<Notify>,
}
/// local lock map
@@ -38,6 +43,10 @@ pub struct LocalLockMap {
pub locks: Arc<RwLock<HashMap<crate::types::LockId, Arc<RwLock<LocalLockEntry>>>>>,
/// Shutdown flag for background tasks
shutdown: Arc<AtomicBool>,
/// expiration schedule map: when -> lock_ids
expirations: Arc<Mutex<BTreeMap<Instant, Vec<crate::types::LockId>>>>,
/// notifies the expiry task when a new, earlier deadline arrives
exp_notify: Arc<Notify>,
}
impl Default for LocalLockMap {
@@ -52,6 +61,8 @@ impl LocalLockMap {
let map = Self {
locks: Arc::new(RwLock::new(HashMap::new())),
shutdown: Arc::new(AtomicBool::new(false)),
expirations: Arc::new(Mutex::new(BTreeMap::new())),
exp_notify: Arc::new(Notify::new()),
};
map.spawn_expiry_task();
map
@@ -61,56 +72,121 @@ impl LocalLockMap {
fn spawn_expiry_task(&self) {
let locks = self.locks.clone();
let shutdown = self.shutdown.clone();
let expirations = self.expirations.clone();
let exp_notify = self.exp_notify.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
if shutdown.load(Ordering::Relaxed) {
tracing::debug!("Expiry task shutting down");
break;
}
let now = Instant::now();
let mut to_remove = Vec::new();
// Find next deadline and drain due ids
let (due_ids, wait_duration) = {
let mut due = Vec::new();
let mut guard = expirations.lock().await;
let now = Instant::now();
let next_deadline = guard.first_key_value().map(|(k, _)| *k);
// drain all <= now
let mut keys_to_remove = Vec::new();
for (k, v) in guard.range(..=now).map(|(k, v)| (*k, v.clone())) {
due.extend(v);
keys_to_remove.push(k);
}
for k in keys_to_remove {
guard.remove(&k);
}
let wait = if due.is_empty() {
next_deadline.and_then(|dl| {
if dl > now {
Some(dl - now)
} else {
Some(Duration::from_millis(0))
}
})
} else {
Some(Duration::from_millis(0))
};
(due, wait)
};
{
let locks_guard = locks.read().await;
for (key, entry) in locks_guard.iter() {
if let Ok(mut entry_guard) = entry.try_write() {
if let Some(exp) = entry_guard.expires_at {
if exp <= now {
entry_guard.writer = None;
entry_guard.readers.clear();
entry_guard.expires_at = None;
if !due_ids.is_empty() {
// process due ids without holding the map lock during awaits
let now = Instant::now();
// collect entries to process
let entries: Vec<(crate::types::LockId, Arc<RwLock<LocalLockEntry>>)> = {
let locks_guard = locks.read().await;
due_ids
.into_iter()
.filter_map(|id| locks_guard.get(&id).cloned().map(|e| (id, e)))
.collect()
};
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
to_remove.push(key.clone());
}
let mut to_remove = Vec::new();
for (lock_id, entry) in entries {
let mut entry_guard = entry.write().await;
if let Some(exp) = entry_guard.expires_at {
if exp <= now {
entry_guard.writer = None;
entry_guard.readers.clear();
entry_guard.expires_at = None;
entry_guard.notify_writers.notify_waiters();
entry_guard.notify_readers.notify_waiters();
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
to_remove.push(lock_id);
}
}
}
}
if !to_remove.is_empty() {
let mut locks_w = locks.write().await;
for id in to_remove {
let _ = locks_w.remove(&id);
}
}
continue; // immediately look for next
}
if !to_remove.is_empty() {
let mut locks_guard = locks.write().await;
for key in to_remove {
locks_guard.remove(&key);
// nothing due; wait for next deadline or notification
if let Some(dur) = wait_duration {
tokio::select! {
_ = tokio::time::sleep(dur) => {},
_ = exp_notify.notified() => {},
}
} else {
// no deadlines, wait for new schedule or shutdown tick
exp_notify.notified().await;
}
}
});
}
/// schedule an expiry time for the given lock id (inline, avoiding a task spawn per acquisition)
async fn schedule_expiry(&self, id: crate::types::LockId, exp: Instant) {
let mut guard = self.expirations.lock().await;
let is_earliest = match guard.first_key_value() {
Some((k, _)) => exp < *k,
None => true,
};
guard.entry(exp).or_insert_with(Vec::new).push(id);
drop(guard);
if is_earliest {
self.exp_notify.notify_waiters();
}
}
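The expirations map plus exp_notify form a small deadline queue. A minimal standalone sketch of the same pattern (here `u64` stands in for LockId and the expiry action is stubbed out):

```rust
// Deadline queue: BTreeMap keyed by Instant, plus a Notify that wakes the
// sweeper early when a new earliest deadline is scheduled.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{Mutex, Notify};

struct ExpiryQueue {
    deadlines: Mutex<BTreeMap<Instant, Vec<u64>>>, // u64 stands in for LockId
    notify: Notify,
}

impl ExpiryQueue {
    async fn schedule(&self, id: u64, at: Instant) {
        let mut map = self.deadlines.lock().await;
        // Wake the sweeper only if this becomes the earliest deadline.
        let is_earliest = map.first_key_value().map_or(true, |(k, _)| at < *k);
        map.entry(at).or_default().push(id);
        drop(map);
        if is_earliest {
            self.notify.notify_waiters();
        }
    }

    async fn run_sweeper(self: Arc<Self>) {
        loop {
            // Drain everything due, remember the next deadline.
            let (due, next) = {
                let mut map = self.deadlines.lock().await;
                let now = Instant::now();
                let mut due = Vec::new();
                while let Some((&k, _)) = map.first_key_value() {
                    if k > now { break; }
                    due.extend(map.remove(&k).unwrap_or_default());
                }
                (due, map.first_key_value().map(|(k, _)| *k))
            };
            for id in due {
                let _ = id; // expire lock `id` here
            }
            match next {
                Some(deadline) => tokio::select! {
                    _ = tokio::time::sleep(deadline.saturating_duration_since(Instant::now())) => {},
                    _ = self.notify.notified() => {},
                },
                None => self.notify.notified().await, // idle until something is scheduled
            }
        }
    }
}
```

Compared with the old scan of every lock once per second, the sweeper now sleeps exactly until the next known deadline and touches only the due entries.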
/// write lock with TTL; honors the request's acquire timeout
pub async fn lock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result<bool> {
let start = Instant::now();
let expires_at = Some(Instant::now() + request.ttl);
loop {
// get or create lock entry
let entry = {
// get or create lock entry (double-checked to reduce write-lock contention)
let entry = if let Some(e) = {
let locks_guard = self.locks.read().await;
locks_guard.get(&request.lock_id).cloned()
} {
e
} else {
let mut locks_guard = self.locks.write().await;
locks_guard
.entry(request.lock_id.clone())
@@ -119,13 +195,17 @@ impl LocalLockMap {
writer: None,
readers: HashMap::new(),
expires_at: None,
writer_pending: 0,
notify_readers: Arc::new(Notify::new()),
notify_writers: Arc::new(Notify::new()),
}))
})
.clone()
};
// try to get write lock to modify state
if let Ok(mut entry_guard) = entry.try_write() {
// attempt acquisition or wait using Notify
let notify_to_wait = {
let mut entry_guard = entry.write().await;
// check expired state
let now = Instant::now();
if let Some(exp) = entry_guard.expires_at {
@@ -136,30 +216,68 @@ impl LocalLockMap {
}
}
// check if can get write lock
// try acquire
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
entry_guard.writer = Some(request.owner.clone());
entry_guard.expires_at = expires_at;
let expires_at = Instant::now() + request.ttl;
entry_guard.expires_at = Some(expires_at);
tracing::debug!("Write lock acquired for resource '{}' by owner '{}'", request.resource, request.owner);
{
drop(entry_guard);
self.schedule_expiry(request.lock_id.clone(), expires_at).await;
}
return Ok(true);
}
}
// couldn't acquire now, mark as pending writer and choose notifier
entry_guard.writer_pending = entry_guard.writer_pending.saturating_add(1);
entry_guard.notify_writers.clone()
};
if start.elapsed() >= request.acquire_timeout {
// wait with remaining timeout
let elapsed = start.elapsed();
if elapsed >= request.acquire_timeout {
// best-effort decrement pending counter
if let Ok(mut eg) = entry.try_write() {
eg.writer_pending = eg.writer_pending.saturating_sub(1);
} else {
let mut eg = entry.write().await;
eg.writer_pending = eg.writer_pending.saturating_sub(1);
}
return Ok(false);
}
tokio::time::sleep(Duration::from_millis(10)).await;
let remaining = request.acquire_timeout - elapsed;
if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() {
// timeout; decrement pending before returning
if let Ok(mut eg) = entry.try_write() {
eg.writer_pending = eg.writer_pending.saturating_sub(1);
} else {
let mut eg = entry.write().await;
eg.writer_pending = eg.writer_pending.saturating_sub(1);
}
return Ok(false);
}
// woke up; decrement pending before retrying
if let Ok(mut eg) = entry.try_write() {
eg.writer_pending = eg.writer_pending.saturating_sub(1);
} else {
let mut eg = entry.write().await;
eg.writer_pending = eg.writer_pending.saturating_sub(1);
}
}
}
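The wait step no longer polls every 10 ms; it parks on the entry's Notify for at most the remaining acquire budget. The pattern in isolation (a sketch; the caller retries the acquisition on a true return):

```rust
use std::time::{Duration, Instant};
use tokio::sync::Notify;

/// Returns true if notified within the remaining budget (caller retries the
/// acquisition), false once the acquire timeout is exhausted (caller gives up).
async fn wait_within_budget(start: Instant, acquire_timeout: Duration, notify: &Notify) -> bool {
    let elapsed = start.elapsed();
    if elapsed >= acquire_timeout {
        return false;
    }
    let remaining = acquire_timeout - elapsed;
    tokio::time::timeout(remaining, notify.notified()).await.is_ok()
}
```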
/// read lock with TTL; honors the request's acquire timeout
pub async fn rlock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result<bool> {
let start = Instant::now();
let expires_at = Some(Instant::now() + request.ttl);
loop {
// get or create lock entry
let entry = {
// get or create lock entry (double-checked to reduce write-lock contention)
let entry = if let Some(e) = {
let locks_guard = self.locks.read().await;
locks_guard.get(&request.lock_id).cloned()
} {
e
} else {
let mut locks_guard = self.locks.write().await;
locks_guard
.entry(request.lock_id.clone())
@@ -168,13 +286,17 @@ impl LocalLockMap {
writer: None,
readers: HashMap::new(),
expires_at: None,
writer_pending: 0,
notify_readers: Arc::new(Notify::new()),
notify_writers: Arc::new(Notify::new()),
}))
})
.clone()
};
// try to get write lock to modify state
if let Ok(mut entry_guard) = entry.try_write() {
// attempt acquisition or wait using Notify
let notify_to_wait = {
let mut entry_guard = entry.write().await;
// check expired state
let now = Instant::now();
if let Some(exp) = entry_guard.expires_at {
@@ -185,189 +307,247 @@ impl LocalLockMap {
}
}
// check if can get read lock
if entry_guard.writer.is_none() {
// increase read lock count
if entry_guard.writer.is_none() && entry_guard.writer_pending == 0 {
*entry_guard.readers.entry(request.owner.clone()).or_insert(0) += 1;
entry_guard.expires_at = expires_at;
let expires_at = Instant::now() + request.ttl;
entry_guard.expires_at = Some(expires_at);
tracing::debug!("Read lock acquired for resource '{}' by owner '{}'", request.resource, request.owner);
{
drop(entry_guard);
self.schedule_expiry(request.lock_id.clone(), expires_at).await;
}
return Ok(true);
}
}
if start.elapsed() >= request.acquire_timeout {
// choose a notifier: wait on the writers' Notify when writers are pending, otherwise on the readers'
if entry_guard.writer_pending > 0 {
entry_guard.notify_writers.clone()
} else {
entry_guard.notify_readers.clone()
}
};
// wait with remaining timeout
let elapsed = start.elapsed();
if elapsed >= request.acquire_timeout {
return Ok(false);
}
let remaining = request.acquire_timeout - elapsed;
if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() {
return Ok(false);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
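The read path differs from the write path in one condition, shown above: a reader is admitted only when there is no active writer and no writer waiting. As a one-line sketch:

```rust
// Write-preferring admission: pending writers block new readers, so a steady
// stream of readers cannot starve a writer.
fn reader_may_enter(writer: &Option<String>, writer_pending: usize) -> bool {
    writer.is_none() && writer_pending == 0
}
```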
/// unlock by LockId and owner - the owner must be given so the correct hold is released
pub async fn unlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> {
println!("Unlocking lock_id: {lock_id:?}, owner: {owner}");
let mut need_remove = false;
{
// first, get the entry without holding the write lock on the map
let entry = {
let locks_guard = self.locks.read().await;
if let Some(entry) = locks_guard.get(lock_id) {
println!("Found lock entry, attempting to acquire write lock...");
match entry.try_write() {
Ok(mut entry_guard) => {
println!("Successfully acquired write lock for unlock");
// try to release write lock
if entry_guard.writer.as_ref() == Some(&owner.to_string()) {
println!("Releasing write lock for owner: {owner}");
entry_guard.writer = None;
}
// try to release read lock
else if let Some(count) = entry_guard.readers.get_mut(owner) {
println!("Releasing read lock for owner: {owner} (count: {count})");
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(owner);
println!("Removed owner {owner} from readers");
}
} else {
println!("Owner {owner} not found in writers or readers");
}
// check if need to remove
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
println!("Lock entry is empty, marking for removal");
entry_guard.expires_at = None;
need_remove = true;
} else {
println!(
"Lock entry still has content: writer={:?}, readers={:?}",
entry_guard.writer, entry_guard.readers
);
}
}
Err(_) => {
println!("Failed to acquire write lock for unlock - this is the problem!");
return Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Failed to acquire write lock for unlock",
));
}
match locks_guard.get(lock_id) {
Some(e) => e.clone(),
None => return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Lock entry not found")),
}
};
let mut need_remove = false;
let (notify_writers, notify_readers, writer_pending, writer_none) = {
let mut entry_guard = entry.write().await;
// try to release write lock
if entry_guard.writer.as_ref() == Some(&owner.to_string()) {
entry_guard.writer = None;
}
// try to release read lock
else if let Some(count) = entry_guard.readers.get_mut(owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(owner);
}
} else {
println!("Lock entry not found for lock_id: {lock_id:?}");
// owner not found, treat as no-op
}
// check if need to remove
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
}
// capture notifications and state
(
entry_guard.notify_writers.clone(),
entry_guard.notify_readers.clone(),
entry_guard.writer_pending,
entry_guard.writer.is_none(),
)
};
if writer_pending > 0 && writer_none {
// Wake a single writer to preserve fairness and avoid a thundering herd
notify_writers.notify_one();
} else if writer_none {
// No writers waiting, allow readers to proceed
notify_readers.notify_waiters();
}
// only at this point is our reference to the entry dropped, so it can safely be removed from the map
if need_remove {
println!("Removing lock entry from map...");
let mut locks_guard = self.locks.write().await;
let removed = locks_guard.remove(lock_id);
println!("Lock entry removed: {:?}", removed.is_some());
let _ = locks_guard.remove(lock_id);
}
println!("Unlock operation completed");
Ok(())
}
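The release-side wake policy, factored out as a sketch: with writers queued, exactly one is woken (`notify_one`, avoiding a thundering herd); only when no writers wait are all readers released at once:

```rust
use tokio::sync::Notify;

// Wake policy after a release (sketch). `writer_slot_empty` mirrors
// entry_guard.writer.is_none() in the code above.
fn wake_after_release(writer_pending: usize, writer_slot_empty: bool,
                      writers: &Notify, readers: &Notify) {
    if writer_pending > 0 && writer_slot_empty {
        writers.notify_one();     // hand off to a single waiting writer
    } else if writer_slot_empty {
        readers.notify_waiters(); // no writers queued: admit all readers
    }
}
```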
/// unlock by LockId - best-effort release (kept for compatibility with the old interface; may pick the wrong holder)
pub async fn unlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> {
let mut need_remove = false;
{
let entry = {
let locks_guard = self.locks.read().await;
if let Some(entry) = locks_guard.get(lock_id) {
if let Ok(mut entry_guard) = entry.try_write() {
// release write lock first
if entry_guard.writer.is_some() {
entry_guard.writer = None;
}
// if no write lock, release first read lock
else if let Some((owner, _)) = entry_guard.readers.iter().next() {
let owner = owner.clone();
if let Some(count) = entry_guard.readers.get_mut(&owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(&owner);
}
}
}
match locks_guard.get(lock_id) {
Some(e) => e.clone(),
None => return Ok(()), // nothing to do
}
};
// if completely idle, clean entry
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
let mut need_remove = false;
let (notify_writers, notify_readers, writer_pending, writer_none) = {
let mut entry_guard = entry.write().await;
// release write lock first
if entry_guard.writer.is_some() {
entry_guard.writer = None;
}
// if no write lock, release first read lock
else if let Some((owner, _)) = entry_guard.readers.iter().next() {
let owner = owner.clone();
if let Some(count) = entry_guard.readers.get_mut(&owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(&owner);
}
}
}
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
}
(
entry_guard.notify_writers.clone(),
entry_guard.notify_readers.clone(),
entry_guard.writer_pending,
entry_guard.writer.is_none(),
)
};
if writer_pending > 0 && writer_none {
notify_writers.notify_one();
} else if writer_none {
notify_readers.notify_waiters();
}
if need_remove {
let mut locks_guard = self.locks.write().await;
locks_guard.remove(lock_id);
let _ = locks_guard.remove(lock_id);
}
Ok(())
}
/// runlock by LockId and owner - the owner must be given so the correct reader count is decremented
pub async fn runlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> {
let mut need_remove = false;
{
let entry = {
let locks_guard = self.locks.read().await;
if let Some(entry) = locks_guard.get(lock_id) {
if let Ok(mut entry_guard) = entry.try_write() {
// release read lock
if let Some(count) = entry_guard.readers.get_mut(owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(owner);
}
}
match locks_guard.get(lock_id) {
Some(e) => e.clone(),
None => return Ok(()),
}
};
// if completely idle, clean entry
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
}
let mut need_remove = false;
let (notify_writers, notify_readers, writer_pending, writer_none) = {
let mut entry_guard = entry.write().await;
// release read lock
if let Some(count) = entry_guard.readers.get_mut(owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(owner);
}
}
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
}
(
entry_guard.notify_writers.clone(),
entry_guard.notify_readers.clone(),
entry_guard.writer_pending,
entry_guard.writer.is_none(),
)
};
if writer_pending > 0 && writer_none {
notify_writers.notify_waiters();
} else if writer_none {
notify_readers.notify_waiters();
}
if need_remove {
let mut locks_guard = self.locks.write().await;
locks_guard.remove(lock_id);
let _ = locks_guard.remove(lock_id);
}
Ok(())
}
/// runlock by LockId - best-effort read-lock release (kept for compatibility with the old interface)
pub async fn runlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> {
let mut need_remove = false;
{
let entry = {
let locks_guard = self.locks.read().await;
if let Some(entry) = locks_guard.get(lock_id) {
if let Ok(mut entry_guard) = entry.try_write() {
// release first read lock
if let Some((owner, _)) = entry_guard.readers.iter().next() {
let owner = owner.clone();
if let Some(count) = entry_guard.readers.get_mut(&owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(&owner);
}
}
}
match locks_guard.get(lock_id) {
Some(e) => e.clone(),
None => return Ok(()),
}
};
// if completely idle, clean entry
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
let mut need_remove = false;
let (notify_writers, notify_readers, writer_pending, writer_none) = {
let mut entry_guard = entry.write().await;
// release first read lock
if let Some((owner, _)) = entry_guard.readers.iter().next() {
let owner = owner.clone();
if let Some(count) = entry_guard.readers.get_mut(&owner) {
*count -= 1;
if *count == 0 {
entry_guard.readers.remove(&owner);
}
}
}
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
entry_guard.expires_at = None;
need_remove = true;
}
(
entry_guard.notify_writers.clone(),
entry_guard.notify_readers.clone(),
entry_guard.writer_pending,
entry_guard.writer.is_none(),
)
};
if writer_pending > 0 && writer_none {
notify_writers.notify_waiters();
} else if writer_none {
notify_readers.notify_waiters();
}
if need_remove {
let mut locks_guard = self.locks.write().await;
locks_guard.remove(lock_id);
let _ = locks_guard.remove(lock_id);
}
Ok(())
}


@@ -61,6 +61,22 @@ impl NamespaceLock {
}
}
/// Create namespace lock with clients and an explicit quorum size.
/// The quorum is clamped into [1, clients.len()]; for a single client it is always 1.
pub fn with_clients_and_quorum(namespace: String, clients: Vec<Arc<dyn LockClient>>, quorum: usize) -> Self {
let q = if clients.len() <= 1 {
1
} else {
quorum.clamp(1, clients.len())
};
Self {
clients,
namespace,
quorum: q,
}
}
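The clamp documented above can be checked on plain numbers. A hypothetical test sketch:

```rust
#[cfg(test)]
mod quorum_clamp_sketch {
    // Mirrors the clamping in with_clients_and_quorum, on counts alone.
    fn clamp_quorum(n_clients: usize, quorum: usize) -> usize {
        if n_clients <= 1 { 1 } else { quorum.clamp(1, n_clients) }
    }

    #[test]
    fn clamps_into_range() {
        assert_eq!(clamp_quorum(1, 5), 1); // single client -> always 1
        assert_eq!(clamp_quorum(4, 0), 1); // floor at 1
        assert_eq!(clamp_quorum(4, 9), 4); // ceiling at clients.len()
    }
}
```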
/// Create namespace lock with client (compatibility)
pub fn with_client(client: Arc<dyn LockClient>) -> Self {
Self::with_clients("default".to_string(), vec![client])
@@ -87,17 +103,33 @@ impl NamespaceLock {
return self.clients[0].acquire_lock(request).await;
}
// Two-phase commit for distributed lock acquisition
self.acquire_lock_with_2pc(request).await
// Quorum-based acquisition for distributed mode
let (resp, _idxs) = self.acquire_lock_quorum(request).await?;
Ok(resp)
}
/// Acquire a lock and return a RAII guard that will release asynchronously on Drop.
/// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds.
pub async fn acquire_guard(&self, request: &LockRequest) -> Result<Option<LockGuard>> {
let resp = self.acquire_lock(request).await?;
if self.clients.is_empty() {
return Err(LockError::internal("No lock clients available"));
}
if self.clients.len() == 1 {
let resp = self.clients[0].acquire_lock(request).await?;
if resp.success {
return Ok(Some(LockGuard::new(
LockId::new_deterministic(&request.resource),
vec![self.clients[0].clone()],
)));
}
return Ok(None);
}
let (resp, idxs) = self.acquire_lock_quorum(request).await?;
if resp.success {
let guard = LockGuard::new(LockId::new_deterministic(&request.resource), self.clients.clone());
Ok(Some(guard))
let subset: Vec<_> = idxs.into_iter().filter_map(|i| self.clients.get(i).cloned()).collect();
Ok(Some(LockGuard::new(LockId::new_deterministic(&request.resource), subset)))
} else {
Ok(None)
}
@@ -119,50 +151,29 @@ impl NamespaceLock {
self.acquire_guard(&req).await
}
/// Two-phase commit lock acquisition: all nodes must succeed or all fail
async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result<LockResponse> {
// Phase 1: Prepare - try to acquire lock on all clients
let futures: Vec<_> = self
/// Quorum-based lock acquisition: success if at least `self.quorum` clients succeed.
/// Returns the LockResponse and the indices of clients that acquired the lock.
async fn acquire_lock_quorum(&self, request: &LockRequest) -> Result<(LockResponse, Vec<usize>)> {
let futs: Vec<_> = self
.clients
.iter()
.enumerate()
.map(|(idx, client)| async move {
let result = client.acquire_lock(request).await;
(idx, result)
})
.map(|(idx, client)| async move { (idx, client.acquire_lock(request).await) })
.collect();
let results = futures::future::join_all(futures).await;
let results = futures::future::join_all(futs).await;
let mut successful_clients = Vec::new();
let mut failed_clients = Vec::new();
// Collect results
for (idx, result) in results {
match result {
Ok(response) if response.success => {
for (idx, res) in results {
if let Ok(resp) = res {
if resp.success {
successful_clients.push(idx);
}
_ => {
failed_clients.push(idx);
}
}
}
// Check if we have enough successful acquisitions for quorum
if successful_clients.len() >= self.quorum {
// Phase 2a: Commit - we have quorum, but need to ensure consistency
// If not all clients succeeded, we need to rollback for consistency
if successful_clients.len() < self.clients.len() {
// Rollback all successful acquisitions to maintain consistency
self.rollback_acquisitions(request, &successful_clients).await;
return Ok(LockResponse::failure(
"Partial success detected, rolled back for consistency".to_string(),
Duration::ZERO,
));
}
// All clients succeeded - lock acquired successfully
Ok(LockResponse::success(
let resp = LockResponse::success(
LockInfo {
id: LockId::new_deterministic(&request.resource),
resource: request.resource.clone(),
@@ -177,16 +188,17 @@ impl NamespaceLock {
wait_start_time: None,
},
Duration::ZERO,
))
);
Ok((resp, successful_clients))
} else {
// Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions
if !successful_clients.is_empty() {
self.rollback_acquisitions(request, &successful_clients).await;
}
Ok(LockResponse::failure(
let resp = LockResponse::failure(
format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum),
Duration::ZERO,
))
);
Ok((resp, Vec::new()))
}
}
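Stripped of the client plumbing, the decision step reduces to counting successes and remembering which clients won, so the guard can later release exactly that subset. A sketch over boolean per-client outcomes:

```rust
// Quorum decision (sketch): succeed iff at least `quorum` clients acquired,
// and report their indices for targeted release on Drop.
fn quorum_decision(outcomes: &[bool], quorum: usize) -> (bool, Vec<usize>) {
    let winners: Vec<usize> = outcomes
        .iter()
        .enumerate()
        .filter_map(|(i, &ok)| ok.then_some(i))
        .collect();
    if winners.len() >= quorum {
        (true, winners)
    } else {
        (false, Vec::new()) // caller rolls back any partial acquisitions
    }
}
```

Unlike the old two-phase path, partial success with quorum no longer triggers a rollback; the guard simply tracks the subset that holds the lock.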
@@ -558,9 +570,11 @@ mod tests {
let client2: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let clients = vec![client1, client2];
let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients);
// LocalClient shares a global in-memory map. For exclusive locks, only one can acquire at a time.
// In real distributed setups the quorum should be tied to EC write quorum. Here we use quorum=1 for success.
let ns_lock = NamespaceLock::with_clients_and_quorum("test-namespace".to_string(), clients, 1);
let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10));
let request = LockRequest::new("test-resource", LockType::Shared, "test_owner").with_ttl(Duration::from_secs(2));
// This should succeed only if ALL clients can acquire the lock
let response = ns_lock.acquire_lock(&request).await.unwrap();


@@ -22,6 +22,7 @@ pub mod net;
#[cfg(feature = "net")]
pub use net::*;
#[cfg(all(feature = "net", feature = "io"))]
pub mod retry;
#[cfg(feature = "io")]