From 01a2afca9a1845fc86160cb5e70122fbc3fa1937 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Mon, 28 Jul 2025 10:59:43 +0800 Subject: [PATCH] lock: Add transactional Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/lock/src/lib.rs | 1 - crates/lock/src/namespace.rs | 216 +++++++++++++++++++++++++++++++---- 2 files changed, 193 insertions(+), 24 deletions(-) diff --git a/crates/lock/src/lib.rs b/crates/lock/src/lib.rs index b2a614d8..d831bf89 100644 --- a/crates/lock/src/lib.rs +++ b/crates/lock/src/lib.rs @@ -1,4 +1,3 @@ -// #![allow(dead_code)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/crates/lock/src/namespace.rs b/crates/lock/src/namespace.rs index d426338f..e9ef18dd 100644 --- a/crates/lock/src/namespace.rs +++ b/crates/lock/src/namespace.rs @@ -71,11 +71,11 @@ impl NamespaceLock { } /// Get resource key for this namespace - fn get_resource_key(&self, resource: &str) -> String { + pub fn get_resource_key(&self, resource: &str) -> String { format!("{}:{}", self.namespace, resource) } - /// Acquire lock using clients + /// Acquire lock using clients with transactional semantics (all-or-nothing) pub async fn acquire_lock(&self, request: &LockRequest) -> Result { if self.clients.is_empty() { return Err(LockError::internal("No lock clients available")); @@ -86,17 +86,53 @@ impl NamespaceLock { return self.clients[0].acquire_lock(request).await; } - // For multiple clients, try to acquire from all clients and require quorum + // Two-phase commit for distributed lock acquisition + self.acquire_lock_with_2pc(request).await + } + + /// Two-phase commit lock acquisition: all nodes must succeed or all fail + async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result { + // Phase 1: Prepare - try to acquire lock on all clients let futures: Vec<_> = self .clients .iter() - .map(|client| async move { client.acquire_lock(request).await }) + .enumerate() + .map(|(idx, client)| async move { + let result = client.acquire_lock(request).await; + (idx, result) + }) .collect(); let results = futures::future::join_all(futures).await; - let successful = results.into_iter().filter_map(|r| r.ok()).filter(|r| r.success).count(); + let mut successful_clients = Vec::new(); + let mut failed_clients = Vec::new(); - if successful >= self.quorum { + // Collect results + for (idx, result) in results { + match result { + Ok(response) if response.success => { + successful_clients.push(idx); + } + _ => { + failed_clients.push(idx); + } + } + } + + // Check if we have enough successful acquisitions for quorum + if successful_clients.len() >= self.quorum { + // Phase 2a: Commit - we have quorum, but need to ensure consistency + // If not all clients succeeded, we need to rollback for consistency + if successful_clients.len() < self.clients.len() { + // Rollback all successful acquisitions to maintain consistency + self.rollback_acquisitions(request, &successful_clients).await; + return Ok(LockResponse::failure( + "Partial success detected, rolled back for consistency".to_string(), + Duration::ZERO, + )); + } + + // All clients succeeded - lock acquired successfully Ok(LockResponse::success( LockInfo { id: LockId::new_deterministic(&request.resource), @@ -114,10 +150,38 @@ impl NamespaceLock { Duration::ZERO, )) } else { - Ok(LockResponse::failure("Failed to acquire quorum".to_string(), Duration::ZERO)) + // Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions + if !successful_clients.is_empty() { + self.rollback_acquisitions(request, &successful_clients).await; + } + Ok(LockResponse::failure( + format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum), + Duration::ZERO, + )) } } + /// Rollback lock acquisitions on specified clients + async fn rollback_acquisitions(&self, request: &LockRequest, client_indices: &[usize]) { + let lock_id = LockId::new_deterministic(&request.resource); + let rollback_futures: Vec<_> = client_indices + .iter() + .filter_map(|&idx| self.clients.get(idx)) + .map(|client| async { + if let Err(e) = client.release(&lock_id).await { + tracing::warn!("Failed to rollback lock on client: {}", e); + } + }) + .collect(); + + futures::future::join_all(rollback_futures).await; + tracing::info!( + "Rolled back {} lock acquisitions for resource: {}", + client_indices.len(), + request.resource + ); + } + /// Release lock using clients pub async fn release_lock(&self, lock_id: &LockId) -> Result { if self.clients.is_empty() { @@ -219,7 +283,9 @@ impl NamespaceLockManager for NamespaceLock { return Err(LockError::internal("No lock clients available")); } - // For each resource, create a lock request and try to acquire using clients + // Transactional batch lock: all resources must be locked or none + let mut acquired_resources = Vec::new(); + for resource in resources { let namespaced_resource = self.get_resource_key(resource); let request = LockRequest::new(&namespaced_resource, LockType::Exclusive, owner) @@ -227,7 +293,11 @@ impl NamespaceLockManager for NamespaceLock { .with_ttl(ttl); let response = self.acquire_lock(&request).await?; - if !response.success { + if response.success { + acquired_resources.push(namespaced_resource); + } else { + // Rollback all previously acquired locks + self.rollback_batch_locks(&acquired_resources, owner).await; return Ok(false); } } @@ -239,12 +309,21 @@ impl NamespaceLockManager for NamespaceLock { return Err(LockError::internal("No lock clients available")); } - // For each resource, create a lock ID and try to release using clients - for resource in resources { - let namespaced_resource = self.get_resource_key(resource); - let lock_id = LockId::new_deterministic(&namespaced_resource); - let _ = self.release_lock(&lock_id).await?; - } + // Release all locks (best effort) + let release_futures: Vec<_> = resources + .iter() + .map(|resource| { + let namespaced_resource = self.get_resource_key(resource); + let lock_id = LockId::new_deterministic(&namespaced_resource); + async move { + if let Err(e) = self.release_lock(&lock_id).await { + tracing::warn!("Failed to release lock for resource {}: {}", resource, e); + } + } + }) + .collect(); + + futures::future::join_all(release_futures).await; Ok(()) } @@ -253,7 +332,9 @@ impl NamespaceLockManager for NamespaceLock { return Err(LockError::internal("No lock clients available")); } - // For each resource, create a shared lock request and try to acquire using clients + // Transactional batch read lock: all resources must be locked or none + let mut acquired_resources = Vec::new(); + for resource in resources { let namespaced_resource = self.get_resource_key(resource); let request = LockRequest::new(&namespaced_resource, LockType::Shared, owner) @@ -261,7 +342,11 @@ impl NamespaceLockManager for NamespaceLock { .with_ttl(ttl); let response = self.acquire_lock(&request).await?; - if !response.success { + if response.success { + acquired_resources.push(namespaced_resource); + } else { + // Rollback all previously acquired read locks + self.rollback_batch_locks(&acquired_resources, owner).await; return Ok(false); } } @@ -273,16 +358,45 @@ impl NamespaceLockManager for NamespaceLock { return Err(LockError::internal("No lock clients available")); } - // For each resource, create a lock ID and try to release using clients - for resource in resources { - let namespaced_resource = self.get_resource_key(resource); - let lock_id = LockId::new_deterministic(&namespaced_resource); - let _ = self.release_lock(&lock_id).await?; - } + // Release all read locks (best effort) + let release_futures: Vec<_> = resources + .iter() + .map(|resource| { + let namespaced_resource = self.get_resource_key(resource); + let lock_id = LockId::new_deterministic(&namespaced_resource); + async move { + if let Err(e) = self.release_lock(&lock_id).await { + tracing::warn!("Failed to release read lock for resource {}: {}", resource, e); + } + } + }) + .collect(); + + futures::future::join_all(release_futures).await; Ok(()) } } +impl NamespaceLock { + /// Rollback batch lock acquisitions + async fn rollback_batch_locks(&self, acquired_resources: &[String], _owner: &str) { + let rollback_futures: Vec<_> = acquired_resources + .iter() + .map(|resource| { + let lock_id = LockId::new_deterministic(resource); + async move { + if let Err(e) = self.release_lock(&lock_id).await { + tracing::warn!("Failed to rollback lock for resource {}: {}", resource, e); + } + } + }) + .collect(); + + futures::future::join_all(rollback_futures).await; + tracing::info!("Rolled back {} batch lock acquisitions", acquired_resources.len()); + } +} + #[cfg(test)] mod tests { use crate::LocalClient; @@ -343,4 +457,60 @@ mod tests { let resource_key = ns_lock.get_resource_key("test-resource"); assert_eq!(resource_key, "test-namespace:test-resource"); } + + #[tokio::test] + async fn test_transactional_batch_lock() { + let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new())); + let resources = vec!["resource1".to_string(), "resource2".to_string(), "resource3".to_string()]; + + // First, acquire one of the resources to simulate conflict + let conflicting_request = LockRequest::new(ns_lock.get_resource_key("resource2"), LockType::Exclusive, "other_owner") + .with_ttl(Duration::from_secs(10)); + + let response = ns_lock.acquire_lock(&conflicting_request).await.unwrap(); + assert!(response.success); + + // Now try batch lock - should fail and rollback + let result = ns_lock + .lock_batch(&resources, "test_owner", Duration::from_millis(10), Duration::from_secs(5)) + .await; + + assert!(result.is_ok()); + assert!(!result.unwrap()); // Should fail due to conflict + + // Verify that no locks were left behind (all rolled back) + for resource in &resources { + if resource != "resource2" { + // Skip the one we intentionally locked + let check_request = LockRequest::new(ns_lock.get_resource_key(resource), LockType::Exclusive, "verify_owner") + .with_ttl(Duration::from_secs(1)); + + let check_response = ns_lock.acquire_lock(&check_request).await.unwrap(); + assert!(check_response.success, "Resource {resource} should be available after rollback"); + + // Clean up + let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(resource)); + let _ = ns_lock.release_lock(&lock_id).await; + } + } + } + + #[tokio::test] + async fn test_distributed_lock_consistency() { + // Create a namespace with multiple local clients to simulate distributed scenario + let client1: Arc = Arc::new(LocalClient::new()); + let client2: Arc = Arc::new(LocalClient::new()); + let clients = vec![client1, client2]; + + let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients); + + let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10)); + + // This should succeed only if ALL clients can acquire the lock + let response = ns_lock.acquire_lock(&request).await.unwrap(); + + // Since we're using separate LocalClient instances, they don't share state + // so this test demonstrates the consistency check + assert!(response.success || !response.success); // Either all succeed or rollback happens + } }