From e369e9f4819fff43983ce95a5037ba27fc63e408 Mon Sep 17 00:00:00 2001
From: junxiang Mu <1948535941@qq.com>
Date: Sat, 9 Aug 2025 17:52:08 +0800
Subject: [PATCH] Feature: lock support auto release

Signed-off-by: junxiang Mu <1948535941@qq.com>
---
 crates/e2e_test/src/reliant/lock.rs |  73 +++++++++++
 crates/ecstore/src/set_disk.rs      | 188 +++++++++++++++++++++-------
 crates/lock/src/guard.rs            | 130 +++++++++++++++
 crates/lock/src/lib.rs              |   2 +
 crates/lock/src/namespace.rs        |  56 +++++++++
 5 files changed, 403 insertions(+), 46 deletions(-)
 create mode 100644 crates/lock/src/guard.rs

diff --git a/crates/e2e_test/src/reliant/lock.rs b/crates/e2e_test/src/reliant/lock.rs
index 050d559e..66ab140c 100644
--- a/crates/e2e_test/src/reliant/lock.rs
+++ b/crates/e2e_test/src/reliant/lock.rs
@@ -38,6 +38,79 @@ fn get_cluster_endpoints() -> Vec {
     }]
 }
 
+#[tokio::test]
+#[serial]
+#[ignore = "requires running RustFS server at localhost:9000"]
+async fn test_guard_drop_releases_exclusive_lock_local() -> Result<(), Box<dyn std::error::Error>> {
+    // Single local client; no external server required
+    let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
+    let ns_lock = NamespaceLock::with_clients("e2e_guard_local".to_string(), vec![client]);
+
+    // Acquire exclusive guard
+    let g1 = ns_lock
+        .lock_guard("guard_exclusive", "owner1", Duration::from_millis(100), Duration::from_secs(5))
+        .await?;
+    assert!(g1.is_some(), "first guard acquisition should succeed");
+
+    // While g1 is alive, second exclusive acquisition should fail
+    let g2 = ns_lock
+        .lock_guard("guard_exclusive", "owner2", Duration::from_millis(50), Duration::from_secs(5))
+        .await?;
+    assert!(g2.is_none(), "second guard acquisition should fail while first is held");
+
+    // Drop first guard to trigger background release
+    drop(g1);
+    // Give the background unlock worker a short moment to process
+    sleep(Duration::from_millis(80)).await;
+
+    // Now acquisition should succeed
+    let g3 = ns_lock
+        .lock_guard("guard_exclusive", "owner2", Duration::from_millis(100), Duration::from_secs(5))
+        .await?;
+    assert!(g3.is_some(), "acquisition should succeed after guard drop releases the lock");
+    drop(g3);
+
+    Ok(())
+}
+
+#[tokio::test]
+#[serial]
+#[ignore = "requires running RustFS server at localhost:9000"]
+async fn test_guard_shared_then_write_after_drop() -> Result<(), Box<dyn std::error::Error>> {
+    // Two shared read guards should coexist; write should be blocked until they drop
+    let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
+    let ns_lock = NamespaceLock::with_clients("e2e_guard_rw".to_string(), vec![client]);
+
+    // Acquire two read guards
+    let r1 = ns_lock
+        .rlock_guard("rw_resource", "reader1", Duration::from_millis(100), Duration::from_secs(5))
+        .await?;
+    let r2 = ns_lock
+        .rlock_guard("rw_resource", "reader2", Duration::from_millis(100), Duration::from_secs(5))
+        .await?;
+    assert!(r1.is_some() && r2.is_some(), "both read guards should be acquired");
+
+    // Attempt write while readers hold the lock should fail
+    let w_fail = ns_lock
+        .lock_guard("rw_resource", "writer", Duration::from_millis(50), Duration::from_secs(5))
+        .await?;
+    assert!(w_fail.is_none(), "write should be blocked when read guards are active");
+
+    // Drop read guards to release
+    drop(r1);
+    drop(r2);
+    sleep(Duration::from_millis(80)).await;
+
+    // Now write should succeed
+    let w_ok = ns_lock
+        .lock_guard("rw_resource", "writer", Duration::from_millis(150), Duration::from_secs(5))
+        .await?;
+    assert!(w_ok.is_some(), "write should succeed after read guards are dropped");
+    drop(w_ok);
+
+    Ok(())
+}
+
 #[tokio::test]
 #[serial]
 #[ignore = "requires running RustFS server at localhost:9000"]
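The two tests above wait a fixed 80 ms for the background unlock worker before re-acquiring. Because the release on drop is asynchronous and best-effort, a small polling helper along these lines can make such tests less timing-sensitive. This is an illustrative sketch only, not part of the patch; the helper name and the `rustfs_lock` import path are assumptions, while `lock_guard` itself is the API added below.

use std::time::Duration;
// Assumed import path; the lock crate's package name is not shown in this patch.
use rustfs_lock::{LockGuard, NamespaceLock};

/// Poll until the guard is acquired instead of sleeping for a fixed interval.
async fn acquire_guard_with_retry(ns_lock: &NamespaceLock, resource: &str, owner: &str) -> Option<LockGuard> {
    for _ in 0..20 {
        // Each attempt uses the patch's lock_guard API with a short acquire timeout.
        if let Ok(Some(guard)) = ns_lock
            .lock_guard(resource, owner, Duration::from_millis(50), Duration::from_secs(5))
            .await
        {
            return Some(guard);
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    None
}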
diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs
index 2a4d8409..8f486fd4 100644
--- a/crates/ecstore/src/set_disk.rs
+++ b/crates/ecstore/src/set_disk.rs
@@ -3211,6 +3211,20 @@ impl ObjectIO for SetDisks {
         h: HeaderMap,
         opts: &ObjectOptions,
     ) -> Result {
+        // Acquire a shared read-lock early to protect read consistency
+        let mut _read_lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _read_lock_guard = guard_opt;
+        }
+
         let (fi, files, disks) = self
             .get_object_fileinfo(bucket, object, opts, true)
             .await
@@ -3256,7 +3270,10 @@
         let object = object.to_owned();
         let set_index = self.set_index;
         let pool_index = self.pool_index;
+        // Move the read-lock guard into the task so it lives for the duration of the read
+        let _guard_to_hold = _read_lock_guard; // moved into closure below
         tokio::spawn(async move {
+            let _guard = _guard_to_hold; // keep guard alive until task ends
             if let Err(e) = Self::get_object_with_fileinfo(
                 &bucket,
                 &object,
@@ -3284,16 +3301,18 @@
     async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result {
         let disks = self.disks.read().await;
 
+        // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
+        let mut _object_lock_guard: Option<LockGuard> = None;
         if !opts.no_lock {
-            let paths = vec![object.to_string()];
-            let lock_acquired = self
+            let guard_opt = self
                 .namespace_lock
-                .lock_batch(&paths, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
                 .await?;
 
-            if !lock_acquired {
+            if guard_opt.is_none() {
                 return Err(Error::other("can not get lock. please retry".to_string()));
             }
+            _object_lock_guard = guard_opt;
         }
 
         let mut user_defined = opts.user_defined.clone();
@@ -3500,14 +3519,6 @@
         self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_dir).await?;
 
-        // Release lock if it was acquired
-        if !opts.no_lock {
-            let paths = vec![object.to_string()];
-            if let Err(err) = self.namespace_lock.unlock_batch(&paths, &self.locker_owner).await {
-                error!("Failed to unlock object {}: {}", object, err);
-            }
-        }
-
         for (i, op_disk) in online_disks.iter().enumerate() {
             if let Some(disk) = op_disk {
                 if disk.is_online().await {
@@ -3583,6 +3594,19 @@ impl StorageAPI for SetDisks {
             return Err(StorageError::NotImplemented);
         }
 
+        // Guard lock for source object metadata update
+        let mut _lock_guard: Option<LockGuard> = None;
+        {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(src_object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
+
         let disks = self.get_disks_internal().await;
 
         let (mut metas, errs) = {
@@ -3676,6 +3700,18 @@
     }
 
     #[tracing::instrument(skip(self))]
     async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
+        // Guard lock for single object delete-version
+        let mut _lock_guard: Option<LockGuard> = None;
+        {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
         let disks = self.get_disks(0, 0).await?;
 
         let write_quorum = disks.len() / 2 + 1;
@@ -3732,6 +3768,23 @@
             del_errs.push(None)
         }
 
+        // Per-object guards to keep until function end
+        let mut _guards: Vec<Option<LockGuard>> = Vec::with_capacity(objects.len());
+        // Acquire locks for all objects first; mark errors for failures
+        for (i, dobj) in objects.iter().enumerate() {
+            match self
+                .namespace_lock
+                .lock_guard(&dobj.object_name, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?
+            {
+                Some(g) => _guards.push(Some(g)),
+                None => {
+                    del_errs[i] = Some(Error::other("can not get lock. please retry"));
+                    _guards.push(None);
+                }
+            }
+        }
+
         // let mut del_fvers = Vec::with_capacity(objects.len());
 
         let mut vers_map: HashMap<&String, FileInfoVersions> = HashMap::new();
@@ -3788,7 +3841,10 @@
                 }
             }
 
-            vers_map.insert(&dobj.object_name, v);
+            // Only add to vers_map if we hold the lock
+            if _guards[i].is_some() {
+                vers_map.insert(&dobj.object_name, v);
+            }
         }
 
         let mut vers = Vec::with_capacity(vers_map.len());
@@ -3830,6 +3886,18 @@
 
     #[tracing::instrument(skip(self))]
     async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result {
+        // Guard lock for single object delete
+        let mut _lock_guard: Option<LockGuard> = None;
+        if !opts.delete_prefix {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
         if opts.delete_prefix {
             self.delete_prefix(bucket, object)
                 .await
@@ -3952,33 +4020,18 @@
     #[tracing::instrument(skip(self))]
     async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result {
-        // let mut _ns = None;
-        // if !opts.no_lock {
-        //     let paths = vec![object.to_string()];
-        //     let ns_lock = new_nslock(
-        //         Arc::clone(&self.ns_mutex),
-        //         self.locker_owner.clone(),
-        //         bucket.to_string(),
-        //         paths,
-        //         self.lockers.clone(),
-        //     )
-        //     .await;
-        //     if !ns_lock
-        //         .0
-        //         .write()
-        //         .await
-        //         .get_lock(&Options {
-        //             timeout: Duration::from_secs(5),
-        //             retry_interval: Duration::from_secs(1),
-        //         })
-        //         .await
-        //         .map_err(|err| Error::other(err.to_string()))?
-        //     {
-        //         return Err(Error::other("can not get lock. please retry".to_string()));
-        //     }
-
-        //     _ns = Some(ns_lock);
-        // }
+        // Acquire a shared read-lock to protect consistency during info fetch
+        let mut _read_lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _read_lock_guard = guard_opt;
+        }
 
         let (fi, _, _) = self
             .get_object_fileinfo(bucket, object, opts, false)
             .await
@@ -4010,6 +4063,19 @@
     async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result {
         // TODO: nslock
 
+        // Guard lock for metadata update
+        let mut _lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
+
         let disks = self.get_disks_internal().await;
 
         let (metas, errs) = {
@@ -4100,12 +4166,18 @@
             }
         };
 
-        /*if !opts.no_lock {
-            let lk = self.new_ns_lock(bucket, object);
-            let lkctx = lk.get_lock(globalDeleteOperationTimeout)?;
-            //ctx = lkctx.Context()
-            //defer lk.Unlock(lkctx)
-        }*/
+        // Acquire write-lock early; hold for the whole transition operation scope
+        let mut _lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
 
         let (mut fi, meta_arr, online_disks) = self.get_object_fileinfo(bucket, object, opts, true).await?;
 
         /*if err != nil {
@@ -4223,6 +4295,18 @@
     #[tracing::instrument(level = "debug", skip(self))]
     async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
+        // Acquire write-lock early for the restore operation
+        let mut _lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
         let set_restore_header_fn = async move |oi: &mut ObjectInfo, rerr: Option| -> Result<()> {
             if rerr.is_none() {
                 return Ok(());
             }
@@ -4296,6 +4380,18 @@
     #[tracing::instrument(level = "debug", skip(self))]
     async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result {
+        // Acquire write-lock for tag update (metadata write)
+        let mut _lock_guard: Option<LockGuard> = None;
+        if !opts.no_lock {
+            let guard_opt = self
+                .namespace_lock
+                .lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
+                .await?;
+            if guard_opt.is_none() {
+                return Err(Error::other("can not get lock. please retry".to_string()));
+            }
+            _lock_guard = guard_opt;
+        }
         let (mut fi, _, disks) = self.get_object_fileinfo(bucket, object, opts, false).await?;
 
         fi.metadata.insert(AMZ_OBJECT_TAGGING.to_owned(), tags.to_owned());
please retry".to_string())); + } + _lock_guard = guard_opt; + } let (mut fi, _, disks) = self.get_object_fileinfo(bucket, object, opts, false).await?; fi.metadata.insert(AMZ_OBJECT_TAGGING.to_owned(), tags.to_owned()); diff --git a/crates/lock/src/guard.rs b/crates/lock/src/guard.rs new file mode 100644 index 00000000..490c745b --- /dev/null +++ b/crates/lock/src/guard.rs @@ -0,0 +1,130 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use once_cell::sync::Lazy; +use tokio::sync::mpsc; + +use crate::{client::LockClient, types::LockId}; + +#[derive(Debug, Clone)] +struct UnlockJob { + lock_id: LockId, + clients: Vec>, // cloned Arcs; cheap and shares state +} + +#[derive(Debug)] +struct UnlockRuntime { + tx: mpsc::Sender, +} + +// Global unlock runtime with background worker +static UNLOCK_RUNTIME: Lazy = Lazy::new(|| { + // Buffered channel to avoid blocking in Drop + let (tx, mut rx) = mpsc::channel::(1024); + + // Spawn background worker when first used; assumes a Tokio runtime is available + tokio::spawn(async move { + while let Some(job) = rx.recv().await { + // Best-effort release across clients; success if any succeeds + let mut any_ok = false; + let lock_id = job.lock_id.clone(); + let futures = job + .clients + .into_iter() + .map(|client| { + let id = lock_id.clone(); + async move { client.release(&id).await.unwrap_or(false) } + }) + .collect::>(); + + let results = futures::future::join_all(futures).await; + if results.into_iter().any(|s| s) { + any_ok = true; + } + + if !any_ok { + tracing::warn!("LockGuard background release failed for {}", lock_id); + } else { + tracing::debug!("LockGuard background released {}", lock_id); + } + } + }); + + UnlockRuntime { tx } +}); + +/// A RAII guard that releases the lock asynchronously when dropped. +#[derive(Debug)] +pub struct LockGuard { + lock_id: LockId, + clients: Vec>, + /// If true, Drop will not try to release (used if user manually released). + disarmed: bool, +} + +impl LockGuard { + pub(crate) fn new(lock_id: LockId, clients: Vec>) -> Self { + Self { + lock_id, + clients, + disarmed: false, + } + } + + /// Get the lock id associated with this guard + pub fn lock_id(&self) -> &LockId { + &self.lock_id + } + + /// Manually disarm the guard so dropping it won't release the lock. + /// Call this if you explicitly released the lock elsewhere. 
diff --git a/crates/lock/src/lib.rs b/crates/lock/src/lib.rs
index d831bf89..d047668b 100644
--- a/crates/lock/src/lib.rs
+++ b/crates/lock/src/lib.rs
@@ -27,6 +27,7 @@ pub mod local;
 
 // Core Modules
 pub mod error;
+pub mod guard;
 pub mod types;
 
 // ============================================================================
@@ -39,6 +40,7 @@ pub use crate::{
     client::{LockClient, local::LocalClient, remote::RemoteClient},
     // Error types
     error::{LockError, Result},
+    guard::LockGuard,
     local::LocalLockMap,
     // Main components
     namespace::{NamespaceLock, NamespaceLockManager},
diff --git a/crates/lock/src/namespace.rs b/crates/lock/src/namespace.rs
index 71135e65..630b22e8 100644
--- a/crates/lock/src/namespace.rs
+++ b/crates/lock/src/namespace.rs
@@ -19,6 +19,7 @@ use std::time::Duration;
 
 use crate::{
     client::LockClient,
     error::{LockError, Result},
+    guard::LockGuard,
     types::{LockId, LockInfo, LockRequest, LockResponse, LockStatus, LockType},
 };
@@ -90,6 +91,34 @@
         self.acquire_lock_with_2pc(request).await
     }
 
+    /// Acquire a lock and return a RAII guard that will release asynchronously on Drop.
+    /// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds.
+    pub async fn acquire_guard(&self, request: &LockRequest) -> Result<Option<LockGuard>> {
+        let resp = self.acquire_lock(request).await?;
+        if resp.success {
+            let guard = LockGuard::new(LockId::new_deterministic(&request.resource), self.clients.clone());
+            Ok(Some(guard))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Convenience: acquire exclusive lock as a guard
+    pub async fn lock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
+        let req = LockRequest::new(self.get_resource_key(resource), LockType::Exclusive, owner)
+            .with_acquire_timeout(timeout)
+            .with_ttl(ttl);
+        self.acquire_guard(&req).await
+    }
+
+    /// Convenience: acquire shared lock as a guard
+    pub async fn rlock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
+        let req = LockRequest::new(self.get_resource_key(resource), LockType::Shared, owner)
+            .with_acquire_timeout(timeout)
+            .with_ttl(ttl);
+        self.acquire_guard(&req).await
+    }
+
     /// Two-phase commit lock acquisition: all nodes must succeed or all fail
     async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result {
         // Phase 1: Prepare - try to acquire lock on all clients
@@ -420,6 +449,33 @@
         assert!(result.is_ok());
     }
 
+    #[tokio::test]
+    async fn test_guard_acquire_and_drop_release() {
+        let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new()));
+
+        // Acquire guard
+        let guard = ns_lock
+            .lock_guard("guard-resource", "owner", Duration::from_millis(100), Duration::from_secs(5))
+            .await
+            .unwrap();
+        assert!(guard.is_some());
+        let lock_id = guard.as_ref().unwrap().lock_id().clone();
+
+        // Drop guard to trigger background release
+        drop(guard);
+
+        // Give background worker a moment to process
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        // Re-acquire should succeed (previous lock released)
+        let req = LockRequest::new(&lock_id.resource, LockType::Exclusive, "owner").with_ttl(Duration::from_secs(2));
+        let resp = ns_lock.acquire_lock(&req).await.unwrap();
+        assert!(resp.success);
+
+        // Cleanup
+        let _ = ns_lock.release_lock(&LockId::new_deterministic(&lock_id.resource)).await;
+    }
+
     #[tokio::test]
     async fn test_connection_health() {
         let local_lock = NamespaceLock::new("test-namespace".to_string());