Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 01:30:33 +00:00)
Improve lock (#596)
* improve lock
  Signed-off-by: Mu junxiang <1948535941@qq.com>
* feat(tests): add wait_for_object_absence helper and improve lifecycle test reliability
  Signed-off-by: Mu junxiang <1948535941@qq.com>
* chore: remove dirty docs
  Signed-off-by: Mu junxiang <1948535941@qq.com>
---------
Signed-off-by: Mu junxiang <1948535941@qq.com>
@@ -302,6 +302,22 @@ async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &s
     }
 }
 
+async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &str, timeout: Duration) -> bool {
+    let deadline = tokio::time::Instant::now() + timeout;
+
+    loop {
+        if !object_exists(ecstore, bucket, object).await {
+            return true;
+        }
+
+        if tokio::time::Instant::now() >= deadline {
+            return false;
+        }
+
+        tokio::time::sleep(Duration::from_millis(200)).await;
+    }
+}
+
 mod serial_tests {
     use super::*;
 
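For orientation, here is a condensed, non-authoritative sketch of the retry pattern the reworked lifecycle tests build around this helper; scanner, ecstore, bucket_name, and object_name are bindings from the surrounding test code shown in the hunks below.

    // Re-run a scan cycle, then poll until the object disappears or the attempt times out.
    let mut expired = false;
    for attempt in 0..3 {
        if attempt > 0 {
            scanner.scan_cycle().await.expect("Failed to trigger scan cycle on retry");
        }
        expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(5)).await;
        if expired {
            break;
        }
    }
    assert!(expired, "object should expire within the retry budget");
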
@@ -311,25 +327,26 @@ mod serial_tests {
         let (_disk_paths, ecstore) = setup_test_env().await;
 
         // Create test bucket and object
-        let bucket_name = "test-lifecycle-expiry-basic-bucket";
+        let suffix = uuid::Uuid::new_v4().simple().to_string();
+        let bucket_name = format!("test-lc-expiry-basic-{}", &suffix[..8]);
         let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
         let test_data = b"Hello, this is test data for lifecycle expiry!";
 
-        create_test_lock_bucket(&ecstore, bucket_name).await;
-        upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
+        create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
+        upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
 
         // Verify object exists initially
-        assert!(object_exists(&ecstore, bucket_name, object_name).await);
+        assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
         println!("✅ Object exists before lifecycle processing");
 
         // Set lifecycle configuration with very short expiry (0 days = immediate expiry)
-        set_bucket_lifecycle(bucket_name)
+        set_bucket_lifecycle(bucket_name.as_str())
             .await
             .expect("Failed to set lifecycle configuration");
-        println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
+        println!("✅ Lifecycle configuration set for bucket: {}", bucket_name);
 
         // Verify lifecycle configuration was set
-        match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
+        match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
             Ok(bucket_meta) => {
                 assert!(bucket_meta.lifecycle_config.is_some());
                 println!("✅ Bucket metadata retrieved successfully");
@@ -360,20 +377,60 @@ mod serial_tests {
         scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
         println!("✅ Manual scan cycle completed");
 
-        // Wait a bit more for background workers to process expiry tasks
-        tokio::time::sleep(Duration::from_secs(5)).await;
+        let mut expired = false;
+        for attempt in 0..3 {
+            if attempt > 0 {
+                scanner.scan_cycle().await.expect("Failed to trigger scan cycle on retry");
+            }
+            expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(5)).await;
+            if expired {
+                break;
+            }
+        }
 
         // Check if object has been expired (delete_marker)
-        let check_result = object_exists(&ecstore, bucket_name, object_name).await;
-        println!("Object is_delete_marker after lifecycle processing: {check_result}");
+        println!("Object is_delete_marker after lifecycle processing: {}", !expired);
 
-        if check_result {
-            println!("❌ Object was not deleted by lifecycle processing");
+        if !expired {
+            let pending = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::GLOBAL_ExpiryState
+                .read()
+                .await
+                .pending_tasks()
+                .await;
+            println!("Pending expiry tasks: {pending}");
+
+            if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
+                if let Ok(object_info) = ecstore
+                    .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
+                    .await
+                {
+                    let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
+                        &lc_config,
+                        None,
+                        None,
+                        &object_info,
+                    )
+                    .await;
+
+                    rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
+                        ecstore.clone(),
+                        &object_info,
+                        &event,
+                        &rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
+                    )
+                    .await;
+
+                    expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
+                }
+            }
+
+            if !expired {
+                println!("❌ Object was not deleted by lifecycle processing");
+            }
         } else {
             println!("✅ Object was successfully deleted by lifecycle processing");
             // Let's try to get object info to see its details
             match ecstore
-                .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
+                .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
                 .await
             {
                 Ok(obj_info) => {
@@ -388,7 +445,7 @@ mod serial_tests {
             }
         }
 
-        assert!(!check_result);
+        assert!(expired);
         println!("✅ Object successfully expired");
 
         // Stop scanner
@@ -404,25 +461,26 @@ mod serial_tests {
         let (_disk_paths, ecstore) = setup_test_env().await;
 
         // Create test bucket and object
-        let bucket_name = "test-lifecycle-expiry-deletemarker-bucket";
+        let suffix = uuid::Uuid::new_v4().simple().to_string();
+        let bucket_name = format!("test-lc-expiry-marker-{}", &suffix[..8]);
         let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
         let test_data = b"Hello, this is test data for lifecycle expiry!";
 
-        create_test_lock_bucket(&ecstore, bucket_name).await;
-        upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
+        create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
+        upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
 
         // Verify object exists initially
-        assert!(object_exists(&ecstore, bucket_name, object_name).await);
+        assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
         println!("✅ Object exists before lifecycle processing");
 
         // Set lifecycle configuration with very short expiry (0 days = immediate expiry)
-        set_bucket_lifecycle_deletemarker(bucket_name)
+        set_bucket_lifecycle_deletemarker(bucket_name.as_str())
             .await
             .expect("Failed to set lifecycle configuration");
-        println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
+        println!("✅ Lifecycle configuration set for bucket: {}", bucket_name);
 
         // Verify lifecycle configuration was set
-        match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
+        match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
             Ok(bucket_meta) => {
                 assert!(bucket_meta.lifecycle_config.is_some());
                 println!("✅ Bucket metadata retrieved successfully");
@@ -453,36 +511,64 @@ mod serial_tests {
         scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
         println!("✅ Manual scan cycle completed");
 
-        // Wait a bit more for background workers to process expiry tasks
-        tokio::time::sleep(Duration::from_secs(5)).await;
+        let mut deleted = false;
+        for attempt in 0..3 {
+            if attempt > 0 {
+                scanner.scan_cycle().await.expect("Failed to trigger scan cycle on retry");
+            }
+            deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(5)).await;
+            if deleted {
+                break;
+            }
+        }
 
         // Check if object has been expired (deleted)
         //let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await;
-        let check_result = object_exists(&ecstore, bucket_name, object_name).await;
-        println!("Object exists after lifecycle processing: {check_result}");
+        println!("Object exists after lifecycle processing: {}", !deleted);
 
-        if check_result {
-            println!("❌ Object was not deleted by lifecycle processing");
-            // Let's try to get object info to see its details
-            match ecstore
-                .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
-                .await
-            {
-                Ok(obj_info) => {
-                    println!(
-                        "Object info: name={}, size={}, mod_time={:?}",
-                        obj_info.name, obj_info.size, obj_info.mod_time
-                    );
-                }
-                Err(e) => {
-                    println!("Error getting object info: {e:?}");
-                }
-            }
+        if !deleted {
+            let pending = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::GLOBAL_ExpiryState
+                .read()
+                .await
+                .pending_tasks()
+                .await;
+            println!("Pending expiry tasks: {pending}");
+
+            if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
+                if let Ok(obj_info) = ecstore
+                    .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
+                    .await
+                {
+                    let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
+                        &lc_config, None, None, &obj_info,
+                    )
+                    .await;
+
+                    rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
+                        ecstore.clone(),
+                        &obj_info,
+                        &event,
+                        &rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
+                    )
+                    .await;
+
+                    deleted = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
+
+                    if !deleted {
+                        println!(
+                            "Object info: name={}, size={}, mod_time={:?}",
+                            obj_info.name, obj_info.size, obj_info.mod_time
+                        );
+                    }
+                }
+            }
+
+            if !deleted {
+                println!("❌ Object was not deleted by lifecycle processing");
+            }
         } else {
             println!("✅ Object was successfully deleted by lifecycle processing");
         }
 
-        assert!(!check_result);
+        assert!(deleted);
         println!("✅ Object successfully expired");
 
         // Stop scanner
@@ -500,15 +586,16 @@ mod serial_tests {
         //create_test_tier().await;
 
         // Create test bucket and object
-        let bucket_name = "test-lifecycle-transition-basic-bucket";
+        let suffix = uuid::Uuid::new_v4().simple().to_string();
+        let bucket_name = format!("test-lc-transition-{}", &suffix[..8]);
         let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
         let test_data = b"Hello, this is test data for lifecycle expiry!";
 
-        create_test_lock_bucket(&ecstore, bucket_name).await;
-        upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
+        create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
+        upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
 
         // Verify object exists initially
-        assert!(object_exists(&ecstore, bucket_name, object_name).await);
+        assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
         println!("✅ Object exists before lifecycle processing");
 
         // Set lifecycle configuration with very short expiry (0 days = immediate expiry)
@@ -554,14 +641,14 @@ mod serial_tests {
 
         // Check if object has been expired (deleted)
         //let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await;
-        let check_result = object_exists(&ecstore, bucket_name, object_name).await;
+        let check_result = object_exists(&ecstore, bucket_name.as_str(), object_name).await;
         println!("Object exists after lifecycle processing: {check_result}");
 
         if check_result {
             println!("✅ Object was not deleted by lifecycle processing");
             // Let's try to get object info to see its details
             match ecstore
-                .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
+                .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
                 .await
             {
                 Ok(obj_info) => {
 
@@ -14,14 +14,14 @@
 // limitations under the License.
 
 use async_trait::async_trait;
-use rustfs_ecstore::{disk::endpoint::Endpoint, lock_utils::create_unique_clients};
-use rustfs_lock::client::{LockClient, local::LocalClient};
+use rustfs_ecstore::disk::endpoint::Endpoint;
+use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient};
 use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
 use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
 use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
 use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
 use serial_test::serial;
-use std::{error::Error, sync::Arc, time::Duration};
+use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
 use tokio::time::sleep;
 use tonic::Request;
 use url::Url;
@@ -38,6 +38,34 @@ fn get_cluster_endpoints() -> Vec<Endpoint> {
     }]
 }
 
+async fn create_unique_clients(endpoints: &[Endpoint]) -> Result<Vec<Arc<dyn LockClient>>, Box<dyn Error>> {
+    let mut unique_endpoints: HashMap<String, &Endpoint> = HashMap::new();
+
+    for endpoint in endpoints {
+        if endpoint.is_local {
+            unique_endpoints.insert("local".to_string(), endpoint);
+        } else {
+            let host_port = format!(
+                "{}:{}",
+                endpoint.url.host_str().unwrap_or("localhost"),
+                endpoint.url.port().unwrap_or(9000)
+            );
+            unique_endpoints.insert(host_port, endpoint);
+        }
+    }
+
+    let mut clients = Vec::new();
+    for (_key, endpoint) in unique_endpoints {
+        if endpoint.is_local {
+            clients.push(Arc::new(LocalClient::new()) as Arc<dyn LockClient>);
+        } else {
+            clients.push(Arc::new(RemoteClient::new(endpoint.url.to_string())) as Arc<dyn LockClient>);
+        }
+    }
+
+    Ok(clients)
+}
+
 #[tokio::test]
 #[serial]
 #[ignore = "requires running RustFS server at localhost:9000"]
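As a quick, hypothetical illustration of the deduplication this test-local helper performs (the Endpoint fields mirror the ones used elsewhere in this diff): endpoints that resolve to the same remote host:port collapse into a single RemoteClient, and any number of local endpoints collapse into one LocalClient.

    let endpoints = vec![
        Endpoint {
            url: Url::parse("http://localhost:9000").unwrap(),
            is_local: true,
            pool_idx: 0,
            set_idx: 0,
            disk_idx: 0,
        },
        Endpoint {
            url: Url::parse("http://remote1:9000").unwrap(),
            is_local: false,
            pool_idx: 0,
            set_idx: 0,
            disk_idx: 1,
        },
        Endpoint {
            url: Url::parse("http://remote1:9000").unwrap(),
            is_local: false,
            pool_idx: 0,
            set_idx: 0,
            disk_idx: 2,
        },
    ];

    let clients = create_unique_clients(&endpoints).await?;
    assert_eq!(clients.len(), 2); // one LocalClient plus one RemoteClient for remote1:9000
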
@@ -31,7 +31,6 @@ pub mod erasure_coding;
 pub mod error;
 pub mod file_cache;
 pub mod global;
-pub mod lock_utils;
 pub mod metrics_realtime;
 pub mod notification_sys;
 pub mod pools;
@@ -1,136 +0,0 @@
-// Copyright 2024 RustFS Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use crate::disk::endpoint::Endpoint;
-use crate::error::Result;
-use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient};
-use std::collections::HashMap;
-use std::sync::Arc;
-
-/// Create unique lock clients from endpoints
-/// This function creates one client per unique host:port combination
-/// to avoid duplicate connections to the same server
-pub async fn create_unique_clients(endpoints: &[Endpoint]) -> Result<Vec<Arc<dyn LockClient>>> {
-    let mut unique_endpoints: HashMap<String, &Endpoint> = HashMap::new();
-
-    // Collect unique endpoints based on host:port
-    for endpoint in endpoints {
-        if endpoint.is_local {
-            // For local endpoints, use "local" as the key
-            unique_endpoints.insert("local".to_string(), endpoint);
-        } else {
-            // For remote endpoints, use host:port as the key
-            let host_port = format!(
-                "{}:{}",
-                endpoint.url.host_str().unwrap_or("localhost"),
-                endpoint.url.port().unwrap_or(9000)
-            );
-            unique_endpoints.insert(host_port, endpoint);
-        }
-    }
-
-    let mut clients = Vec::new();
-
-    // Create clients for unique endpoints
-    for (_key, endpoint) in unique_endpoints {
-        if endpoint.is_local {
-            // For local endpoints, create a local lock client
-            let local_client = LocalClient::new();
-            clients.push(Arc::new(local_client) as Arc<dyn LockClient>);
-        } else {
-            // For remote endpoints, create a remote lock client
-            let remote_client = RemoteClient::new(endpoint.url.to_string());
-            clients.push(Arc::new(remote_client) as Arc<dyn LockClient>);
-        }
-    }
-
-    Ok(clients)
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use url::Url;
-
-    #[tokio::test]
-    async fn test_create_unique_clients_local() {
-        let endpoints = vec![
-            Endpoint {
-                url: Url::parse("http://localhost:9000").unwrap(),
-                is_local: true,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 0,
-            },
-            Endpoint {
-                url: Url::parse("http://localhost:9000").unwrap(),
-                is_local: true,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 1,
-            },
-        ];
-
-        let clients = create_unique_clients(&endpoints).await.unwrap();
-        // Should only create one client for local endpoints
-        assert_eq!(clients.len(), 1);
-        assert!(clients[0].is_local().await);
-    }
-
-    #[tokio::test]
-    async fn test_create_unique_clients_mixed() {
-        let endpoints = vec![
-            Endpoint {
-                url: Url::parse("http://localhost:9000").unwrap(),
-                is_local: true,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 0,
-            },
-            Endpoint {
-                url: Url::parse("http://remote1:9000").unwrap(),
-                is_local: false,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 1,
-            },
-            Endpoint {
-                url: Url::parse("http://remote1:9000").unwrap(),
-                is_local: false,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 2,
-            },
-            Endpoint {
-                url: Url::parse("http://remote2:9000").unwrap(),
-                is_local: false,
-                pool_idx: 0,
-                set_idx: 0,
-                disk_idx: 3,
-            },
-        ];
-
-        let clients = create_unique_clients(&endpoints).await.unwrap();
-        // Should create 3 clients: 1 local + 2 unique remote
-        assert_eq!(clients.len(), 3);
-
-        // Check that we have one local client
-        let local_count = clients.iter().filter(|c| futures::executor::block_on(c.is_local())).count();
-        assert_eq!(local_count, 1);
-
-        // Check that we have two remote clients
-        let remote_count = clients.iter().filter(|c| !futures::executor::block_on(c.is_local())).count();
-        assert_eq!(remote_count, 2);
-    }
-}
@@ -4041,34 +4041,34 @@ impl StorageAPI for SetDisks {
             del_errs.push(None)
         }
 
-        // Use fast batch locking to acquire all locks atomically
-        let mut _guards: HashMap<String, rustfs_lock::FastLockGuard> = HashMap::new();
+        // Acquire locks in batch mode (best effort, matching previous behavior)
+        let mut batch = rustfs_lock::BatchLockRequest::new(self.locker_owner.as_str()).with_all_or_nothing(false);
         let mut unique_objects: std::collections::HashSet<String> = std::collections::HashSet::new();
 
         // Collect unique object names
         for dobj in &objects {
-            unique_objects.insert(dobj.object_name.clone());
+            if unique_objects.insert(dobj.object_name.clone()) {
+                batch = batch.add_write_lock(bucket, dobj.object_name.clone());
+            }
         }
 
-        // Acquire all locks in batch to prevent deadlocks
-        for object_name in unique_objects {
-            match self
-                .fast_lock_manager
-                .acquire_write_lock(bucket, object_name.as_str(), self.locker_owner.as_str())
-                .await
-            {
-                Ok(guard) => {
-                    _guards.insert(object_name, guard);
-                }
-                Err(err) => {
-                    let message = self.format_lock_error(bucket, object_name.as_str(), "write", &err);
-                    // Mark all operations on this object as failed
-                    for (i, dobj) in objects.iter().enumerate() {
-                        if dobj.object_name == object_name {
-                            del_errs[i] = Some(Error::other(message.clone()));
-                        }
-                    }
-                }
-            }
-        }
+        let batch_result = self.fast_lock_manager.acquire_locks_batch(batch).await;
+        let locked_objects: HashSet<String> = batch_result
+            .successful_locks
+            .iter()
+            .map(|key| key.object.as_ref().to_string())
+            .collect();
+        let _lock_guards = batch_result.guards;
+
+        let failed_map: HashMap<(String, String), rustfs_lock::fast_lock::LockResult> = batch_result
+            .failed_locks
+            .into_iter()
+            .map(|(key, err)| ((key.bucket.as_ref().to_string(), key.object.as_ref().to_string()), err))
+            .collect();
+
+        // Mark failures for objects that could not be locked
+        for (i, dobj) in objects.iter().enumerate() {
+            if let Some(err) = failed_map.get(&(bucket.to_string(), dobj.object_name.clone())) {
+                let message = self.format_lock_error(bucket, dobj.object_name.as_str(), "write", err);
+                del_errs[i] = Some(Error::other(message));
+            }
+        }
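To summarize the change above in one place, here is a hedged sketch of the new best-effort batch flow; only calls visible in this hunk are used, and self, bucket, objects, and del_errs are the surrounding method's bindings. One write-lock request is queued per unique object name, all locks are acquired in a single batch, the returned guards are held for the rest of the delete, and per-key failures are translated back into per-object errors.

    // Queue one write lock per unique object, best effort (no all-or-nothing).
    let mut batch = rustfs_lock::BatchLockRequest::new(self.locker_owner.as_str()).with_all_or_nothing(false);
    let mut unique_objects = std::collections::HashSet::new();
    for dobj in &objects {
        if unique_objects.insert(dobj.object_name.clone()) {
            batch = batch.add_write_lock(bucket, dobj.object_name.clone());
        }
    }

    // Acquire everything in one call and keep the guards alive until the deletes finish.
    let batch_result = self.fast_lock_manager.acquire_locks_batch(batch).await;
    let _lock_guards = batch_result.guards;

    // Objects that could not be locked are surfaced as per-object delete errors.
    for (key, err) in &batch_result.failed_locks {
        for (i, dobj) in objects.iter().enumerate() {
            if dobj.object_name == key.object.as_ref() {
                del_errs[i] = Some(Error::other(self.format_lock_error(bucket, dobj.object_name.as_str(), "write", err)));
            }
        }
    }
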
@@ -4137,7 +4137,7 @@ impl StorageAPI for SetDisks {
             }
 
             // Only add to vers_map if we hold the lock
-            if _guards.contains_key(&dobj.object_name) {
+            if locked_objects.contains(&dobj.object_name) {
                 vers_map.insert(&dobj.object_name, v);
             }
         }
@@ -4558,7 +4558,6 @@ impl StorageAPI for SetDisks {
         };
 
         // Acquire write-lock early; hold for the whole transition operation scope
-        // let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
         // if !opts.no_lock {
         // let guard_opt = self
         // .namespace_lock
@@ -4687,7 +4686,6 @@ impl StorageAPI for SetDisks {
     #[tracing::instrument(level = "debug", skip(self))]
     async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
         // Acquire write-lock early for the restore operation
-        // let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
         // if !opts.no_lock {
         // let guard_opt = self
         // .namespace_lock
@@ -4772,7 +4770,6 @@ impl StorageAPI for SetDisks {
     #[tracing::instrument(level = "debug", skip(self))]
     async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
         // Acquire write-lock for tag update (metadata write)
-        // let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
         // if !opts.no_lock {
         // let guard_opt = self
         // .namespace_lock
@@ -5433,7 +5430,6 @@ impl StorageAPI for SetDisks {
         // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
 
         // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
-        // let mut _object_lock_guard: Option<rustfs_lock::LockGuard> = None;
         if let Some(http_preconditions) = opts.http_preconditions.clone() {
             // if !opts.no_lock {
             // let guard_opt = self
 
@@ -56,8 +56,6 @@ use tokio::time::Duration;
 use tracing::warn;
 use tracing::{error, info};
 
-use crate::lock_utils::create_unique_clients;
-
 #[derive(Debug, Clone)]
 pub struct Sets {
     pub id: Uuid,
@@ -164,8 +162,6 @@ impl Sets {
             }
         }
 
-        let _lock_clients = create_unique_clients(&set_endpoints).await?;
-
         // Note: write_quorum was used for the old lock system, no longer needed with FastLock
         let _write_quorum = set_drive_count - parity_count;
         // Create fast lock manager for high performance
@@ -98,12 +98,18 @@ impl DisabledLockManager {
 
     /// Always succeeds - all locks acquired
     pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
-        let successful_locks: Vec<ObjectKey> = batch_request.requests.into_iter().map(|req| req.key).collect();
+        let successful_locks: Vec<ObjectKey> = batch_request.requests.iter().map(|req| req.key.clone()).collect();
+        let guards = batch_request
+            .requests
+            .into_iter()
+            .map(|req| FastLockGuard::new_disabled(req.key, req.mode, req.owner))
+            .collect();
 
         BatchLockResult {
             successful_locks,
             failed_locks: Vec::new(),
             all_acquired: true,
+            guards,
         }
     }
 
@@ -217,20 +217,33 @@ impl FastObjectLockManager {
     ) -> BatchLockResult {
         let mut all_successful = Vec::new();
         let mut all_failed = Vec::new();
+        let mut guards = Vec::new();
 
         for (&shard_id, requests) in shard_groups {
-            let shard = &self.shards[shard_id];
+            let shard = self.shards[shard_id].clone();
 
             // Try fast path first for each request
             for request in requests {
-                if shard.try_fast_path_only(request) {
-                    all_successful.push(request.key.clone());
+                let key = request.key.clone();
+                let owner = request.owner.clone();
+                let mode = request.mode;
+
+                let acquired = if shard.try_fast_path_only(request) {
+                    true
                 } else {
                     // Fallback to slow path
                     match shard.acquire_lock(request).await {
-                        Ok(()) => all_successful.push(request.key.clone()),
-                        Err(err) => all_failed.push((request.key.clone(), err)),
+                        Ok(()) => true,
+                        Err(err) => {
+                            all_failed.push((key.clone(), err));
+                            false
+                        }
                     }
-                }
+                };
+
+                if acquired {
+                    let guard = FastLockGuard::new(key.clone(), mode, owner.clone(), shard.clone());
+                    shard.register_guard(guard.guard_id());
+                    all_successful.push(key);
+                    guards.push(guard);
+                }
             }
         }
@@ -240,6 +253,7 @@ impl FastObjectLockManager {
             successful_locks: all_successful,
             failed_locks: all_failed,
             all_acquired,
+            guards,
         }
     }
 
@@ -249,16 +263,18 @@ impl FastObjectLockManager {
         shard_groups: &std::collections::HashMap<usize, Vec<ObjectLockRequest>>,
     ) -> BatchLockResult {
         // Phase 1: Try to acquire all locks
-        let mut acquired_locks = Vec::new();
+        let mut acquired_guards = Vec::new();
         let mut failed_locks = Vec::new();
 
         'outer: for (&shard_id, requests) in shard_groups {
-            let shard = &self.shards[shard_id];
+            let shard = self.shards[shard_id].clone();
 
            for request in requests {
                match shard.acquire_lock(request).await {
                    Ok(()) => {
-                        acquired_locks.push((request.key.clone(), request.mode, request.owner.clone()));
+                        let guard = FastLockGuard::new(request.key.clone(), request.mode, request.owner.clone(), shard.clone());
+                        shard.register_guard(guard.guard_id());
+                        acquired_guards.push(guard);
                    }
                    Err(err) => {
                        failed_locks.push((request.key.clone(), err));
@@ -270,35 +286,22 @@ impl FastObjectLockManager {
 
         // Phase 2: If any failed, release all acquired locks with error tracking
         if !failed_locks.is_empty() {
-            let mut cleanup_failures = 0;
-            for (key, mode, owner) in acquired_locks {
-                let shard = self.get_shard(&key);
-                if !shard.release_lock(&key, &owner, mode) {
-                    cleanup_failures += 1;
-                    tracing::warn!(
-                        "Failed to release lock during batch cleanup: bucket={}, object={}",
-                        key.bucket,
-                        key.object
-                    );
-                }
-            }
-
-            if cleanup_failures > 0 {
-                tracing::error!("Batch lock cleanup had {} failures", cleanup_failures);
-            }
-
+            // Drop guards to release any acquired locks.
+            drop(acquired_guards);
             return BatchLockResult {
                 successful_locks: Vec::new(),
                 failed_locks,
                 all_acquired: false,
+                guards: Vec::new(),
             };
         }
 
         // All successful
+        let successful_locks = acquired_guards.iter().map(|guard| guard.key().clone()).collect();
         BatchLockResult {
-            successful_locks: acquired_locks.into_iter().map(|(key, _, _)| key).collect(),
+            successful_locks,
             failed_locks: Vec::new(),
             all_acquired: true,
+            guards: acquired_guards,
         }
     }
 
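Worth noting: the all-or-nothing path now leans entirely on RAII. Each successful acquisition immediately yields a FastLockGuard, so rolling back after a partial failure is just dropping the collected guards instead of calling release_lock per key. Below is a hedged sketch of how a caller might consume the result; the field names are the ones defined in this diff, while manager and batch are placeholder values.

    let result = manager.acquire_locks_batch(batch).await;
    if result.all_acquired {
        // Hold the guards for the critical section; each lock is released when its guard drops.
        let _guards = result.guards;
        // ... protected work ...
    } else {
        // Nothing is left held: partially acquired guards were already dropped,
        // and failed_locks records which keys could not be locked.
        for (key, _err) in &result.failed_locks {
            eprintln!("could not lock {}/{}", key.bucket, key.object);
        }
    }
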
@@ -19,6 +19,8 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
+use crate::fast_lock::guard::FastLockGuard;
+
 /// Object key for version-aware locking
 #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct ObjectKey {
@@ -340,6 +342,7 @@ pub struct BatchLockResult {
     pub successful_locks: Vec<ObjectKey>,
     pub failed_locks: Vec<(ObjectKey, LockResult)>,
     pub all_acquired: bool,
+    pub guards: Vec<FastLockGuard>,
 }
 
 #[cfg(test)]