Feature: lock auto-release support

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2025-08-09 17:52:08 +08:00
parent b391272e94
commit e369e9f481
5 changed files with 403 additions and 46 deletions
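For orientation, here is a minimal sketch of the guard-based locking pattern this commit introduces, assembled from the lock_guard helper and LocalClient setup that appear in the diff below; the namespace and resource names are illustrative only.

// Sketch only: acquire an exclusive lock as a RAII guard and let Drop release it.
use std::sync::Arc;
use std::time::Duration;
use rustfs_lock::{LocalClient, LockClient, NamespaceLock};

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
    let ns_lock = NamespaceLock::with_clients("example".to_string(), vec![client]);
    // lock_guard returns Ok(Some(guard)) on success, Ok(None) when the lock is contended.
    if let Some(_guard) = ns_lock
        .lock_guard("my-object", "owner", Duration::from_secs(5), Duration::from_secs(10))
        .await?
    {
        // ... critical section ...
    } // guard dropped here; the lock is released asynchronously by a background worker
    Ok(())
}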


@@ -38,6 +38,79 @@ fn get_cluster_endpoints() -> Vec<Endpoint> {
}]
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_guard_drop_releases_exclusive_lock_local() -> Result<(), Box<dyn Error>> {
// Single local client; no external server required
let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let ns_lock = NamespaceLock::with_clients("e2e_guard_local".to_string(), vec![client]);
// Acquire exclusive guard
let g1 = ns_lock
.lock_guard("guard_exclusive", "owner1", Duration::from_millis(100), Duration::from_secs(5))
.await?;
assert!(g1.is_some(), "first guard acquisition should succeed");
// While g1 is alive, second exclusive acquisition should fail
let g2 = ns_lock
.lock_guard("guard_exclusive", "owner2", Duration::from_millis(50), Duration::from_secs(5))
.await?;
assert!(g2.is_none(), "second guard acquisition should fail while first is held");
// Drop first guard to trigger background release
drop(g1);
// Give the background unlock worker a short moment to process
sleep(Duration::from_millis(80)).await;
// Now acquisition should succeed
let g3 = ns_lock
.lock_guard("guard_exclusive", "owner2", Duration::from_millis(100), Duration::from_secs(5))
.await?;
assert!(g3.is_some(), "acquisition should succeed after guard drop releases the lock");
drop(g3);
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_guard_shared_then_write_after_drop() -> Result<(), Box<dyn Error>> {
// Two shared read guards should coexist; write should be blocked until they drop
let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
let ns_lock = NamespaceLock::with_clients("e2e_guard_rw".to_string(), vec![client]);
// Acquire two read guards
let r1 = ns_lock
.rlock_guard("rw_resource", "reader1", Duration::from_millis(100), Duration::from_secs(5))
.await?;
let r2 = ns_lock
.rlock_guard("rw_resource", "reader2", Duration::from_millis(100), Duration::from_secs(5))
.await?;
assert!(r1.is_some() && r2.is_some(), "both read guards should be acquired");
// Attempting a write while readers hold the lock should fail
let w_fail = ns_lock
.lock_guard("rw_resource", "writer", Duration::from_millis(50), Duration::from_secs(5))
.await?;
assert!(w_fail.is_none(), "write should be blocked when read guards are active");
// Drop read guards to release
drop(r1);
drop(r2);
sleep(Duration::from_millis(80)).await;
// Now write should succeed
let w_ok = ns_lock
.lock_guard("rw_resource", "writer", Duration::from_millis(150), Duration::from_secs(5))
.await?;
assert!(w_ok.is_some(), "write should succeed after read guards are dropped");
drop(w_ok);
Ok(())
}
#[tokio::test]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]


@@ -3211,6 +3211,20 @@ impl ObjectIO for SetDisks {
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
// Acquire a shared read-lock early to protect read consistency
let mut _read_lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_read_lock_guard = guard_opt;
}
let (fi, files, disks) = self
.get_object_fileinfo(bucket, object, opts, true)
.await
@@ -3256,7 +3270,10 @@ impl ObjectIO for SetDisks {
let object = object.to_owned();
let set_index = self.set_index;
let pool_index = self.pool_index;
// Move the read-lock guard into the task so it lives for the duration of the read
let _guard_to_hold = _read_lock_guard; // moved into the spawned task below
tokio::spawn(async move {
let _guard = _guard_to_hold; // keep guard alive until task ends
if let Err(e) = Self::get_object_with_fileinfo(
&bucket,
&object,
@@ -3284,16 +3301,18 @@ impl ObjectIO for SetDisks {
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks = self.disks.read().await;
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let mut _object_lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let paths = vec![object.to_string()];
let lock_acquired = self
let guard_opt = self
.namespace_lock
.lock_batch(&paths, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if !lock_acquired {
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_object_lock_guard = guard_opt;
}
let mut user_defined = opts.user_defined.clone();
@@ -3500,14 +3519,6 @@ impl ObjectIO for SetDisks {
self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_dir).await?;
// Release lock if it was acquired
if !opts.no_lock {
let paths = vec![object.to_string()];
if let Err(err) = self.namespace_lock.unlock_batch(&paths, &self.locker_owner).await {
error!("Failed to unlock object {}: {}", object, err);
}
}
for (i, op_disk) in online_disks.iter().enumerate() {
if let Some(disk) = op_disk {
if disk.is_online().await {
@@ -3583,6 +3594,19 @@ impl StorageAPI for SetDisks {
return Err(StorageError::NotImplemented);
}
// Guard lock for source object metadata update
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
{
let guard_opt = self
.namespace_lock
.lock_guard(src_object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let disks = self.get_disks_internal().await;
let (mut metas, errs) = {
@@ -3676,6 +3700,18 @@ impl StorageAPI for SetDisks {
}
#[tracing::instrument(skip(self))]
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
// Guard lock for single object delete-version
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
{
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let disks = self.get_disks(0, 0).await?;
let write_quorum = disks.len() / 2 + 1;
@@ -3732,6 +3768,23 @@ impl StorageAPI for SetDisks {
del_errs.push(None)
}
// Per-object guards to keep until function end
let mut _guards: Vec<Option<rustfs_lock::LockGuard>> = Vec::with_capacity(objects.len());
// Acquire locks for all objects first; mark errors for failures
for (i, dobj) in objects.iter().enumerate() {
match self
.namespace_lock
.lock_guard(&dobj.object_name, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?
{
Some(g) => _guards.push(Some(g)),
None => {
del_errs[i] = Some(Error::other("can not get lock. please retry"));
_guards.push(None);
}
}
}
// let mut del_fvers = Vec::with_capacity(objects.len());
let mut vers_map: HashMap<&String, FileInfoVersions> = HashMap::new();
@@ -3788,7 +3841,10 @@ impl StorageAPI for SetDisks {
}
}
vers_map.insert(&dobj.object_name, v);
// Only add to vers_map if we hold the lock
if _guards[i].is_some() {
vers_map.insert(&dobj.object_name, v);
}
}
let mut vers = Vec::with_capacity(vers_map.len());
@@ -3830,6 +3886,18 @@ impl StorageAPI for SetDisks {
#[tracing::instrument(skip(self))]
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
// Guard lock for single object delete
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.delete_prefix {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
if opts.delete_prefix {
self.delete_prefix(bucket, object)
.await
@@ -3952,33 +4020,18 @@ impl StorageAPI for SetDisks {
#[tracing::instrument(skip(self))]
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
// let mut _ns = None;
// if !opts.no_lock {
// let paths = vec![object.to_string()];
// let ns_lock = new_nslock(
// Arc::clone(&self.ns_mutex),
// self.locker_owner.clone(),
// bucket.to_string(),
// paths,
// self.lockers.clone(),
// )
// .await;
// if !ns_lock
// .0
// .write()
// .await
// .get_lock(&Options {
// timeout: Duration::from_secs(5),
// retry_interval: Duration::from_secs(1),
// })
// .await
// .map_err(|err| Error::other(err.to_string()))?
// {
// return Err(Error::other("can not get lock. please retry".to_string()));
// }
// _ns = Some(ns_lock);
// }
// Acquire a shared read-lock to protect consistency during info fetch
let mut _read_lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_read_lock_guard = guard_opt;
}
let (fi, _, _) = self
.get_object_fileinfo(bucket, object, opts, false)
@@ -4010,6 +4063,19 @@ impl StorageAPI for SetDisks {
async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
// TODO: nslock
// Guard lock for metadata update
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let disks = self.get_disks_internal().await;
let (metas, errs) = {
@@ -4100,12 +4166,18 @@ impl StorageAPI for SetDisks {
}
};
/*if !opts.no_lock {
let lk = self.new_ns_lock(bucket, object);
let lkctx = lk.get_lock(globalDeleteOperationTimeout)?;
//ctx = lkctx.Context()
//defer lk.Unlock(lkctx)
}*/
// Acquire write-lock early; hold for the whole transition operation scope
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let (mut fi, meta_arr, online_disks) = self.get_object_fileinfo(bucket, object, opts, true).await?;
/*if err != nil {
@@ -4223,6 +4295,18 @@ impl StorageAPI for SetDisks {
#[tracing::instrument(level = "debug", skip(self))]
async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
// Acquire write-lock early for the restore operation
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let set_restore_header_fn = async move |oi: &mut ObjectInfo, rerr: Option<Error>| -> Result<()> {
if rerr.is_none() {
return Ok(());
@@ -4296,6 +4380,18 @@ impl StorageAPI for SetDisks {
#[tracing::instrument(level = "debug", skip(self))]
async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
// Acquire write-lock for tag update (metadata write)
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
if !opts.no_lock {
let guard_opt = self
.namespace_lock
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
.await?;
if guard_opt.is_none() {
return Err(Error::other("can not get lock. please retry".to_string()));
}
_lock_guard = guard_opt;
}
let (mut fi, _, disks) = self.get_object_fileinfo(bucket, object, opts, false).await?;
fi.metadata.insert(AMZ_OBJECT_TAGGING.to_owned(), tags.to_owned());

crates/lock/src/guard.rs (new file, 130 additions)

@@ -0,0 +1,130 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use once_cell::sync::Lazy;
use tokio::sync::mpsc;
use crate::{client::LockClient, types::LockId};
#[derive(Debug, Clone)]
struct UnlockJob {
lock_id: LockId,
clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state
}
#[derive(Debug)]
struct UnlockRuntime {
tx: mpsc::Sender<UnlockJob>,
}
// Global unlock runtime with background worker
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
// Buffered channel to avoid blocking in Drop
let (tx, mut rx) = mpsc::channel::<UnlockJob>(1024);
// Spawn background worker when first used; assumes a Tokio runtime is available
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
// Best-effort release across clients; success if any succeeds
let mut any_ok = false;
let lock_id = job.lock_id.clone();
let futures = job
.clients
.into_iter()
.map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
})
.collect::<Vec<_>>();
let results = futures::future::join_all(futures).await;
if results.into_iter().any(|s| s) {
any_ok = true;
}
if !any_ok {
tracing::warn!("LockGuard background release failed for {}", lock_id);
} else {
tracing::debug!("LockGuard background released {}", lock_id);
}
}
});
UnlockRuntime { tx }
});
/// A RAII guard that releases the lock asynchronously when dropped.
#[derive(Debug)]
pub struct LockGuard {
lock_id: LockId,
clients: Vec<Arc<dyn LockClient>>,
/// If true, Drop will not try to release (used if user manually released).
disarmed: bool,
}
impl LockGuard {
pub(crate) fn new(lock_id: LockId, clients: Vec<Arc<dyn LockClient>>) -> Self {
Self {
lock_id,
clients,
disarmed: false,
}
}
/// Get the lock id associated with this guard
pub fn lock_id(&self) -> &LockId {
&self.lock_id
}
/// Manually disarm the guard so dropping it won't release the lock.
/// Call this if you explicitly released the lock elsewhere.
pub fn disarm(&mut self) {
self.disarmed = true;
}
}
impl Drop for LockGuard {
fn drop(&mut self) {
if self.disarmed {
return;
}
let job = UnlockJob {
lock_id: self.lock_id.clone(),
clients: self.clients.clone(),
};
// Try a non-blocking send to avoid panics in Drop
if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
// Channel full or closed; best-effort fallback: spawn a detached task
let lock_id = self.lock_id.clone();
let clients = self.clients.clone();
tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id);
// If no Tokio runtime is available this will panic, but in RustFS this runs inside Tokio contexts.
let _ = tokio::spawn(async move {
let futures = clients
.into_iter()
.map(|client| {
let id = lock_id.clone();
async move { client.release(&id).await.unwrap_or(false) }
})
.collect::<Vec<_>>();
let _ = futures::future::join_all(futures).await;
});
}
}
}
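A usage note on disarm(): the guard's Drop enqueues a release job, so code that releases the lock explicitly should disarm the guard first. A sketch under the same assumptions as the example above, using the release_lock API exercised in the tests further down:

// Sketch only: explicit release followed by disarming, so Drop does not
// enqueue a second, redundant release for the same lock id.
if let Some(mut guard) = ns_lock
    .lock_guard("my-object", "owner", Duration::from_secs(5), Duration::from_secs(10))
    .await?
{
    // ... work while holding the lock ...
    let _ = ns_lock.release_lock(guard.lock_id()).await; // explicit release
    guard.disarm(); // prevent Drop from releasing again
}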


@@ -27,6 +27,7 @@ pub mod local;
// Core Modules
pub mod error;
pub mod guard;
pub mod types;
// ============================================================================
@@ -39,6 +40,7 @@ pub use crate::{
client::{LockClient, local::LocalClient, remote::RemoteClient},
// Error types
error::{LockError, Result},
guard::LockGuard,
local::LocalLockMap,
// Main components
namespace::{NamespaceLock, NamespaceLockManager},


@@ -19,6 +19,7 @@ use std::time::Duration;
use crate::{
client::LockClient,
error::{LockError, Result},
guard::LockGuard,
types::{LockId, LockInfo, LockRequest, LockResponse, LockStatus, LockType},
};
@@ -90,6 +91,34 @@ impl NamespaceLock {
self.acquire_lock_with_2pc(request).await
}
/// Acquire a lock and return a RAII guard that will release asynchronously on Drop.
/// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds.
pub async fn acquire_guard(&self, request: &LockRequest) -> Result<Option<LockGuard>> {
let resp = self.acquire_lock(request).await?;
if resp.success {
let guard = LockGuard::new(LockId::new_deterministic(&request.resource), self.clients.clone());
Ok(Some(guard))
} else {
Ok(None)
}
}
/// Convenience: acquire exclusive lock as a guard
pub async fn lock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
let req = LockRequest::new(self.get_resource_key(resource), LockType::Exclusive, owner)
.with_acquire_timeout(timeout)
.with_ttl(ttl);
self.acquire_guard(&req).await
}
/// Convenience: acquire shared lock as a guard
pub async fn rlock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
let req = LockRequest::new(self.get_resource_key(resource), LockType::Shared, owner)
.with_acquire_timeout(timeout)
.with_ttl(ttl);
self.acquire_guard(&req).await
}
/// Two-phase commit lock acquisition: all nodes must succeed or all fail
async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result<LockResponse> {
// Phase 1: Prepare - try to acquire lock on all clients
@@ -420,6 +449,33 @@ mod tests {
assert!(result.is_ok());
}
#[tokio::test]
async fn test_guard_acquire_and_drop_release() {
let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new()));
// Acquire guard
let guard = ns_lock
.lock_guard("guard-resource", "owner", Duration::from_millis(100), Duration::from_secs(5))
.await
.unwrap();
assert!(guard.is_some());
let lock_id = guard.as_ref().unwrap().lock_id().clone();
// Drop guard to trigger background release
drop(guard);
// Give background worker a moment to process
tokio::time::sleep(Duration::from_millis(50)).await;
// Re-acquire should succeed (previous lock released)
let req = LockRequest::new(&lock_id.resource, LockType::Exclusive, "owner").with_ttl(Duration::from_secs(2));
let resp = ns_lock.acquire_lock(&req).await.unwrap();
assert!(resp.success);
// Cleanup
let _ = ns_lock.release_lock(&LockId::new_deterministic(&lock_id.resource)).await;
}
#[tokio::test]
async fn test_connection_health() {
let local_lock = NamespaceLock::new("test-namespace".to_string());