From be66cf8bd374a26e2568212acf2f3e844f30ba38 Mon Sep 17 00:00:00 2001 From: guojidan <63799833+guojidan@users.noreply.github.com> Date: Sun, 28 Sep 2025 08:57:56 +0800 Subject: [PATCH] Improve lock (#596) * improve lock Signed-off-by: Mu junxiang <1948535941@qq.com> * feat(tests): add wait_for_object_absence helper and improve lifecycle test reliability Signed-off-by: Mu junxiang <1948535941@qq.com> * chore: remove dirty docs Signed-off-by: Mu junxiang <1948535941@qq.com> --------- Signed-off-by: Mu junxiang <1948535941@qq.com> --- .../ahm/tests/lifecycle_integration_test.rs | 187 +++++++++++++----- crates/e2e_test/src/reliant/lock.rs | 34 +++- crates/ecstore/src/lib.rs | 1 - crates/ecstore/src/lock_utils.rs | 136 ------------- crates/ecstore/src/set_disk.rs | 54 +++-- crates/ecstore/src/sets.rs | 4 - crates/lock/src/fast_lock/disabled_manager.rs | 8 +- crates/lock/src/fast_lock/manager.rs | 61 +++--- crates/lock/src/fast_lock/types.rs | 3 + 9 files changed, 235 insertions(+), 253 deletions(-) delete mode 100644 crates/ecstore/src/lock_utils.rs diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 67a17bfd..ab6cf76a 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -302,6 +302,22 @@ async fn object_is_transitioned(ecstore: &Arc, bucket: &str, object: &s } } +async fn wait_for_object_absence(ecstore: &Arc, bucket: &str, object: &str, timeout: Duration) -> bool { + let deadline = tokio::time::Instant::now() + timeout; + + loop { + if !object_exists(ecstore, bucket, object).await { + return true; + } + + if tokio::time::Instant::now() >= deadline { + return false; + } + + tokio::time::sleep(Duration::from_millis(200)).await; + } +} + mod serial_tests { use super::*; @@ -311,25 +327,26 @@ mod serial_tests { let (_disk_paths, ecstore) = setup_test_env().await; // Create test bucket and object - let bucket_name = "test-lifecycle-expiry-basic-bucket"; + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-expiry-basic-{}", &suffix[..8]); let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" let test_data = b"Hello, this is test data for lifecycle expiry!"; - create_test_lock_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); + assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await); println!("✅ Object exists before lifecycle processing"); // Set lifecycle configuration with very short expiry (0 days = immediate expiry) - set_bucket_lifecycle(bucket_name) + set_bucket_lifecycle(bucket_name.as_str()) .await .expect("Failed to set lifecycle configuration"); - println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + println!("✅ Lifecycle configuration set for bucket: {}", bucket_name); // Verify lifecycle configuration was set - match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await { Ok(bucket_meta) => { assert!(bucket_meta.lifecycle_config.is_some()); println!("✅ Bucket metadata retrieved successfully"); @@ -360,20 +377,60 @@ mod serial_tests { 
scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); println!("✅ Manual scan cycle completed"); - // Wait a bit more for background workers to process expiry tasks - tokio::time::sleep(Duration::from_secs(5)).await; + let mut expired = false; + for attempt in 0..3 { + if attempt > 0 { + scanner.scan_cycle().await.expect("Failed to trigger scan cycle on retry"); + } + expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(5)).await; + if expired { + break; + } + } - // Check if object has been expired (delete_marker) - let check_result = object_exists(&ecstore, bucket_name, object_name).await; - println!("Object is_delete_marker after lifecycle processing: {check_result}"); + println!("Object is_delete_marker after lifecycle processing: {}", !expired); - if check_result { - println!("❌ Object was not deleted by lifecycle processing"); + if !expired { + let pending = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::GLOBAL_ExpiryState + .read() + .await + .pending_tasks() + .await; + println!("Pending expiry tasks: {pending}"); + + if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await { + if let Ok(object_info) = ecstore + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle( + &lc_config, + None, + None, + &object_info, + ) + .await; + + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects( + ecstore.clone(), + &object_info, + &event, + &rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner, + ) + .await; + + expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await; + } + } + + if !expired { + println!("❌ Object was not deleted by lifecycle processing"); + } } else { println!("✅ Object was successfully deleted by lifecycle processing"); // Let's try to get object info to see its details match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) .await { Ok(obj_info) => { @@ -388,7 +445,7 @@ mod serial_tests { } } - assert!(!check_result); + assert!(expired); println!("✅ Object successfully expired"); // Stop scanner @@ -404,25 +461,26 @@ mod serial_tests { let (_disk_paths, ecstore) = setup_test_env().await; // Create test bucket and object - let bucket_name = "test-lifecycle-expiry-deletemarker-bucket"; + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-expiry-marker-{}", &suffix[..8]); let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" let test_data = b"Hello, this is test data for lifecycle expiry!"; - create_test_lock_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); + assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await); println!("✅ Object exists before lifecycle processing"); // Set lifecycle configuration with very short 
expiry (0 days = immediate expiry) - set_bucket_lifecycle_deletemarker(bucket_name) + set_bucket_lifecycle_deletemarker(bucket_name.as_str()) .await .expect("Failed to set lifecycle configuration"); - println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + println!("✅ Lifecycle configuration set for bucket: {}", bucket_name); // Verify lifecycle configuration was set - match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await { Ok(bucket_meta) => { assert!(bucket_meta.lifecycle_config.is_some()); println!("✅ Bucket metadata retrieved successfully"); @@ -453,36 +511,64 @@ mod serial_tests { scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); println!("✅ Manual scan cycle completed"); - // Wait a bit more for background workers to process expiry tasks - tokio::time::sleep(Duration::from_secs(5)).await; + let mut deleted = false; + for attempt in 0..3 { + if attempt > 0 { + scanner.scan_cycle().await.expect("Failed to trigger scan cycle on retry"); + } + deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(5)).await; + if deleted { + break; + } + } - // Check if object has been expired (deleted) - //let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await; - let check_result = object_exists(&ecstore, bucket_name, object_name).await; - println!("Object exists after lifecycle processing: {check_result}"); + println!("Object exists after lifecycle processing: {}", !deleted); - if check_result { - println!("❌ Object was not deleted by lifecycle processing"); - // Let's try to get object info to see its details - match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + if !deleted { + let pending = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::GLOBAL_ExpiryState + .read() .await - { - Ok(obj_info) => { - println!( - "Object info: name={}, size={}, mod_time={:?}", - obj_info.name, obj_info.size, obj_info.mod_time - ); - } - Err(e) => { - println!("Error getting object info: {e:?}"); + .pending_tasks() + .await; + println!("Pending expiry tasks: {pending}"); + + if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await { + if let Ok(obj_info) = ecstore + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle( + &lc_config, None, None, &obj_info, + ) + .await; + + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects( + ecstore.clone(), + &obj_info, + &event, + &rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner, + ) + .await; + + deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await; + + if !deleted { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + } } } + + if !deleted { + println!("❌ Object was not deleted by lifecycle processing"); + } } else { println!("✅ Object was successfully deleted by lifecycle processing"); } - assert!(!check_result); + assert!(deleted); println!("✅ Object successfully expired"); // Stop scanner @@ -500,15 +586,16 @@ mod serial_tests { //create_test_tier().await; // Create test bucket and object - let 
bucket_name = "test-lifecycle-transition-basic-bucket"; + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-transition-{}", &suffix[..8]); let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" let test_data = b"Hello, this is test data for lifecycle expiry!"; - create_test_lock_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await; // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); + assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await); println!("✅ Object exists before lifecycle processing"); // Set lifecycle configuration with very short expiry (0 days = immediate expiry) @@ -554,14 +641,14 @@ mod serial_tests { // Check if object has been expired (deleted) //let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await; - let check_result = object_exists(&ecstore, bucket_name, object_name).await; + let check_result = object_exists(&ecstore, bucket_name.as_str(), object_name).await; println!("Object exists after lifecycle processing: {check_result}"); if check_result { println!("✅ Object was not deleted by lifecycle processing"); // Let's try to get object info to see its details match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) .await { Ok(obj_info) => { diff --git a/crates/e2e_test/src/reliant/lock.rs b/crates/e2e_test/src/reliant/lock.rs index 66ab140c..126ab7b0 100644 --- a/crates/e2e_test/src/reliant/lock.rs +++ b/crates/e2e_test/src/reliant/lock.rs @@ -14,14 +14,14 @@ // limitations under the License. 
 use async_trait::async_trait;
-use rustfs_ecstore::{disk::endpoint::Endpoint, lock_utils::create_unique_clients};
-use rustfs_lock::client::{LockClient, local::LocalClient};
+use rustfs_ecstore::disk::endpoint::Endpoint;
+use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient};
 use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
 use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
 use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
 use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
 use serial_test::serial;
-use std::{error::Error, sync::Arc, time::Duration};
+use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
 use tokio::time::sleep;
 use tonic::Request;
 use url::Url;
@@ -38,6 +38,34 @@ fn get_cluster_endpoints() -> Vec<Endpoint> {
     }]
 }
 
+async fn create_unique_clients(endpoints: &[Endpoint]) -> Result<Vec<Arc<dyn LockClient>>, Box<dyn Error>> {
+    let mut unique_endpoints: HashMap<String, &Endpoint> = HashMap::new();
+
+    for endpoint in endpoints {
+        if endpoint.is_local {
+            unique_endpoints.insert("local".to_string(), endpoint);
+        } else {
+            let host_port = format!(
+                "{}:{}",
+                endpoint.url.host_str().unwrap_or("localhost"),
+                endpoint.url.port().unwrap_or(9000)
+            );
+            unique_endpoints.insert(host_port, endpoint);
+        }
+    }
+
+    let mut clients = Vec::new();
+    for (_key, endpoint) in unique_endpoints {
+        if endpoint.is_local {
+            clients.push(Arc::new(LocalClient::new()) as Arc<dyn LockClient>);
+        } else {
+            clients.push(Arc::new(RemoteClient::new(endpoint.url.to_string())) as Arc<dyn LockClient>);
+        }
+    }
+
+    Ok(clients)
+}
+
 #[tokio::test]
 #[serial]
 #[ignore = "requires running RustFS server at localhost:9000"]
diff --git a/crates/ecstore/src/lib.rs b/crates/ecstore/src/lib.rs
index 43558d39..b28ce0cb 100644
--- a/crates/ecstore/src/lib.rs
+++ b/crates/ecstore/src/lib.rs
@@ -31,7 +31,6 @@ pub mod erasure_coding;
 pub mod error;
 pub mod file_cache;
 pub mod global;
-pub mod lock_utils;
 pub mod metrics_realtime;
 pub mod notification_sys;
 pub mod pools;
diff --git a/crates/ecstore/src/lock_utils.rs b/crates/ecstore/src/lock_utils.rs
deleted file mode 100644
index 98b9f320..00000000
--- a/crates/ecstore/src/lock_utils.rs
+++ /dev/null
@@ -1,136 +0,0 @@
-// Copyright 2024 RustFS Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
- -use crate::disk::endpoint::Endpoint; -use crate::error::Result; -use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient}; -use std::collections::HashMap; -use std::sync::Arc; - -/// Create unique lock clients from endpoints -/// This function creates one client per unique host:port combination -/// to avoid duplicate connections to the same server -pub async fn create_unique_clients(endpoints: &[Endpoint]) -> Result>> { - let mut unique_endpoints: HashMap = HashMap::new(); - - // Collect unique endpoints based on host:port - for endpoint in endpoints { - if endpoint.is_local { - // For local endpoints, use "local" as the key - unique_endpoints.insert("local".to_string(), endpoint); - } else { - // For remote endpoints, use host:port as the key - let host_port = format!( - "{}:{}", - endpoint.url.host_str().unwrap_or("localhost"), - endpoint.url.port().unwrap_or(9000) - ); - unique_endpoints.insert(host_port, endpoint); - } - } - - let mut clients = Vec::new(); - - // Create clients for unique endpoints - for (_key, endpoint) in unique_endpoints { - if endpoint.is_local { - // For local endpoints, create a local lock client - let local_client = LocalClient::new(); - clients.push(Arc::new(local_client) as Arc); - } else { - // For remote endpoints, create a remote lock client - let remote_client = RemoteClient::new(endpoint.url.to_string()); - clients.push(Arc::new(remote_client) as Arc); - } - } - - Ok(clients) -} - -#[cfg(test)] -mod tests { - use super::*; - use url::Url; - - #[tokio::test] - async fn test_create_unique_clients_local() { - let endpoints = vec![ - Endpoint { - url: Url::parse("http://localhost:9000").unwrap(), - is_local: true, - pool_idx: 0, - set_idx: 0, - disk_idx: 0, - }, - Endpoint { - url: Url::parse("http://localhost:9000").unwrap(), - is_local: true, - pool_idx: 0, - set_idx: 0, - disk_idx: 1, - }, - ]; - - let clients = create_unique_clients(&endpoints).await.unwrap(); - // Should only create one client for local endpoints - assert_eq!(clients.len(), 1); - assert!(clients[0].is_local().await); - } - - #[tokio::test] - async fn test_create_unique_clients_mixed() { - let endpoints = vec![ - Endpoint { - url: Url::parse("http://localhost:9000").unwrap(), - is_local: true, - pool_idx: 0, - set_idx: 0, - disk_idx: 0, - }, - Endpoint { - url: Url::parse("http://remote1:9000").unwrap(), - is_local: false, - pool_idx: 0, - set_idx: 0, - disk_idx: 1, - }, - Endpoint { - url: Url::parse("http://remote1:9000").unwrap(), - is_local: false, - pool_idx: 0, - set_idx: 0, - disk_idx: 2, - }, - Endpoint { - url: Url::parse("http://remote2:9000").unwrap(), - is_local: false, - pool_idx: 0, - set_idx: 0, - disk_idx: 3, - }, - ]; - - let clients = create_unique_clients(&endpoints).await.unwrap(); - // Should create 3 clients: 1 local + 2 unique remote - assert_eq!(clients.len(), 3); - - // Check that we have one local client - let local_count = clients.iter().filter(|c| futures::executor::block_on(c.is_local())).count(); - assert_eq!(local_count, 1); - - // Check that we have two remote clients - let remote_count = clients.iter().filter(|c| !futures::executor::block_on(c.is_local())).count(); - assert_eq!(remote_count, 2); - } -} diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 586773a8..3e238141 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -4041,34 +4041,34 @@ impl StorageAPI for SetDisks { del_errs.push(None) } - // Use fast batch locking to acquire all locks atomically - let mut 
_guards: HashMap = HashMap::new(); + // Acquire locks in batch mode (best effort, matching previous behavior) + let mut batch = rustfs_lock::BatchLockRequest::new(self.locker_owner.as_str()).with_all_or_nothing(false); let mut unique_objects: std::collections::HashSet = std::collections::HashSet::new(); - - // Collect unique object names for dobj in &objects { - unique_objects.insert(dobj.object_name.clone()); + if unique_objects.insert(dobj.object_name.clone()) { + batch = batch.add_write_lock(bucket, dobj.object_name.clone()); + } } - // Acquire all locks in batch to prevent deadlocks - for object_name in unique_objects { - match self - .fast_lock_manager - .acquire_write_lock(bucket, object_name.as_str(), self.locker_owner.as_str()) - .await - { - Ok(guard) => { - _guards.insert(object_name, guard); - } - Err(err) => { - let message = self.format_lock_error(bucket, object_name.as_str(), "write", &err); - // Mark all operations on this object as failed - for (i, dobj) in objects.iter().enumerate() { - if dobj.object_name == object_name { - del_errs[i] = Some(Error::other(message.clone())); - } - } - } + let batch_result = self.fast_lock_manager.acquire_locks_batch(batch).await; + let locked_objects: HashSet = batch_result + .successful_locks + .iter() + .map(|key| key.object.as_ref().to_string()) + .collect(); + let _lock_guards = batch_result.guards; + + let failed_map: HashMap<(String, String), rustfs_lock::fast_lock::LockResult> = batch_result + .failed_locks + .into_iter() + .map(|(key, err)| ((key.bucket.as_ref().to_string(), key.object.as_ref().to_string()), err)) + .collect(); + + // Mark failures for objects that could not be locked + for (i, dobj) in objects.iter().enumerate() { + if let Some(err) = failed_map.get(&(bucket.to_string(), dobj.object_name.clone())) { + let message = self.format_lock_error(bucket, dobj.object_name.as_str(), "write", err); + del_errs[i] = Some(Error::other(message)); } } @@ -4137,7 +4137,7 @@ impl StorageAPI for SetDisks { } // Only add to vers_map if we hold the lock - if _guards.contains_key(&dobj.object_name) { + if locked_objects.contains(&dobj.object_name) { vers_map.insert(&dobj.object_name, v); } } @@ -4558,7 +4558,6 @@ impl StorageAPI for SetDisks { }; // Acquire write-lock early; hold for the whole transition operation scope - // let mut _lock_guard: Option = None; // if !opts.no_lock { // let guard_opt = self // .namespace_lock @@ -4687,7 +4686,6 @@ impl StorageAPI for SetDisks { #[tracing::instrument(level = "debug", skip(self))] async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> { // Acquire write-lock early for the restore operation - // let mut _lock_guard: Option = None; // if !opts.no_lock { // let guard_opt = self // .namespace_lock @@ -4772,7 +4770,6 @@ impl StorageAPI for SetDisks { #[tracing::instrument(level = "debug", skip(self))] async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result { // Acquire write-lock for tag update (metadata write) - // let mut _lock_guard: Option = None; // if !opts.no_lock { // let guard_opt = self // .namespace_lock @@ -5433,7 +5430,6 @@ impl StorageAPI for SetDisks { // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution); // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop. 
- // let mut _object_lock_guard: Option = None; if let Some(http_preconditions) = opts.http_preconditions.clone() { // if !opts.no_lock { // let guard_opt = self diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 2ba142f0..02a95179 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -56,8 +56,6 @@ use tokio::time::Duration; use tracing::warn; use tracing::{error, info}; -use crate::lock_utils::create_unique_clients; - #[derive(Debug, Clone)] pub struct Sets { pub id: Uuid, @@ -164,8 +162,6 @@ impl Sets { } } - let _lock_clients = create_unique_clients(&set_endpoints).await?; - // Note: write_quorum was used for the old lock system, no longer needed with FastLock let _write_quorum = set_drive_count - parity_count; // Create fast lock manager for high performance diff --git a/crates/lock/src/fast_lock/disabled_manager.rs b/crates/lock/src/fast_lock/disabled_manager.rs index caf4f329..b1e9b34c 100644 --- a/crates/lock/src/fast_lock/disabled_manager.rs +++ b/crates/lock/src/fast_lock/disabled_manager.rs @@ -98,12 +98,18 @@ impl DisabledLockManager { /// Always succeeds - all locks acquired pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult { - let successful_locks: Vec = batch_request.requests.into_iter().map(|req| req.key).collect(); + let successful_locks: Vec = batch_request.requests.iter().map(|req| req.key.clone()).collect(); + let guards = batch_request + .requests + .into_iter() + .map(|req| FastLockGuard::new_disabled(req.key, req.mode, req.owner)) + .collect(); BatchLockResult { successful_locks, failed_locks: Vec::new(), all_acquired: true, + guards, } } diff --git a/crates/lock/src/fast_lock/manager.rs b/crates/lock/src/fast_lock/manager.rs index 20b10bf6..64767bde 100644 --- a/crates/lock/src/fast_lock/manager.rs +++ b/crates/lock/src/fast_lock/manager.rs @@ -217,20 +217,33 @@ impl FastObjectLockManager { ) -> BatchLockResult { let mut all_successful = Vec::new(); let mut all_failed = Vec::new(); + let mut guards = Vec::new(); for (&shard_id, requests) in shard_groups { - let shard = &self.shards[shard_id]; + let shard = self.shards[shard_id].clone(); - // Try fast path first for each request for request in requests { - if shard.try_fast_path_only(request) { - all_successful.push(request.key.clone()); + let key = request.key.clone(); + let owner = request.owner.clone(); + let mode = request.mode; + + let acquired = if shard.try_fast_path_only(request) { + true } else { - // Fallback to slow path match shard.acquire_lock(request).await { - Ok(()) => all_successful.push(request.key.clone()), - Err(err) => all_failed.push((request.key.clone(), err)), + Ok(()) => true, + Err(err) => { + all_failed.push((key.clone(), err)); + false + } } + }; + + if acquired { + let guard = FastLockGuard::new(key.clone(), mode, owner.clone(), shard.clone()); + shard.register_guard(guard.guard_id()); + all_successful.push(key); + guards.push(guard); } } } @@ -240,6 +253,7 @@ impl FastObjectLockManager { successful_locks: all_successful, failed_locks: all_failed, all_acquired, + guards, } } @@ -249,16 +263,18 @@ impl FastObjectLockManager { shard_groups: &std::collections::HashMap>, ) -> BatchLockResult { // Phase 1: Try to acquire all locks - let mut acquired_locks = Vec::new(); + let mut acquired_guards = Vec::new(); let mut failed_locks = Vec::new(); 'outer: for (&shard_id, requests) in shard_groups { - let shard = &self.shards[shard_id]; + let shard = self.shards[shard_id].clone(); for request in requests { match 
shard.acquire_lock(request).await { Ok(()) => { - acquired_locks.push((request.key.clone(), request.mode, request.owner.clone())); + let guard = FastLockGuard::new(request.key.clone(), request.mode, request.owner.clone(), shard.clone()); + shard.register_guard(guard.guard_id()); + acquired_guards.push(guard); } Err(err) => { failed_locks.push((request.key.clone(), err)); @@ -270,35 +286,22 @@ impl FastObjectLockManager { // Phase 2: If any failed, release all acquired locks with error tracking if !failed_locks.is_empty() { - let mut cleanup_failures = 0; - for (key, mode, owner) in acquired_locks { - let shard = self.get_shard(&key); - if !shard.release_lock(&key, &owner, mode) { - cleanup_failures += 1; - tracing::warn!( - "Failed to release lock during batch cleanup: bucket={}, object={}", - key.bucket, - key.object - ); - } - } - - if cleanup_failures > 0 { - tracing::error!("Batch lock cleanup had {} failures", cleanup_failures); - } - + // Drop guards to release any acquired locks. + drop(acquired_guards); return BatchLockResult { successful_locks: Vec::new(), failed_locks, all_acquired: false, + guards: Vec::new(), }; } - // All successful + let successful_locks = acquired_guards.iter().map(|guard| guard.key().clone()).collect(); BatchLockResult { - successful_locks: acquired_locks.into_iter().map(|(key, _, _)| key).collect(), + successful_locks, failed_locks: Vec::new(), all_acquired: true, + guards: acquired_guards, } } diff --git a/crates/lock/src/fast_lock/types.rs b/crates/lock/src/fast_lock/types.rs index ea568f12..562cc1b3 100644 --- a/crates/lock/src/fast_lock/types.rs +++ b/crates/lock/src/fast_lock/types.rs @@ -19,6 +19,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::{Duration, SystemTime}; +use crate::fast_lock::guard::FastLockGuard; + /// Object key for version-aware locking #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct ObjectKey { @@ -340,6 +342,7 @@ pub struct BatchLockResult { pub successful_locks: Vec, pub failed_locks: Vec<(ObjectKey, LockResult)>, pub all_acquired: bool, + pub guards: Vec, } #[cfg(test)]
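
Usage note: the patch adds a `guards: Vec<FastLockGuard>` field to `BatchLockResult`, so callers hold locks by keeping the returned guards alive instead of tracking (key, mode, owner) triples for manual release. The following is a minimal sketch of that flow, assuming only the API surface shown in the diff (`BatchLockRequest::new`, `with_all_or_nothing`, `add_write_lock`, `acquire_locks_batch`, and the `guards`/`failed_locks`/`successful_locks` fields); the helper name and the `FastObjectLockManager` import path are illustrative, not part of the patch.

    // Sketch only: paths and helper name are assumptions for illustration.
    use rustfs_lock::BatchLockRequest;
    use rustfs_lock::fast_lock::manager::FastObjectLockManager; // import path assumed

    async fn lock_and_delete(manager: &FastObjectLockManager, bucket: &str, objects: &[String], owner: &str) {
        // One write-lock request per object; best-effort mode mirrors the
        // delete_objects change above (with_all_or_nothing(false)).
        let mut batch = BatchLockRequest::new(owner).with_all_or_nothing(false);
        for object in objects {
            batch = batch.add_write_lock(bucket, object.clone());
        }

        let result = manager.acquire_locks_batch(batch).await;

        // Keep the guards alive for the whole mutation; each guard releases its
        // lock automatically when dropped, so no manual release calls are needed.
        let _guards = result.guards;

        for (key, _err) in &result.failed_locks {
            // Objects that could not be locked are skipped rather than failing the batch.
            eprintln!("skipping {}/{}: lock not acquired", key.bucket, key.object);
        }

        // ... delete only the objects listed in result.successful_locks ...
    } // _guards dropped here, releasing all acquired locks

Because the guards own the release, the all-or-nothing path in manager.rs can simply drop the acquired guards on failure, which is what replaces the old per-key cleanup loop in this patch.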