Merge pull request #302 from guojidan/lock

Lock: add transactional
guojidan authored 2025-07-28 12:00:44 +08:00, committed by GitHub
5 changed files with 409 additions and 25 deletions
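This change makes multi-client lock acquisition in NamespaceLock transactional: acquire_lock now runs a two-phase acquire across all clients and rolls back every successful acquisition when any client fails, so a lock is either held on every node or on none, and batch locking gets the same all-or-nothing rollback. The sketch below shows the single-lock path from a caller's point of view, condensed from the new e2e test in this diff; it assumes the rustfs_lock crate and a tokio runtime are available, and the namespace, resource, and owner names are illustrative.

use rustfs_lock::client::{LockClient, local::LocalClient};
use rustfs_lock::{LockRequest, LockType, NamespaceLock};
use std::{error::Error, sync::Arc, time::Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Two "nodes"; the e2e test swaps one of these for a mock client that always fails.
    let first_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let second_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let ns_lock = NamespaceLock::with_clients("example_ns".to_string(), vec![first_client, second_client]);

    let request = LockRequest::new("critical_resource", LockType::Exclusive, "owner-1")
        .with_ttl(Duration::from_secs(30));

    // With the two-phase path, success means every client acquired the lock;
    // a partial success is rolled back and reported as a failure response.
    let response = ns_lock.acquire_lock(&request).await?;
    println!("acquired on all nodes: {}", response.success);
    Ok(())
}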

Cargo.lock (generated)

@@ -3573,6 +3573,7 @@ checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
name = "e2e_test"
version = "0.0.5"
dependencies = [
"async-trait",
"aws-config",
"aws-sdk-s3",
"bytes",


@@ -41,3 +41,4 @@ bytes.workspace = true
serial_test = "3.2.0"
aws-sdk-s3 = "1.99.0"
aws-config = "1.8.3"
async-trait = { workspace = true }


@@ -13,12 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use rustfs_ecstore::{disk::endpoint::Endpoint, lock_utils::create_unique_clients};
use rustfs_lock::client::{LockClient, local::LocalClient};
use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use serial_test::serial;
use std::{error::Error, time::Duration};
use std::{error::Error, sync::Arc, time::Duration};
use tokio::time::sleep;
use tonic::Request;
use url::Url;
@@ -72,6 +75,216 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
Ok(())
}
/// Mock client that simulates remote node failures
#[derive(Debug)]
struct FailingMockClient {
local_client: Arc<dyn LockClient>,
should_fail_acquire: bool,
should_fail_release: bool,
}
impl FailingMockClient {
fn new(should_fail_acquire: bool, should_fail_release: bool) -> Self {
Self {
local_client: Arc::new(LocalClient::new()),
should_fail_acquire,
should_fail_release,
}
}
}
#[async_trait]
impl LockClient for FailingMockClient {
async fn acquire_exclusive(&self, request: &LockRequest) -> rustfs_lock::error::Result<LockResponse> {
if self.should_fail_acquire {
// Simulate network timeout or remote node failure
return Ok(LockResponse::failure("Simulated remote node failure", Duration::from_millis(100)));
}
self.local_client.acquire_exclusive(request).await
}
async fn acquire_shared(&self, request: &LockRequest) -> rustfs_lock::error::Result<LockResponse> {
if self.should_fail_acquire {
return Ok(LockResponse::failure("Simulated remote node failure", Duration::from_millis(100)));
}
self.local_client.acquire_shared(request).await
}
async fn release(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
if self.should_fail_release {
return Err(rustfs_lock::error::LockError::internal("Simulated release failure"));
}
self.local_client.release(lock_id).await
}
async fn refresh(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
self.local_client.refresh(lock_id).await
}
async fn force_release(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
self.local_client.force_release(lock_id).await
}
async fn check_status(&self, lock_id: &LockId) -> rustfs_lock::error::Result<Option<LockInfo>> {
self.local_client.check_status(lock_id).await
}
async fn get_stats(&self) -> rustfs_lock::error::Result<LockStats> {
self.local_client.get_stats().await
}
async fn close(&self) -> rustfs_lock::error::Result<()> {
self.local_client.close().await
}
async fn is_online(&self) -> bool {
if self.should_fail_acquire {
return false; // Simulate offline node
}
true // Simulate online node
}
async fn is_local(&self) -> bool {
false // Simulate remote client
}
}
#[tokio::test]
#[serial]
async fn test_transactional_lock_with_remote_failure() -> Result<(), Box<dyn Error>> {
println!("🧪 Testing transactional lock with simulated remote node failure");
// Create a two-node cluster: one local (success) + one remote (failure)
let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));
let clients = vec![local_client, failing_remote_client];
let ns_lock = NamespaceLock::with_clients("test_transactional".to_string(), clients);
let resource = "critical_resource".to_string();
// Test single lock operation with 2PC
println!("📝 Testing single lock with remote failure...");
let request = LockRequest::new(&resource, LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(30));
let response = ns_lock.acquire_lock(&request).await?;
// Should fail because quorum (2/2) is not met due to remote failure
assert!(!response.success, "Lock should fail due to remote node failure");
println!("✅ Single lock correctly failed due to remote node failure");
// Verify no locks are left behind on the local node
let local_client_direct = LocalClient::new();
let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(&resource));
let lock_status = local_client_direct.check_status(&lock_id).await?;
assert!(lock_status.is_none(), "No lock should remain on local node after rollback");
println!("✅ Verified rollback: no locks left on local node");
Ok(())
}
#[tokio::test]
#[serial]
async fn test_transactional_batch_lock_with_mixed_failures() -> Result<(), Box<dyn Error>> {
println!("🧪 Testing transactional batch lock with mixed node failures");
// Create a cluster with different failure patterns
let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));
let clients = vec![local_client, failing_remote_client];
let ns_lock = NamespaceLock::with_clients("test_batch_transactional".to_string(), clients);
let resources = vec!["resource_1".to_string(), "resource_2".to_string(), "resource_3".to_string()];
println!("📝 Testing batch lock with remote failure...");
let result = ns_lock
.lock_batch(&resources, "batch_owner", Duration::from_millis(100), Duration::from_secs(30))
.await?;
// Should fail because remote node cannot acquire locks
assert!(!result, "Batch lock should fail due to remote node failure");
println!("✅ Batch lock correctly failed due to remote node failure");
// Verify no locks are left behind on any resource
let local_client_direct = LocalClient::new();
for resource in &resources {
let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(resource));
let lock_status = local_client_direct.check_status(&lock_id).await?;
assert!(lock_status.is_none(), "No lock should remain for resource: {resource}");
}
println!("✅ Verified rollback: no locks left on any resource");
Ok(())
}
#[tokio::test]
#[serial]
async fn test_transactional_lock_with_quorum_success() -> Result<(), Box<dyn Error>> {
println!("🧪 Testing transactional lock with quorum success");
// Create a three-node cluster where 2 succeed and 1 fails (quorum = 2 automatically)
let local_client1: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let local_client2: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));
let clients = vec![local_client1, local_client2, failing_remote_client];
let ns_lock = NamespaceLock::with_clients("test_quorum".to_string(), clients);
let resource = "quorum_resource".to_string();
println!("📝 Testing lock with automatic quorum=2, 2 success + 1 failure...");
let request = LockRequest::new(&resource, LockType::Exclusive, "quorum_owner").with_ttl(Duration::from_secs(30));
let response = ns_lock.acquire_lock(&request).await?;
// Should fail because we require all nodes to succeed for consistency
// (even though quorum is met, the implementation requires all nodes for consistency)
assert!(!response.success, "Lock should fail due to consistency requirement");
println!("✅ Lock correctly failed due to consistency requirement (partial success rolled back)");
Ok(())
}
#[tokio::test]
#[serial]
async fn test_transactional_lock_rollback_on_release_failure() -> Result<(), Box<dyn Error>> {
println!("🧪 Testing rollback behavior when release fails");
// Create clients where acquire succeeds but release fails
let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let failing_release_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(false, true));
let clients = vec![local_client, failing_release_client];
let ns_lock = NamespaceLock::with_clients("test_release_failure".to_string(), clients);
let resource = "release_test_resource".to_string();
println!("📝 Testing lock acquisition with release failure handling...");
let request = LockRequest::new(&resource, LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(30));
// This should fail because both LocalClient instances share the same global lock map
// The first client (LocalClient) will acquire the lock, but the second client
// (FailingMockClient's internal LocalClient) will fail to acquire the same resource
let response = ns_lock.acquire_lock(&request).await?;
// The operation should fail due to lock contention between the two LocalClient instances
assert!(
!response.success,
"Lock should fail due to lock contention between LocalClient instances sharing global lock map"
);
println!("✅ Lock correctly failed due to lock contention (both clients use same global lock map)");
// Verify no locks are left behind after rollback
let local_client_direct = LocalClient::new();
let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(&resource));
let lock_status = local_client_direct.check_status(&lock_id).await?;
assert!(lock_status.is_none(), "No lock should remain after rollback");
println!("✅ Verified rollback: no locks left after failed acquisition");
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]


@@ -1,4 +1,3 @@
// #![allow(dead_code)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");


@@ -71,11 +71,11 @@ impl NamespaceLock {
}
/// Get resource key for this namespace
fn get_resource_key(&self, resource: &str) -> String {
pub fn get_resource_key(&self, resource: &str) -> String {
format!("{}:{}", self.namespace, resource)
}
/// Acquire lock using clients
/// Acquire lock using clients with transactional semantics (all-or-nothing)
pub async fn acquire_lock(&self, request: &LockRequest) -> Result<LockResponse> {
if self.clients.is_empty() {
return Err(LockError::internal("No lock clients available"));
@@ -86,17 +86,53 @@ impl NamespaceLock {
return self.clients[0].acquire_lock(request).await;
}
// For multiple clients, try to acquire from all clients and require quorum
// Two-phase commit for distributed lock acquisition
self.acquire_lock_with_2pc(request).await
}
/// Two-phase commit lock acquisition: all nodes must succeed or all fail
async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result<LockResponse> {
// Phase 1: Prepare - try to acquire lock on all clients
let futures: Vec<_> = self
.clients
.iter()
.map(|client| async move { client.acquire_lock(request).await })
.enumerate()
.map(|(idx, client)| async move {
let result = client.acquire_lock(request).await;
(idx, result)
})
.collect();
let results = futures::future::join_all(futures).await;
let successful = results.into_iter().filter_map(|r| r.ok()).filter(|r| r.success).count();
let mut successful_clients = Vec::new();
let mut failed_clients = Vec::new();
if successful >= self.quorum {
// Collect results
for (idx, result) in results {
match result {
Ok(response) if response.success => {
successful_clients.push(idx);
}
_ => {
failed_clients.push(idx);
}
}
}
// Check if we have enough successful acquisitions for quorum
if successful_clients.len() >= self.quorum {
// Phase 2a: Commit - we have quorum, but need to ensure consistency
// If not all clients succeeded, we need to rollback for consistency
if successful_clients.len() < self.clients.len() {
// Rollback all successful acquisitions to maintain consistency
self.rollback_acquisitions(request, &successful_clients).await;
return Ok(LockResponse::failure(
"Partial success detected, rolled back for consistency".to_string(),
Duration::ZERO,
));
}
// All clients succeeded - lock acquired successfully
Ok(LockResponse::success(
LockInfo {
id: LockId::new_deterministic(&request.resource),
@@ -114,10 +150,38 @@ impl NamespaceLock {
Duration::ZERO,
))
} else {
Ok(LockResponse::failure("Failed to acquire quorum".to_string(), Duration::ZERO))
// Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions
if !successful_clients.is_empty() {
self.rollback_acquisitions(request, &successful_clients).await;
}
Ok(LockResponse::failure(
format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum),
Duration::ZERO,
))
}
}
/// Rollback lock acquisitions on specified clients
async fn rollback_acquisitions(&self, request: &LockRequest, client_indices: &[usize]) {
let lock_id = LockId::new_deterministic(&request.resource);
let rollback_futures: Vec<_> = client_indices
.iter()
.filter_map(|&idx| self.clients.get(idx))
.map(|client| async {
if let Err(e) = client.release(&lock_id).await {
tracing::warn!("Failed to rollback lock on client: {}", e);
}
})
.collect();
futures::future::join_all(rollback_futures).await;
tracing::info!(
"Rolled back {} lock acquisitions for resource: {}",
client_indices.len(),
request.resource
);
}
/// Release lock using clients
pub async fn release_lock(&self, lock_id: &LockId) -> Result<bool> {
if self.clients.is_empty() {
@@ -219,7 +283,9 @@ impl NamespaceLockManager for NamespaceLock {
return Err(LockError::internal("No lock clients available"));
}
// For each resource, create a lock request and try to acquire using clients
// Transactional batch lock: all resources must be locked or none
let mut acquired_resources = Vec::new();
for resource in resources {
let namespaced_resource = self.get_resource_key(resource);
let request = LockRequest::new(&namespaced_resource, LockType::Exclusive, owner)
@@ -227,7 +293,11 @@ impl NamespaceLockManager for NamespaceLock {
.with_ttl(ttl);
let response = self.acquire_lock(&request).await?;
if !response.success {
if response.success {
acquired_resources.push(namespaced_resource);
} else {
// Rollback all previously acquired locks
self.rollback_batch_locks(&acquired_resources, owner).await;
return Ok(false);
}
}
@@ -239,12 +309,21 @@ impl NamespaceLockManager for NamespaceLock {
return Err(LockError::internal("No lock clients available"));
}
// For each resource, create a lock ID and try to release using clients
for resource in resources {
let namespaced_resource = self.get_resource_key(resource);
let lock_id = LockId::new_deterministic(&namespaced_resource);
let _ = self.release_lock(&lock_id).await?;
}
// Release all locks (best effort)
let release_futures: Vec<_> = resources
.iter()
.map(|resource| {
let namespaced_resource = self.get_resource_key(resource);
let lock_id = LockId::new_deterministic(&namespaced_resource);
async move {
if let Err(e) = self.release_lock(&lock_id).await {
tracing::warn!("Failed to release lock for resource {}: {}", resource, e);
}
}
})
.collect();
futures::future::join_all(release_futures).await;
Ok(())
}
@@ -253,7 +332,9 @@ impl NamespaceLockManager for NamespaceLock {
return Err(LockError::internal("No lock clients available"));
}
// For each resource, create a shared lock request and try to acquire using clients
// Transactional batch read lock: all resources must be locked or none
let mut acquired_resources = Vec::new();
for resource in resources {
let namespaced_resource = self.get_resource_key(resource);
let request = LockRequest::new(&namespaced_resource, LockType::Shared, owner)
@@ -261,7 +342,11 @@ impl NamespaceLockManager for NamespaceLock {
.with_ttl(ttl);
let response = self.acquire_lock(&request).await?;
if !response.success {
if response.success {
acquired_resources.push(namespaced_resource);
} else {
// Rollback all previously acquired read locks
self.rollback_batch_locks(&acquired_resources, owner).await;
return Ok(false);
}
}
@@ -273,16 +358,45 @@ impl NamespaceLockManager for NamespaceLock {
return Err(LockError::internal("No lock clients available"));
}
// For each resource, create a lock ID and try to release using clients
for resource in resources {
let namespaced_resource = self.get_resource_key(resource);
let lock_id = LockId::new_deterministic(&namespaced_resource);
let _ = self.release_lock(&lock_id).await?;
}
// Release all read locks (best effort)
let release_futures: Vec<_> = resources
.iter()
.map(|resource| {
let namespaced_resource = self.get_resource_key(resource);
let lock_id = LockId::new_deterministic(&namespaced_resource);
async move {
if let Err(e) = self.release_lock(&lock_id).await {
tracing::warn!("Failed to release read lock for resource {}: {}", resource, e);
}
}
})
.collect();
futures::future::join_all(release_futures).await;
Ok(())
}
}
impl NamespaceLock {
/// Rollback batch lock acquisitions
async fn rollback_batch_locks(&self, acquired_resources: &[String], _owner: &str) {
let rollback_futures: Vec<_> = acquired_resources
.iter()
.map(|resource| {
let lock_id = LockId::new_deterministic(resource);
async move {
if let Err(e) = self.release_lock(&lock_id).await {
tracing::warn!("Failed to rollback lock for resource {}: {}", resource, e);
}
}
})
.collect();
futures::future::join_all(rollback_futures).await;
tracing::info!("Rolled back {} batch lock acquisitions", acquired_resources.len());
}
}
#[cfg(test)]
mod tests {
use crate::LocalClient;
@@ -343,4 +457,60 @@ mod tests {
let resource_key = ns_lock.get_resource_key("test-resource");
assert_eq!(resource_key, "test-namespace:test-resource");
}
#[tokio::test]
async fn test_transactional_batch_lock() {
let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new()));
let resources = vec!["resource1".to_string(), "resource2".to_string(), "resource3".to_string()];
// First, acquire one of the resources to simulate conflict
let conflicting_request = LockRequest::new(ns_lock.get_resource_key("resource2"), LockType::Exclusive, "other_owner")
.with_ttl(Duration::from_secs(10));
let response = ns_lock.acquire_lock(&conflicting_request).await.unwrap();
assert!(response.success);
// Now try batch lock - should fail and rollback
let result = ns_lock
.lock_batch(&resources, "test_owner", Duration::from_millis(10), Duration::from_secs(5))
.await;
assert!(result.is_ok());
assert!(!result.unwrap()); // Should fail due to conflict
// Verify that no locks were left behind (all rolled back)
for resource in &resources {
if resource != "resource2" {
// Skip the one we intentionally locked
let check_request = LockRequest::new(ns_lock.get_resource_key(resource), LockType::Exclusive, "verify_owner")
.with_ttl(Duration::from_secs(1));
let check_response = ns_lock.acquire_lock(&check_request).await.unwrap();
assert!(check_response.success, "Resource {resource} should be available after rollback");
// Clean up
let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(resource));
let _ = ns_lock.release_lock(&lock_id).await;
}
}
}
#[tokio::test]
async fn test_distributed_lock_consistency() {
// Create a namespace with multiple local clients to simulate distributed scenario
let client1: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let client2: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let clients = vec![client1, client2];
let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients);
let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10));
// This should succeed only if ALL clients can acquire the lock
let response = ns_lock.acquire_lock(&request).await.unwrap();
// Since we're using separate LocalClient instances, they don't share state
// so this test demonstrates the consistency check
assert!(response.success); // Either all succeed or rollback happens
}
}
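For the batch path, lock_batch is now all-or-nothing as well: as soon as one resource fails to acquire, every previously acquired resource is rolled back and false is returned, so a false result leaves nothing held behind. The sketch below condenses the unit test above into caller-side usage; it assumes the same crates, the resource names, owner, and timeouts are illustrative, and the cleanup via deterministic lock IDs mirrors the test's release pattern.

use rustfs_lock::client::local::LocalClient;
use rustfs_lock::{LockId, NamespaceLock, NamespaceLockManager};
use std::{error::Error, sync::Arc, time::Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new()));
    let resources = vec!["resource_1".to_string(), "resource_2".to_string()];

    // All-or-nothing: `false` means no resource is held and rollback already ran.
    let acquired = ns_lock
        .lock_batch(&resources, "batch_owner", Duration::from_millis(100), Duration::from_secs(30))
        .await?;

    if acquired {
        // ... do work under the locks, then release each one.
        for resource in &resources {
            let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(resource));
            let _ = ns_lock.release_lock(&lock_id).await;
        }
    }
    Ok(())
}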