Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 01:30:33 +00:00
Test: Add e2e test case for lock transactional
Signed-off-by: junxiang Mu <1948535941@qq.com>
Changed files: Cargo.lock (generated), plus the e2e test crate's Cargo.toml and the lock e2e test module (hunks below).
@@ -3573,6 +3573,7 @@ checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
name = "e2e_test"
version = "0.0.5"
dependencies = [
 "async-trait",
 "aws-config",
 "aws-sdk-s3",
 "bytes",

@@ -41,3 +41,4 @@ bytes.workspace = true
serial_test = "3.2.0"
aws-sdk-s3 = "1.99.0"
aws-config = "1.8.3"
async-trait = "0.1"

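Of the dependencies listed here, async-trait is the one that matters structurally for this commit: it is what lets the test module implement the async LockClient trait on a mock type and hand it around as Arc<dyn LockClient>. A minimal, self-contained sketch of that pattern follows; the Pinger trait and AlwaysUp type are illustrative stand-ins only, not part of rustfs.

use async_trait::async_trait;
use std::sync::Arc;

// Hypothetical trait: stands in for an async trait such as LockClient.
#[async_trait]
trait Pinger {
    async fn ping(&self) -> bool;
}

// Hypothetical implementor: stands in for a mock like FailingMockClient.
struct AlwaysUp;

#[async_trait]
impl Pinger for AlwaysUp {
    async fn ping(&self) -> bool {
        true
    }
}

#[tokio::main]
async fn main() {
    // #[async_trait] boxes the returned futures, so the trait stays
    // object-safe and the value can be held as Arc<dyn Pinger>.
    let node: Arc<dyn Pinger> = Arc::new(AlwaysUp);
    assert!(node.ping().await);
}

The same mechanism is what the FailingMockClient below relies on when it is pushed into NamespaceLock::with_clients alongside a real LocalClient.
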
@@ -13,12 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use rustfs_ecstore::{disk::endpoint::Endpoint, lock_utils::create_unique_clients};
use rustfs_lock::client::{LockClient, local::LocalClient};
use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use serial_test::serial;
use std::{error::Error, time::Duration};
use std::{error::Error, sync::Arc, time::Duration};
use tokio::time::sleep;
use tonic::Request;
use url::Url;

@@ -72,6 +75,216 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
    Ok(())
}

/// Mock client that simulates remote node failures
#[derive(Debug)]
struct FailingMockClient {
    local_client: Arc<dyn LockClient>,
    should_fail_acquire: bool,
    should_fail_release: bool,
}

impl FailingMockClient {
    fn new(should_fail_acquire: bool, should_fail_release: bool) -> Self {
        Self {
            local_client: Arc::new(LocalClient::new()),
            should_fail_acquire,
            should_fail_release,
        }
    }
}

#[async_trait]
impl LockClient for FailingMockClient {
    async fn acquire_exclusive(&self, request: &LockRequest) -> rustfs_lock::error::Result<LockResponse> {
        if self.should_fail_acquire {
            // Simulate network timeout or remote node failure
            return Ok(LockResponse::failure("Simulated remote node failure", Duration::from_millis(100)));
        }
        self.local_client.acquire_exclusive(request).await
    }

    async fn acquire_shared(&self, request: &LockRequest) -> rustfs_lock::error::Result<LockResponse> {
        if self.should_fail_acquire {
            return Ok(LockResponse::failure("Simulated remote node failure", Duration::from_millis(100)));
        }
        self.local_client.acquire_shared(request).await
    }

    async fn release(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
        if self.should_fail_release {
            return Err(rustfs_lock::error::LockError::internal("Simulated release failure"));
        }
        self.local_client.release(lock_id).await
    }

    async fn refresh(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
        self.local_client.refresh(lock_id).await
    }

    async fn force_release(&self, lock_id: &LockId) -> rustfs_lock::error::Result<bool> {
        self.local_client.force_release(lock_id).await
    }

    async fn check_status(&self, lock_id: &LockId) -> rustfs_lock::error::Result<Option<LockInfo>> {
        self.local_client.check_status(lock_id).await
    }

    async fn get_stats(&self) -> rustfs_lock::error::Result<LockStats> {
        self.local_client.get_stats().await
    }

    async fn close(&self) -> rustfs_lock::error::Result<()> {
        self.local_client.close().await
    }

    async fn is_online(&self) -> bool {
        if self.should_fail_acquire {
            return false; // Simulate offline node
        }
        true // Simulate online node
    }

    async fn is_local(&self) -> bool {
        false // Simulate remote client
    }
}

#[tokio::test]
#[serial]
async fn test_transactional_lock_with_remote_failure() -> Result<(), Box<dyn Error>> {
    println!("🧪 Testing transactional lock with simulated remote node failure");

    // Create a two-node cluster: one local (success) + one remote (failure)
    let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));

    let clients = vec![local_client, failing_remote_client];
    let ns_lock = NamespaceLock::with_clients("test_transactional".to_string(), clients);

    let resource = "critical_resource".to_string();

    // Test single lock operation with 2PC
    println!("📝 Testing single lock with remote failure...");
    let request = LockRequest::new(&resource, LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(30));

    let response = ns_lock.acquire_lock(&request).await?;

    // Should fail because quorum (2/2) is not met due to remote failure
    assert!(!response.success, "Lock should fail due to remote node failure");
    println!("✅ Single lock correctly failed due to remote node failure");

    // Verify no locks are left behind on the local node
    let local_client_direct = LocalClient::new();
    let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(&resource));
    let lock_status = local_client_direct.check_status(&lock_id).await?;
    assert!(lock_status.is_none(), "No lock should remain on local node after rollback");
    println!("✅ Verified rollback: no locks left on local node");

    Ok(())
}

#[tokio::test]
#[serial]
async fn test_transactional_batch_lock_with_mixed_failures() -> Result<(), Box<dyn Error>> {
    println!("🧪 Testing transactional batch lock with mixed node failures");

    // Create a cluster with different failure patterns
    let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));

    let clients = vec![local_client, failing_remote_client];
    let ns_lock = NamespaceLock::with_clients("test_batch_transactional".to_string(), clients);

    let resources = vec!["resource_1".to_string(), "resource_2".to_string(), "resource_3".to_string()];

    println!("📝 Testing batch lock with remote failure...");
    let result = ns_lock
        .lock_batch(&resources, "batch_owner", Duration::from_millis(100), Duration::from_secs(30))
        .await?;

    // Should fail because remote node cannot acquire locks
    assert!(!result, "Batch lock should fail due to remote node failure");
    println!("✅ Batch lock correctly failed due to remote node failure");

    // Verify no locks are left behind on any resource
    let local_client_direct = LocalClient::new();
    for resource in &resources {
        let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(resource));
        let lock_status = local_client_direct.check_status(&lock_id).await?;
        assert!(lock_status.is_none(), "No lock should remain for resource: {resource}");
    }
    println!("✅ Verified rollback: no locks left on any resource");

    Ok(())
}

#[tokio::test]
#[serial]
async fn test_transactional_lock_with_quorum_success() -> Result<(), Box<dyn Error>> {
    println!("🧪 Testing transactional lock with quorum success");

    // Create a three-node cluster where 2 succeed and 1 fails (quorum = 2 automatically)
    let local_client1: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let local_client2: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let failing_remote_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(true, false));

    let clients = vec![local_client1, local_client2, failing_remote_client];
    let ns_lock = NamespaceLock::with_clients("test_quorum".to_string(), clients);

    let resource = "quorum_resource".to_string();

    println!("📝 Testing lock with automatic quorum=2, 2 success + 1 failure...");
    let request = LockRequest::new(&resource, LockType::Exclusive, "quorum_owner").with_ttl(Duration::from_secs(30));

    let response = ns_lock.acquire_lock(&request).await?;

    // Should fail because we require all nodes to succeed for consistency
    // (even though quorum is met, the implementation requires all nodes for consistency)
    assert!(!response.success, "Lock should fail due to consistency requirement");
    println!("✅ Lock correctly failed due to consistency requirement (partial success rolled back)");

    Ok(())
}

#[tokio::test]
#[serial]
async fn test_transactional_lock_rollback_on_release_failure() -> Result<(), Box<dyn Error>> {
    println!("🧪 Testing rollback behavior when release fails");

    // Create clients where acquire succeeds but release fails
    let local_client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let failing_release_client: Arc<dyn LockClient> = Arc::new(FailingMockClient::new(false, true));

    let clients = vec![local_client, failing_release_client];
    let ns_lock = NamespaceLock::with_clients("test_release_failure".to_string(), clients);

    let resource = "release_test_resource".to_string();

    println!("📝 Testing lock acquisition with release failure handling...");
    let request = LockRequest::new(&resource, LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(30));

    // This should fail because both LocalClient instances share the same global lock map.
    // The first client (LocalClient) will acquire the lock, but the second client
    // (FailingMockClient's internal LocalClient) will fail to acquire the same resource.
    let response = ns_lock.acquire_lock(&request).await?;

    // The operation should fail due to lock contention between the two LocalClient instances
    assert!(
        !response.success,
        "Lock should fail due to lock contention between LocalClient instances sharing global lock map"
    );
    println!("✅ Lock correctly failed due to lock contention (both clients use same global lock map)");

    // Verify no locks are left behind after rollback
    let local_client_direct = LocalClient::new();
    let lock_id = LockId::new_deterministic(&ns_lock.get_resource_key(&resource));
    let lock_status = local_client_direct.check_status(&lock_id).await?;
    assert!(lock_status.is_none(), "No lock should remain after rollback");
    println!("✅ Verified rollback: no locks left after failed acquisition");

    Ok(())
}

#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
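The test gated by #[ignore] above targets a live deployment rather than the in-process mocks. A plausible way to run this suite locally (assuming the package name e2e_test from the Cargo.lock hunk above) is cargo test -p e2e_test; adding -- --ignored also runs the server-dependent tests once a RustFS server is reachable at localhost:9000. The #[serial] attribute from serial_test already prevents these lock tests from running concurrently.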