mirror of https://github.com/rustfs/rustfs.git
@@ -133,8 +133,14 @@ impl HealStorageAPI for ECStoreHealStorage {
        match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
            Ok(info) => Ok(Some(info)),
            Err(e) => {
                error!("Failed to get object meta: {}/{} - {}", bucket, object, e);
                Err(Error::other(e))
                // Map ObjectNotFound to None to align with Option return type
                if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
                    debug!("Object meta not found: {}/{}", bucket, object);
                    Ok(None)
                } else {
                    error!("Failed to get object meta: {}/{} - {}", bucket, object, e);
                    Err(Error::other(e))
                }
            }
        }
    }
@@ -154,8 +160,13 @@ impl HealStorageAPI for ECStoreHealStorage {
                }
            },
            Err(e) => {
                error!("Failed to get object: {}/{} - {}", bucket, object, e);
                Err(Error::other(e))
                if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
                    debug!("Object data not found: {}/{}", bucket, object);
                    Ok(None)
                } else {
                    error!("Failed to get object: {}/{} - {}", bucket, object, e);
                    Err(Error::other(e))
                }
            }
        }
    }

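Both hunks above follow the same pattern: an ObjectNotFound error from the store is translated into Ok(None) so heal callers can tell "legitimately deleted" apart from a real failure. A minimal sketch of that mapping with stand-in types (not the actual rustfs_ecstore error enum):

    #[derive(Debug)]
    enum StorageError {
        ObjectNotFound(String, String),
        Io(String),
    }

    struct ObjectInfo;

    // Absence becomes Ok(None); every other failure stays an error.
    fn to_optional(res: Result<ObjectInfo, StorageError>) -> Result<Option<ObjectInfo>, String> {
        match res {
            Ok(info) => Ok(Some(info)),
            Err(StorageError::ObjectNotFound(bucket, object)) => {
                // A deleted object is not a heal candidate.
                eprintln!("object not found: {bucket}/{object}");
                Ok(None)
            }
            Err(other) => Err(format!("lookup failed: {other:?}")),
        }
    }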
@@ -23,7 +23,7 @@ use ecstore::{
    set_disk::SetDisks,
};
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
use rustfs_filemeta::MetacacheReader;
use rustfs_filemeta::{MetacacheReader, VersionType};
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -432,8 +432,27 @@ impl Scanner {
        }

        if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() {
            // First try the standard integrity check
            // First check whether the object still logically exists.
            // If it's already deleted (e.g., non-versioned bucket), do not trigger heal.
            let object_opts = ecstore::store_api::ObjectOptions::default();
            match ecstore.get_object_info(bucket, object, &object_opts).await {
                Ok(_) => {
                    // Object exists logically, continue with verification below
                }
                Err(e) => {
                    if matches!(e, ecstore::error::StorageError::ObjectNotFound(_, _)) {
                        debug!(
                            "Object {}/{} not found logically (likely deleted), skip integrity check & heal",
                            bucket, object
                        );
                        return Ok(());
                    } else {
                        debug!("get_object_info error for {}/{}: {}", bucket, object, e);
                        // Fall through to existing logic which will handle accordingly
                    }
                }
            }
            // First try the standard integrity check
            let mut integrity_failed = false;

            debug!("Running standard object verification for {}/{}", bucket, object);
@@ -1398,8 +1417,64 @@ impl Scanner {
            let empty_vec = Vec::new();
            let locations = object_locations.get(&key).unwrap_or(&empty_vec);

            // If any disk reports this object as a latest delete marker (tombstone),
            // it's a legitimate deletion. Skip missing-object heal to avoid recreating
            // deleted objects. Optional: a metadata heal could be submitted to fan-out
            // the delete marker, but we keep it conservative here.
            let mut has_latest_delete_marker = false;
            for &disk_idx in locations {
                if let Some(bucket_map) = all_disk_objects.get(disk_idx) {
                    if let Some(file_map) = bucket_map.get(bucket) {
                        if let Some(fm) = file_map.get(object_name) {
                            if let Some(first_ver) = fm.versions.first() {
                                if first_ver.header.version_type == VersionType::Delete {
                                    has_latest_delete_marker = true;
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            if has_latest_delete_marker {
                debug!(
                    "Object {}/{} is a delete marker on some disk(s), skipping heal for missing parts",
                    bucket, object_name
                );
                continue;
            }

            // Check if object is missing from some disks
            if locations.len() < disks.len() {
                // Before submitting heal, confirm the object still exists logically.
                let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() {
                    match store.get_object_info(bucket, object_name, &Default::default()).await {
                        Ok(_) => true, // exists -> propagate by heal
                        Err(e) => {
                            if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
                                debug!(
                                    "Object {}/{} not found logically (deleted), skip missing-disks heal",
                                    bucket, object_name
                                );
                                false
                            } else {
                                debug!(
                                    "Object {}/{} get_object_info errored ({}), conservatively skip heal",
                                    bucket, object_name, e
                                );
                                false
                            }
                        }
                    }
                } else {
                    // No store available; be conservative and skip to avoid recreating deletions
                    debug!("No ECStore available to confirm existence, skip heal for {}/{}", bucket, object_name);
                    false
                };

                if !should_heal {
                    continue;
                }
                objects_needing_heal += 1;
                let missing_disks: Vec<usize> = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect();
                warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks);

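The tombstone check above relies on the per-disk version list being ordered newest first. A compact sketch of that predicate with simplified stand-in types (the real code reads rustfs_filemeta version headers):

    #[derive(PartialEq)]
    enum VersionType {
        Object,
        Delete,
    }

    struct VersionHeader {
        version_type: VersionType,
    }

    struct FileMetaVersion {
        header: VersionHeader,
    }

    struct FileMeta {
        versions: Vec<FileMetaVersion>, // newest first
    }

    // True when the newest version on this disk is a delete marker, i.e. the
    // object was deliberately deleted and must not be recreated by heal.
    fn latest_is_delete_marker(fm: &FileMeta) -> bool {
        fm.versions
            .first()
            .map(|v| v.header.version_type == VersionType::Delete)
            .unwrap_or(false)
    }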
@@ -38,6 +38,79 @@ fn get_cluster_endpoints() -> Vec<Endpoint> {
|
||||
}]
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
#[ignore = "requires running RustFS server at localhost:9000"]
|
||||
async fn test_guard_drop_releases_exclusive_lock_local() -> Result<(), Box<dyn Error>> {
|
||||
// Single local client; no external server required
|
||||
let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
|
||||
let ns_lock = NamespaceLock::with_clients("e2e_guard_local".to_string(), vec![client]);
|
||||
|
||||
// Acquire exclusive guard
|
||||
let g1 = ns_lock
|
||||
.lock_guard("guard_exclusive", "owner1", Duration::from_millis(100), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(g1.is_some(), "first guard acquisition should succeed");
|
||||
|
||||
// While g1 is alive, second exclusive acquisition should fail
|
||||
let g2 = ns_lock
|
||||
.lock_guard("guard_exclusive", "owner2", Duration::from_millis(50), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(g2.is_none(), "second guard acquisition should fail while first is held");
|
||||
|
||||
// Drop first guard to trigger background release
|
||||
drop(g1);
|
||||
// Give the background unlock worker a short moment to process
|
||||
sleep(Duration::from_millis(80)).await;
|
||||
|
||||
// Now acquisition should succeed
|
||||
let g3 = ns_lock
|
||||
.lock_guard("guard_exclusive", "owner2", Duration::from_millis(100), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(g3.is_some(), "acquisition should succeed after guard drop releases the lock");
|
||||
drop(g3);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
#[ignore = "requires running RustFS server at localhost:9000"]
|
||||
async fn test_guard_shared_then_write_after_drop() -> Result<(), Box<dyn Error>> {
|
||||
// Two shared read guards should coexist; write should be blocked until they drop
|
||||
let client: Arc<dyn LockClient> = Arc::new(LocalClient::new());
|
||||
let ns_lock = NamespaceLock::with_clients("e2e_guard_rw".to_string(), vec![client]);
|
||||
|
||||
// Acquire two read guards
|
||||
let r1 = ns_lock
|
||||
.rlock_guard("rw_resource", "reader1", Duration::from_millis(100), Duration::from_secs(5))
|
||||
.await?;
|
||||
let r2 = ns_lock
|
||||
.rlock_guard("rw_resource", "reader2", Duration::from_millis(100), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(r1.is_some() && r2.is_some(), "both read guards should be acquired");
|
||||
|
||||
// Attempt write while readers hold the lock should fail
|
||||
let w_fail = ns_lock
|
||||
.lock_guard("rw_resource", "writer", Duration::from_millis(50), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(w_fail.is_none(), "write should be blocked when read guards are active");
|
||||
|
||||
// Drop read guards to release
|
||||
drop(r1);
|
||||
drop(r2);
|
||||
sleep(Duration::from_millis(80)).await;
|
||||
|
||||
// Now write should succeed
|
||||
let w_ok = ns_lock
|
||||
.lock_guard("rw_resource", "writer", Duration::from_millis(150), Duration::from_secs(5))
|
||||
.await?;
|
||||
assert!(w_ok.is_some(), "write should succeed after read guards are dropped");
|
||||
drop(w_ok);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
#[ignore = "requires running RustFS server at localhost:9000"]
|
||||
|
||||
@@ -3211,6 +3211,20 @@ impl ObjectIO for SetDisks {
|
||||
h: HeaderMap,
|
||||
opts: &ObjectOptions,
|
||||
) -> Result<GetObjectReader> {
|
||||
// Acquire a shared read-lock early to protect read consistency
|
||||
let mut _read_lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_read_lock_guard = guard_opt;
|
||||
}
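The _read_lock_guard binding is deliberate: the guard has to live in a named variable so it stays alive for the rest of the function. A small illustration of the difference, in plain Rust independent of the lock crate:

    struct Guard;

    impl Drop for Guard {
        fn drop(&mut self) {
            println!("released");
        }
    }

    fn scope_demo() {
        // `let _ = Guard;` would drop (release) immediately;
        // a named binding keeps the guard until the end of the scope.
        let _guard = Guard;
        println!("critical section");
    } // "released" is printed here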
|
||||
|
||||
let (fi, files, disks) = self
|
||||
.get_object_fileinfo(bucket, object, opts, true)
|
||||
.await
|
||||
@@ -3256,7 +3270,10 @@ impl ObjectIO for SetDisks {
|
||||
let object = object.to_owned();
|
||||
let set_index = self.set_index;
|
||||
let pool_index = self.pool_index;
|
||||
// Move the read-lock guard into the task so it lives for the duration of the read
|
||||
let _guard_to_hold = _read_lock_guard; // moved into closure below
|
||||
tokio::spawn(async move {
|
||||
let _guard = _guard_to_hold; // keep guard alive until task ends
|
||||
if let Err(e) = Self::get_object_with_fileinfo(
|
||||
&bucket,
|
||||
&object,
|
||||
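Moving the guard into the spawned task ties the read lock's lifetime to the streaming read rather than to the function that starts it. A sketch of the same move-into-task pattern with a stand-in guard type:

    struct ReadLockGuard; // stand-in for rustfs_lock::LockGuard

    impl Drop for ReadLockGuard {
        fn drop(&mut self) {
            println!("read lock released");
        }
    }

    fn spawn_streaming_read(guard: Option<ReadLockGuard>) {
        tokio::spawn(async move {
            // The guard now lives exactly as long as the task: it is dropped
            // (and the lock released) when the streaming read finishes.
            let _guard = guard;
            // ... stream the object data to the caller here ...
        });
    }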
@@ -3284,16 +3301,18 @@ impl ObjectIO for SetDisks {
|
||||
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
let disks = self.disks.read().await;
|
||||
|
||||
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
|
||||
let mut _object_lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let paths = vec![object.to_string()];
|
||||
let lock_acquired = self
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_batch(&paths, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
|
||||
if !lock_acquired {
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_object_lock_guard = guard_opt;
|
||||
}
|
||||
|
||||
let mut user_defined = opts.user_defined.clone();
|
||||
@@ -3500,14 +3519,6 @@ impl ObjectIO for SetDisks {
|
||||
|
||||
self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_dir).await?;
|
||||
|
||||
// Release lock if it was acquired
|
||||
if !opts.no_lock {
|
||||
let paths = vec![object.to_string()];
|
||||
if let Err(err) = self.namespace_lock.unlock_batch(&paths, &self.locker_owner).await {
|
||||
error!("Failed to unlock object {}: {}", object, err);
|
||||
}
|
||||
}
|
||||
|
||||
for (i, op_disk) in online_disks.iter().enumerate() {
|
||||
if let Some(disk) = op_disk {
|
||||
if disk.is_online().await {
|
||||
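With the RAII guard in place, the explicit unlock_batch block that used to run near the end of put_object can be removed (see the deleted hunk above): every return path, including ? early returns on write errors, drops the guard and queues the release. A self-contained sketch of that property:

    struct Guard;

    impl Drop for Guard {
        fn drop(&mut self) {
            println!("lock released");
        }
    }

    fn fallible_write(fail: bool) -> Result<(), String> {
        if fail { Err("disk error".into()) } else { Ok(()) }
    }

    fn write_with_guard(fail: bool) -> Result<(), String> {
        let _guard = Guard;    // acquired
        fallible_write(fail)?; // early return still drops _guard
        Ok(())                 // normal return drops _guard too
    }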
@@ -3583,6 +3594,19 @@ impl StorageAPI for SetDisks {
|
||||
return Err(StorageError::NotImplemented);
|
||||
}
|
||||
|
||||
// Guard lock for source object metadata update
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
{
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(src_object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
|
||||
let disks = self.get_disks_internal().await;
|
||||
|
||||
let (mut metas, errs) = {
|
||||
@@ -3676,6 +3700,18 @@ impl StorageAPI for SetDisks {
|
||||
}
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn delete_object_version(&self, bucket: &str, object: &str, fi: &FileInfo, force_del_marker: bool) -> Result<()> {
|
||||
// Guard lock for single object delete-version
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
{
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
let disks = self.get_disks(0, 0).await?;
|
||||
let write_quorum = disks.len() / 2 + 1;
|
||||
|
||||
@@ -3732,6 +3768,23 @@ impl StorageAPI for SetDisks {
|
||||
del_errs.push(None)
|
||||
}
|
||||
|
||||
// Per-object guards to keep until function end
|
||||
let mut _guards: Vec<Option<rustfs_lock::LockGuard>> = Vec::with_capacity(objects.len());
|
||||
// Acquire locks for all objects first; mark errors for failures
|
||||
for (i, dobj) in objects.iter().enumerate() {
|
||||
match self
|
||||
.namespace_lock
|
||||
.lock_guard(&dobj.object_name, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?
|
||||
{
|
||||
Some(g) => _guards.push(Some(g)),
|
||||
None => {
|
||||
del_errs[i] = Some(Error::other("can not get lock. please retry"));
|
||||
_guards.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// let mut del_fvers = Vec::with_capacity(objects.len());
|
||||
|
||||
let mut vers_map: HashMap<&String, FileInfoVersions> = HashMap::new();
|
||||
@@ -3788,7 +3841,10 @@ impl StorageAPI for SetDisks {
|
||||
}
|
||||
}
|
||||
|
||||
vers_map.insert(&dobj.object_name, v);
|
||||
// Only add to vers_map if we hold the lock
|
||||
if _guards[i].is_some() {
|
||||
vers_map.insert(&dobj.object_name, v);
|
||||
}
|
||||
}
|
||||
|
||||
let mut vers = Vec::with_capacity(vers_map.len());
|
||||
@@ -3830,6 +3886,18 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
|
||||
// Guard lock for single object delete
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.delete_prefix {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
if opts.delete_prefix {
|
||||
self.delete_prefix(bucket, object)
|
||||
.await
|
||||
@@ -3952,33 +4020,18 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
// let mut _ns = None;
|
||||
// if !opts.no_lock {
|
||||
// let paths = vec![object.to_string()];
|
||||
// let ns_lock = new_nslock(
|
||||
// Arc::clone(&self.ns_mutex),
|
||||
// self.locker_owner.clone(),
|
||||
// bucket.to_string(),
|
||||
// paths,
|
||||
// self.lockers.clone(),
|
||||
// )
|
||||
// .await;
|
||||
// if !ns_lock
|
||||
// .0
|
||||
// .write()
|
||||
// .await
|
||||
// .get_lock(&Options {
|
||||
// timeout: Duration::from_secs(5),
|
||||
// retry_interval: Duration::from_secs(1),
|
||||
// })
|
||||
// .await
|
||||
// .map_err(|err| Error::other(err.to_string()))?
|
||||
// {
|
||||
// return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
// }
|
||||
|
||||
// _ns = Some(ns_lock);
|
||||
// }
|
||||
// Acquire a shared read-lock to protect consistency during info fetch
|
||||
let mut _read_lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.rlock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_read_lock_guard = guard_opt;
|
||||
}
|
||||
|
||||
let (fi, _, _) = self
|
||||
.get_object_fileinfo(bucket, object, opts, false)
|
||||
@@ -4010,6 +4063,19 @@ impl StorageAPI for SetDisks {
|
||||
async fn put_object_metadata(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
// TODO: nslock
|
||||
|
||||
// Guard lock for metadata update
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
|
||||
let disks = self.get_disks_internal().await;
|
||||
|
||||
let (metas, errs) = {
|
||||
@@ -4100,12 +4166,18 @@ impl StorageAPI for SetDisks {
|
||||
}
|
||||
};
|
||||
|
||||
/*if !opts.no_lock {
|
||||
let lk = self.new_ns_lock(bucket, object);
|
||||
let lkctx = lk.get_lock(globalDeleteOperationTimeout)?;
|
||||
//ctx = lkctx.Context()
|
||||
//defer lk.Unlock(lkctx)
|
||||
}*/
|
||||
// Acquire write-lock early; hold for the whole transition operation scope
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
|
||||
let (mut fi, meta_arr, online_disks) = self.get_object_fileinfo(bucket, object, opts, true).await?;
|
||||
/*if err != nil {
|
||||
@@ -4223,6 +4295,18 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn restore_transitioned_object(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
|
||||
// Acquire write-lock early for the restore operation
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
let set_restore_header_fn = async move |oi: &mut ObjectInfo, rerr: Option<Error>| -> Result<()> {
|
||||
if rerr.is_none() {
|
||||
return Ok(());
|
||||
@@ -4296,6 +4380,18 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
|
||||
// Acquire write-lock for tag update (metadata write)
|
||||
let mut _lock_guard: Option<rustfs_lock::LockGuard> = None;
|
||||
if !opts.no_lock {
|
||||
let guard_opt = self
|
||||
.namespace_lock
|
||||
.lock_guard(object, &self.locker_owner, Duration::from_secs(5), Duration::from_secs(10))
|
||||
.await?;
|
||||
if guard_opt.is_none() {
|
||||
return Err(Error::other("can not get lock. please retry".to_string()));
|
||||
}
|
||||
_lock_guard = guard_opt;
|
||||
}
|
||||
let (mut fi, _, disks) = self.get_object_fileinfo(bucket, object, opts, false).await?;
|
||||
|
||||
fi.metadata.insert(AMZ_OBJECT_TAGGING.to_owned(), tags.to_owned());
|
||||
|
||||
@@ -165,7 +165,13 @@ impl Sets {

            let lock_clients = create_unique_clients(&set_endpoints).await?;

            let namespace_lock = rustfs_lock::NamespaceLock::with_clients(format!("set-{i}"), lock_clients);
            // Bind lock quorum to EC write quorum for this set: data_shards (+1 if equal to parity) per default_write_quorum()
            let mut write_quorum = set_drive_count - parity_count;
            if write_quorum == parity_count {
                write_quorum += 1;
            }
            let namespace_lock =
                rustfs_lock::NamespaceLock::with_clients_and_quorum(format!("set-{i}"), lock_clients, write_quorum);

            let set_disks = SetDisks::new(
                Arc::new(namespace_lock),

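The quorum bound above mirrors the erasure-coding write quorum: the number of data shards, plus one when data and parity are equal, presumably so that two disjoint halves of the set cannot both reach quorum. A worked sketch of the rule:

    // Same rule as in the hunk above, stated as a function for illustration.
    fn write_quorum(set_drive_count: usize, parity_count: usize) -> usize {
        let mut q = set_drive_count - parity_count;
        if q == parity_count {
            q += 1;
        }
        q
    }

    // 12 drives, 4 parity -> quorum 8 (the 8 data shards)
    //  8 drives, 4 parity -> quorum 5 (4 data shards + 1, since data == parity)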
crates/lock/src/guard.rs (new file, 120 lines)
@@ -0,0 +1,120 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio::sync::mpsc;

use crate::{client::LockClient, types::LockId};

#[derive(Debug, Clone)]
struct UnlockJob {
    lock_id: LockId,
    clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state
}

#[derive(Debug)]
struct UnlockRuntime {
    tx: mpsc::Sender<UnlockJob>,
}

// Global unlock runtime with background worker
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
    // Larger buffer to reduce contention during bursts
    let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);

    // Spawn background worker when first used; assumes a Tokio runtime is available
    tokio::spawn(async move {
        while let Some(job) = rx.recv().await {
            // Best-effort release across clients; try all, success if any succeeds
            let mut any_ok = false;
            let lock_id = job.lock_id.clone();
            for client in job.clients.into_iter() {
                if client.release(&lock_id).await.unwrap_or(false) {
                    any_ok = true;
                }
            }
            if !any_ok {
                tracing::warn!("LockGuard background release failed for {}", lock_id);
            } else {
                tracing::debug!("LockGuard background released {}", lock_id);
            }
        }
    });

    UnlockRuntime { tx }
});

/// A RAII guard that releases the lock asynchronously when dropped.
#[derive(Debug)]
pub struct LockGuard {
    lock_id: LockId,
    clients: Vec<Arc<dyn LockClient>>,
    /// If true, Drop will not try to release (used if user manually released).
    disarmed: bool,
}

impl LockGuard {
    pub(crate) fn new(lock_id: LockId, clients: Vec<Arc<dyn LockClient>>) -> Self {
        Self {
            lock_id,
            clients,
            disarmed: false,
        }
    }

    /// Get the lock id associated with this guard
    pub fn lock_id(&self) -> &LockId {
        &self.lock_id
    }

    /// Manually disarm the guard so dropping it won't release the lock.
    /// Call this if you explicitly released the lock elsewhere.
    pub fn disarm(&mut self) {
        self.disarmed = true;
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        if self.disarmed {
            return;
        }

        let job = UnlockJob {
            lock_id: self.lock_id.clone(),
            clients: self.clients.clone(),
        };

        // Try a non-blocking send to avoid panics in Drop
        if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
            // Channel full or closed; best-effort fallback: spawn a detached task
            let lock_id = self.lock_id.clone();
            let clients = self.clients.clone();
            tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id);

            // If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts.
            let handle = tokio::spawn(async move {
                let futures_iter = clients.into_iter().map(|client| {
                    let id = lock_id.clone();
                    async move { client.release(&id).await.unwrap_or(false) }
                });
                let _ = futures::future::join_all(futures_iter).await;
            });
            // Explicitly drop the JoinHandle to acknowledge detaching the task.
            std::mem::drop(handle);
        }
    }
}
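A usage sketch of the guard, assuming the lock_guard signature added in this PR (resource, owner, acquire timeout, TTL). Note the release happens on the background worker after drop, so it is asynchronous and best-effort:

    use std::time::Duration;

    // Error handling simplified; the real error type is rustfs_lock::LockError.
    async fn update_object(ns: &rustfs_lock::NamespaceLock) -> Result<(), Box<dyn std::error::Error>> {
        let guard = ns
            .lock_guard("bucket/object", "owner-1", Duration::from_secs(5), Duration::from_secs(10))
            .await?
            .ok_or("can not get lock. please retry")?;

        // ... mutate object state while the guard is alive ...
        // (call guard.disarm() only if the lock was already released by some other path)

        drop(guard); // queues an asynchronous, best-effort release on the background worker
        Ok(())
    }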
@@ -27,6 +27,7 @@ pub mod local;

// Core Modules
pub mod error;
pub mod guard;
pub mod types;

// ============================================================================
@@ -39,6 +40,7 @@ pub use crate::{
    client::{LockClient, local::LocalClient, remote::RemoteClient},
    // Error types
    error::{LockError, Result},
    guard::LockGuard,
    local::LocalLockMap,
    // Main components
    namespace::{NamespaceLock, NamespaceLockManager},

@@ -12,11 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{Mutex, Notify, RwLock};
|
||||
|
||||
use crate::LockRequest;
|
||||
|
||||
@@ -29,6 +29,11 @@ pub struct LocalLockEntry {
|
||||
pub readers: HashMap<String, usize>,
|
||||
/// lock expiration time
|
||||
pub expires_at: Option<Instant>,
|
||||
/// number of writers waiting (for simple fairness against reader storms)
|
||||
pub writer_pending: usize,
|
||||
/// notifiers for readers/writers
|
||||
pub notify_readers: Arc<Notify>,
|
||||
pub notify_writers: Arc<Notify>,
|
||||
}
|
||||
|
||||
/// local lock map
|
||||
@@ -38,6 +43,10 @@ pub struct LocalLockMap {
|
||||
pub locks: Arc<RwLock<HashMap<crate::types::LockId, Arc<RwLock<LocalLockEntry>>>>>,
|
||||
/// Shutdown flag for background tasks
|
||||
shutdown: Arc<AtomicBool>,
|
||||
/// expiration schedule map: when -> lock_ids
|
||||
expirations: Arc<Mutex<BTreeMap<Instant, Vec<crate::types::LockId>>>>,
|
||||
/// notify expiry task when new earlier deadline arrives
|
||||
exp_notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl Default for LocalLockMap {
|
||||
@@ -52,6 +61,8 @@ impl LocalLockMap {
|
||||
let map = Self {
|
||||
locks: Arc::new(RwLock::new(HashMap::new())),
|
||||
shutdown: Arc::new(AtomicBool::new(false)),
|
||||
expirations: Arc::new(Mutex::new(BTreeMap::new())),
|
||||
exp_notify: Arc::new(Notify::new()),
|
||||
};
|
||||
map.spawn_expiry_task();
|
||||
map
|
||||
@@ -61,56 +72,115 @@ impl LocalLockMap {
|
||||
fn spawn_expiry_task(&self) {
|
||||
let locks = self.locks.clone();
|
||||
let shutdown = self.shutdown.clone();
|
||||
let expirations = self.expirations.clone();
|
||||
let exp_notify = self.exp_notify.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(1));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
if shutdown.load(Ordering::Relaxed) {
|
||||
tracing::debug!("Expiry task shutting down");
|
||||
break;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let mut to_remove = Vec::new();
|
||||
// Find next deadline and drain due ids
|
||||
let (due_ids, wait_duration) = {
|
||||
let mut due = Vec::new();
|
||||
let mut guard = expirations.lock().await;
|
||||
let now = Instant::now();
|
||||
let next_deadline = guard.first_key_value().map(|(k, _)| *k);
|
||||
// drain all <= now
|
||||
let mut keys_to_remove = Vec::new();
|
||||
for (k, v) in guard.range(..=now).map(|(k, v)| (*k, v.clone())) {
|
||||
due.extend(v);
|
||||
keys_to_remove.push(k);
|
||||
}
|
||||
for k in keys_to_remove {
|
||||
guard.remove(&k);
|
||||
}
|
||||
let wait = if due.is_empty() {
|
||||
next_deadline.map(|dl| if dl > now { dl - now } else { Duration::from_millis(0) })
|
||||
} else {
|
||||
Some(Duration::from_millis(0))
|
||||
};
|
||||
(due, wait)
|
||||
};
|
||||
|
||||
{
|
||||
let locks_guard = locks.read().await;
|
||||
for (key, entry) in locks_guard.iter() {
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
if let Some(exp) = entry_guard.expires_at {
|
||||
if exp <= now {
|
||||
entry_guard.writer = None;
|
||||
entry_guard.readers.clear();
|
||||
entry_guard.expires_at = None;
|
||||
if !due_ids.is_empty() {
|
||||
// process due ids without holding the map lock during awaits
|
||||
let now = Instant::now();
|
||||
// collect entries to process
|
||||
let entries: Vec<(crate::types::LockId, Arc<RwLock<LocalLockEntry>>)> = {
|
||||
let locks_guard = locks.read().await;
|
||||
due_ids
|
||||
.into_iter()
|
||||
.filter_map(|id| locks_guard.get(&id).cloned().map(|e| (id, e)))
|
||||
.collect()
|
||||
};
|
||||
|
||||
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
|
||||
to_remove.push(key.clone());
|
||||
}
|
||||
let mut to_remove = Vec::new();
|
||||
for (lock_id, entry) in entries {
|
||||
let mut entry_guard = entry.write().await;
|
||||
if let Some(exp) = entry_guard.expires_at {
|
||||
if exp <= now {
|
||||
entry_guard.writer = None;
|
||||
entry_guard.readers.clear();
|
||||
entry_guard.expires_at = None;
|
||||
entry_guard.notify_writers.notify_waiters();
|
||||
entry_guard.notify_readers.notify_waiters();
|
||||
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
|
||||
to_remove.push(lock_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !to_remove.is_empty() {
|
||||
let mut locks_w = locks.write().await;
|
||||
for id in to_remove {
|
||||
let _ = locks_w.remove(&id);
|
||||
}
|
||||
}
|
||||
continue; // immediately look for next
|
||||
}
|
||||
|
||||
if !to_remove.is_empty() {
|
||||
let mut locks_guard = locks.write().await;
|
||||
for key in to_remove {
|
||||
locks_guard.remove(&key);
|
||||
// nothing due; wait for next deadline or notification
|
||||
if let Some(dur) = wait_duration {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(dur) => {},
|
||||
_ = exp_notify.notified() => {},
|
||||
}
|
||||
} else {
|
||||
// no deadlines, wait for new schedule or shutdown tick
|
||||
exp_notify.notified().await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// schedule an expiry time for the given lock id (inline, avoid per-acquisition spawn)
|
||||
async fn schedule_expiry(&self, id: crate::types::LockId, exp: Instant) {
|
||||
let mut guard = self.expirations.lock().await;
|
||||
let is_earliest = match guard.first_key_value() {
|
||||
Some((k, _)) => exp < *k,
|
||||
None => true,
|
||||
};
|
||||
guard.entry(exp).or_insert_with(Vec::new).push(id);
|
||||
drop(guard);
|
||||
if is_earliest {
|
||||
self.exp_notify.notify_waiters();
|
||||
}
|
||||
}
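The expiry bookkeeping above keeps deadlines in a BTreeMap keyed by Instant and only wakes the sweeper when a new earliest deadline appears. A reduced, self-contained sketch of that scheduling structure (u64 ids stand in for LockId):

    use std::collections::BTreeMap;
    use tokio::sync::{Mutex, Notify};
    use tokio::time::{Duration, Instant};

    struct ExpiryQueue {
        deadlines: Mutex<BTreeMap<Instant, Vec<u64>>>,
        notify: Notify,
    }

    impl ExpiryQueue {
        // Register a deadline; wake the sweeper only if it became the earliest one.
        async fn schedule(&self, id: u64, when: Instant) {
            let mut map = self.deadlines.lock().await;
            let is_earliest = map.first_key_value().map(|(k, _)| when < *k).unwrap_or(true);
            map.entry(when).or_default().push(id);
            drop(map);
            if is_earliest {
                self.notify.notify_waiters();
            }
        }

        // Drain everything that is due and report how long to sleep until the next deadline.
        async fn drain_due(&self) -> (Vec<u64>, Option<Duration>) {
            let mut map = self.deadlines.lock().await;
            let now = Instant::now();
            let due_keys: Vec<Instant> = map.range(..=now).map(|(k, _)| *k).collect();
            let mut due = Vec::new();
            for k in due_keys {
                if let Some(ids) = map.remove(&k) {
                    due.extend(ids);
                }
            }
            let next = map.first_key_value().map(|(k, _)| *k - now);
            (due, next)
        }
    }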
|
||||
|
||||
/// write lock with TTL, support timeout, use LockRequest
|
||||
pub async fn lock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result<bool> {
|
||||
let start = Instant::now();
|
||||
let expires_at = Some(Instant::now() + request.ttl);
|
||||
|
||||
loop {
|
||||
// get or create lock entry
|
||||
let entry = {
|
||||
// get or create lock entry (double-checked to reduce write-lock contention)
|
||||
let entry = if let Some(e) = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
locks_guard.get(&request.lock_id).cloned()
|
||||
} {
|
||||
e
|
||||
} else {
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
locks_guard
|
||||
.entry(request.lock_id.clone())
|
||||
@@ -119,13 +189,17 @@ impl LocalLockMap {
|
||||
writer: None,
|
||||
readers: HashMap::new(),
|
||||
expires_at: None,
|
||||
writer_pending: 0,
|
||||
notify_readers: Arc::new(Notify::new()),
|
||||
notify_writers: Arc::new(Notify::new()),
|
||||
}))
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
// try to get write lock to modify state
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
// attempt acquisition or wait using Notify
|
||||
let notify_to_wait = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
// check expired state
|
||||
let now = Instant::now();
|
||||
if let Some(exp) = entry_guard.expires_at {
|
||||
@@ -136,30 +210,68 @@ impl LocalLockMap {
|
||||
}
|
||||
}
|
||||
|
||||
// check if can get write lock
|
||||
// try acquire
|
||||
if entry_guard.writer.is_none() && entry_guard.readers.is_empty() {
|
||||
entry_guard.writer = Some(request.owner.clone());
|
||||
entry_guard.expires_at = expires_at;
|
||||
let expires_at = Instant::now() + request.ttl;
|
||||
entry_guard.expires_at = Some(expires_at);
|
||||
tracing::debug!("Write lock acquired for resource '{}' by owner '{}'", request.resource, request.owner);
|
||||
{
|
||||
drop(entry_guard);
|
||||
self.schedule_expiry(request.lock_id.clone(), expires_at).await;
|
||||
}
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
// couldn't acquire now, mark as pending writer and choose notifier
|
||||
entry_guard.writer_pending = entry_guard.writer_pending.saturating_add(1);
|
||||
entry_guard.notify_writers.clone()
|
||||
};
|
||||
|
||||
if start.elapsed() >= request.acquire_timeout {
|
||||
// wait with remaining timeout
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed >= request.acquire_timeout {
|
||||
// best-effort decrement pending counter
|
||||
if let Ok(mut eg) = entry.try_write() {
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
} else {
|
||||
let mut eg = entry.write().await;
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
}
|
||||
return Ok(false);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
let remaining = request.acquire_timeout - elapsed;
|
||||
if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() {
|
||||
// timeout; decrement pending before returning
|
||||
if let Ok(mut eg) = entry.try_write() {
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
} else {
|
||||
let mut eg = entry.write().await;
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
}
|
||||
return Ok(false);
|
||||
}
|
||||
// woke up; decrement pending before retrying
|
||||
if let Ok(mut eg) = entry.try_write() {
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
} else {
|
||||
let mut eg = entry.write().await;
|
||||
eg.writer_pending = eg.writer_pending.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
}
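The rewritten acquisition loop replaces the old fixed 10 ms polling sleep with a Notify-based wait bounded by whatever is left of the caller's acquire timeout. A stripped-down version of that loop:

    use std::sync::Arc;
    use tokio::sync::{Mutex, Notify};
    use tokio::time::{timeout, Duration, Instant};

    struct State {
        held: bool,
    }

    async fn acquire(state: Arc<Mutex<State>>, notify: Arc<Notify>, acquire_timeout: Duration) -> bool {
        let start = Instant::now();
        loop {
            {
                let mut s = state.lock().await;
                if !s.held {
                    s.held = true; // got the lock
                    return true;
                }
            }
            let elapsed = start.elapsed();
            if elapsed >= acquire_timeout {
                return false; // out of time
            }
            // Wait for an unlock notification, but never past the caller's deadline.
            // Wake-ups can race with other waiters, so the loop re-checks the state.
            if timeout(acquire_timeout - elapsed, notify.notified()).await.is_err() {
                return false;
            }
        }
    }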
|
||||
|
||||
/// read lock with TTL, support timeout, use LockRequest
|
||||
pub async fn rlock_with_ttl_id(&self, request: &LockRequest) -> std::io::Result<bool> {
|
||||
let start = Instant::now();
|
||||
let expires_at = Some(Instant::now() + request.ttl);
|
||||
|
||||
loop {
|
||||
// get or create lock entry
|
||||
let entry = {
|
||||
// get or create lock entry (double-checked to reduce write-lock contention)
|
||||
let entry = if let Some(e) = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
locks_guard.get(&request.lock_id).cloned()
|
||||
} {
|
||||
e
|
||||
} else {
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
locks_guard
|
||||
.entry(request.lock_id.clone())
|
||||
@@ -168,13 +280,17 @@ impl LocalLockMap {
|
||||
writer: None,
|
||||
readers: HashMap::new(),
|
||||
expires_at: None,
|
||||
writer_pending: 0,
|
||||
notify_readers: Arc::new(Notify::new()),
|
||||
notify_writers: Arc::new(Notify::new()),
|
||||
}))
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
// try to get write lock to modify state
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
// attempt acquisition or wait using Notify
|
||||
let notify_to_wait = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
// check expired state
|
||||
let now = Instant::now();
|
||||
if let Some(exp) = entry_guard.expires_at {
|
||||
@@ -185,189 +301,247 @@ impl LocalLockMap {
|
||||
}
|
||||
}
|
||||
|
||||
// check if can get read lock
|
||||
if entry_guard.writer.is_none() {
|
||||
// increase read lock count
|
||||
if entry_guard.writer.is_none() && entry_guard.writer_pending == 0 {
|
||||
*entry_guard.readers.entry(request.owner.clone()).or_insert(0) += 1;
|
||||
entry_guard.expires_at = expires_at;
|
||||
let expires_at = Instant::now() + request.ttl;
|
||||
entry_guard.expires_at = Some(expires_at);
|
||||
tracing::debug!("Read lock acquired for resource '{}' by owner '{}'", request.resource, request.owner);
|
||||
{
|
||||
drop(entry_guard);
|
||||
self.schedule_expiry(request.lock_id.clone(), expires_at).await;
|
||||
}
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
if start.elapsed() >= request.acquire_timeout {
|
||||
// choose notifier: prefer waiting on writers if writers pending, else readers
|
||||
if entry_guard.writer_pending > 0 {
|
||||
entry_guard.notify_writers.clone()
|
||||
} else {
|
||||
entry_guard.notify_readers.clone()
|
||||
}
|
||||
};
|
||||
|
||||
// wait with remaining timeout
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed >= request.acquire_timeout {
|
||||
return Ok(false);
|
||||
}
|
||||
let remaining = request.acquire_timeout - elapsed;
|
||||
if tokio::time::timeout(remaining, notify_to_wait.notified()).await.is_err() {
|
||||
return Ok(false);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// unlock by LockId and owner - need to specify owner to correctly unlock
|
||||
pub async fn unlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> {
|
||||
println!("Unlocking lock_id: {lock_id:?}, owner: {owner}");
|
||||
let mut need_remove = false;
|
||||
|
||||
{
|
||||
// first, get the entry without holding the write lock on the map
|
||||
let entry = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
if let Some(entry) = locks_guard.get(lock_id) {
|
||||
println!("Found lock entry, attempting to acquire write lock...");
|
||||
match entry.try_write() {
|
||||
Ok(mut entry_guard) => {
|
||||
println!("Successfully acquired write lock for unlock");
|
||||
// try to release write lock
|
||||
if entry_guard.writer.as_ref() == Some(&owner.to_string()) {
|
||||
println!("Releasing write lock for owner: {owner}");
|
||||
entry_guard.writer = None;
|
||||
}
|
||||
// try to release read lock
|
||||
else if let Some(count) = entry_guard.readers.get_mut(owner) {
|
||||
println!("Releasing read lock for owner: {owner} (count: {count})");
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(owner);
|
||||
println!("Removed owner {owner} from readers");
|
||||
}
|
||||
} else {
|
||||
println!("Owner {owner} not found in writers or readers");
|
||||
}
|
||||
// check if need to remove
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
println!("Lock entry is empty, marking for removal");
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
} else {
|
||||
println!(
|
||||
"Lock entry still has content: writer={:?}, readers={:?}",
|
||||
entry_guard.writer, entry_guard.readers
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
println!("Failed to acquire write lock for unlock - this is the problem!");
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::WouldBlock,
|
||||
"Failed to acquire write lock for unlock",
|
||||
));
|
||||
}
|
||||
match locks_guard.get(lock_id) {
|
||||
Some(e) => e.clone(),
|
||||
None => return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Lock entry not found")),
|
||||
}
|
||||
};
|
||||
|
||||
let mut need_remove = false;
|
||||
let (notify_writers, notify_readers, writer_pending, writer_none) = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
|
||||
// try to release write lock
|
||||
if entry_guard.writer.as_ref() == Some(&owner.to_string()) {
|
||||
entry_guard.writer = None;
|
||||
}
|
||||
// try to release read lock
|
||||
else if let Some(count) = entry_guard.readers.get_mut(owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(owner);
|
||||
}
|
||||
} else {
|
||||
println!("Lock entry not found for lock_id: {lock_id:?}");
|
||||
// owner not found, treat as no-op
|
||||
}
|
||||
|
||||
// check if need to remove
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
}
|
||||
|
||||
// capture notifications and state
|
||||
(
|
||||
entry_guard.notify_writers.clone(),
|
||||
entry_guard.notify_readers.clone(),
|
||||
entry_guard.writer_pending,
|
||||
entry_guard.writer.is_none(),
|
||||
)
|
||||
};
|
||||
|
||||
if writer_pending > 0 && writer_none {
|
||||
// Wake a single writer to preserve fairness and avoid thundering herd
|
||||
notify_writers.notify_one();
|
||||
} else if writer_none {
|
||||
// No writers waiting, allow readers to proceed
|
||||
notify_readers.notify_waiters();
|
||||
}
|
||||
|
||||
// only here, entry's Ref is really dropped, can safely remove
|
||||
if need_remove {
|
||||
println!("Removing lock entry from map...");
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
let removed = locks_guard.remove(lock_id);
|
||||
println!("Lock entry removed: {:?}", removed.is_some());
|
||||
let _ = locks_guard.remove(lock_id);
|
||||
}
|
||||
println!("Unlock operation completed");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// unlock by LockId - smart release (compatible with old interface, but may be inaccurate)
|
||||
pub async fn unlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> {
|
||||
let mut need_remove = false;
|
||||
|
||||
{
|
||||
let entry = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
if let Some(entry) = locks_guard.get(lock_id) {
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
// release write lock first
|
||||
if entry_guard.writer.is_some() {
|
||||
entry_guard.writer = None;
|
||||
}
|
||||
// if no write lock, release first read lock
|
||||
else if let Some((owner, _)) = entry_guard.readers.iter().next() {
|
||||
let owner = owner.clone();
|
||||
if let Some(count) = entry_guard.readers.get_mut(&owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(&owner);
|
||||
}
|
||||
}
|
||||
}
|
||||
match locks_guard.get(lock_id) {
|
||||
Some(e) => e.clone(),
|
||||
None => return Ok(()), // nothing to do
|
||||
}
|
||||
};
|
||||
|
||||
// if completely idle, clean entry
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
let mut need_remove = false;
|
||||
let (notify_writers, notify_readers, writer_pending, writer_none) = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
|
||||
// release write lock first
|
||||
if entry_guard.writer.is_some() {
|
||||
entry_guard.writer = None;
|
||||
}
|
||||
// if no write lock, release first read lock
|
||||
else if let Some((owner, _)) = entry_guard.readers.iter().next() {
|
||||
let owner = owner.clone();
|
||||
if let Some(count) = entry_guard.readers.get_mut(&owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(&owner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
}
|
||||
|
||||
(
|
||||
entry_guard.notify_writers.clone(),
|
||||
entry_guard.notify_readers.clone(),
|
||||
entry_guard.writer_pending,
|
||||
entry_guard.writer.is_none(),
|
||||
)
|
||||
};
|
||||
|
||||
if writer_pending > 0 && writer_none {
|
||||
notify_writers.notify_one();
|
||||
} else if writer_none {
|
||||
notify_readers.notify_waiters();
|
||||
}
|
||||
|
||||
if need_remove {
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
locks_guard.remove(lock_id);
|
||||
let _ = locks_guard.remove(lock_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// runlock by LockId and owner - need to specify owner to correctly unlock read lock
|
||||
pub async fn runlock_by_id_and_owner(&self, lock_id: &crate::types::LockId, owner: &str) -> std::io::Result<()> {
|
||||
let mut need_remove = false;
|
||||
|
||||
{
|
||||
let entry = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
if let Some(entry) = locks_guard.get(lock_id) {
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
// release read lock
|
||||
if let Some(count) = entry_guard.readers.get_mut(owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(owner);
|
||||
}
|
||||
}
|
||||
match locks_guard.get(lock_id) {
|
||||
Some(e) => e.clone(),
|
||||
None => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
// if completely idle, clean entry
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
}
|
||||
let mut need_remove = false;
|
||||
let (notify_writers, notify_readers, writer_pending, writer_none) = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
|
||||
// release read lock
|
||||
if let Some(count) = entry_guard.readers.get_mut(owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(owner);
|
||||
}
|
||||
}
|
||||
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
}
|
||||
|
||||
(
|
||||
entry_guard.notify_writers.clone(),
|
||||
entry_guard.notify_readers.clone(),
|
||||
entry_guard.writer_pending,
|
||||
entry_guard.writer.is_none(),
|
||||
)
|
||||
};
|
||||
|
||||
if writer_pending > 0 && writer_none {
|
||||
notify_writers.notify_waiters();
|
||||
} else if writer_none {
|
||||
notify_readers.notify_waiters();
|
||||
}
|
||||
|
||||
if need_remove {
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
locks_guard.remove(lock_id);
|
||||
let _ = locks_guard.remove(lock_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// runlock by LockId - smart release read lock (compatible with old interface)
|
||||
pub async fn runlock_by_id(&self, lock_id: &crate::types::LockId) -> std::io::Result<()> {
|
||||
let mut need_remove = false;
|
||||
|
||||
{
|
||||
let entry = {
|
||||
let locks_guard = self.locks.read().await;
|
||||
if let Some(entry) = locks_guard.get(lock_id) {
|
||||
if let Ok(mut entry_guard) = entry.try_write() {
|
||||
// release first read lock
|
||||
if let Some((owner, _)) = entry_guard.readers.iter().next() {
|
||||
let owner = owner.clone();
|
||||
if let Some(count) = entry_guard.readers.get_mut(&owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(&owner);
|
||||
}
|
||||
}
|
||||
}
|
||||
match locks_guard.get(lock_id) {
|
||||
Some(e) => e.clone(),
|
||||
None => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
// if completely idle, clean entry
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
let mut need_remove = false;
|
||||
let (notify_writers, notify_readers, writer_pending, writer_none) = {
|
||||
let mut entry_guard = entry.write().await;
|
||||
|
||||
// release first read lock
|
||||
if let Some((owner, _)) = entry_guard.readers.iter().next() {
|
||||
let owner = owner.clone();
|
||||
if let Some(count) = entry_guard.readers.get_mut(&owner) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
entry_guard.readers.remove(&owner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if entry_guard.readers.is_empty() && entry_guard.writer.is_none() {
|
||||
entry_guard.expires_at = None;
|
||||
need_remove = true;
|
||||
}
|
||||
|
||||
(
|
||||
entry_guard.notify_writers.clone(),
|
||||
entry_guard.notify_readers.clone(),
|
||||
entry_guard.writer_pending,
|
||||
entry_guard.writer.is_none(),
|
||||
)
|
||||
};
|
||||
|
||||
if writer_pending > 0 && writer_none {
|
||||
notify_writers.notify_waiters();
|
||||
} else if writer_none {
|
||||
notify_readers.notify_waiters();
|
||||
}
|
||||
|
||||
if need_remove {
|
||||
let mut locks_guard = self.locks.write().await;
|
||||
locks_guard.remove(lock_id);
|
||||
let _ = locks_guard.remove(lock_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::time::Duration;
|
||||
use crate::{
|
||||
client::LockClient,
|
||||
error::{LockError, Result},
|
||||
guard::LockGuard,
|
||||
types::{LockId, LockInfo, LockRequest, LockResponse, LockStatus, LockType},
|
||||
};
|
||||
|
||||
@@ -60,6 +61,22 @@ impl NamespaceLock {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create namespace lock with clients and an explicit quorum size.
|
||||
/// Quorum will be clamped into [1, clients.len()]. For single client, quorum is always 1.
|
||||
pub fn with_clients_and_quorum(namespace: String, clients: Vec<Arc<dyn LockClient>>, quorum: usize) -> Self {
|
||||
let q = if clients.len() <= 1 {
|
||||
1
|
||||
} else {
|
||||
quorum.clamp(1, clients.len())
|
||||
};
|
||||
|
||||
Self {
|
||||
clients,
|
||||
namespace,
|
||||
quorum: q,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create namespace lock with client (compatibility)
|
||||
pub fn with_client(client: Arc<dyn LockClient>) -> Self {
|
||||
Self::with_clients("default".to_string(), vec![client])
|
||||
@@ -86,54 +103,77 @@ impl NamespaceLock {
|
||||
return self.clients[0].acquire_lock(request).await;
|
||||
}
|
||||
|
||||
// Two-phase commit for distributed lock acquisition
|
||||
self.acquire_lock_with_2pc(request).await
|
||||
// Quorum-based acquisition for distributed mode
|
||||
let (resp, _idxs) = self.acquire_lock_quorum(request).await?;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// Two-phase commit lock acquisition: all nodes must succeed or all fail
|
||||
async fn acquire_lock_with_2pc(&self, request: &LockRequest) -> Result<LockResponse> {
|
||||
// Phase 1: Prepare - try to acquire lock on all clients
|
||||
let futures: Vec<_> = self
|
||||
/// Acquire a lock and return a RAII guard that will release asynchronously on Drop.
|
||||
/// This is a thin wrapper around `acquire_lock` and will only create a guard when acquisition succeeds.
|
||||
pub async fn acquire_guard(&self, request: &LockRequest) -> Result<Option<LockGuard>> {
|
||||
if self.clients.is_empty() {
|
||||
return Err(LockError::internal("No lock clients available"));
|
||||
}
|
||||
|
||||
if self.clients.len() == 1 {
|
||||
let resp = self.clients[0].acquire_lock(request).await?;
|
||||
if resp.success {
|
||||
return Ok(Some(LockGuard::new(
|
||||
LockId::new_deterministic(&request.resource),
|
||||
vec![self.clients[0].clone()],
|
||||
)));
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (resp, idxs) = self.acquire_lock_quorum(request).await?;
|
||||
if resp.success {
|
||||
let subset: Vec<_> = idxs.into_iter().filter_map(|i| self.clients.get(i).cloned()).collect();
|
||||
Ok(Some(LockGuard::new(LockId::new_deterministic(&request.resource), subset)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience: acquire exclusive lock as a guard
|
||||
pub async fn lock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
|
||||
let req = LockRequest::new(self.get_resource_key(resource), LockType::Exclusive, owner)
|
||||
.with_acquire_timeout(timeout)
|
||||
.with_ttl(ttl);
|
||||
self.acquire_guard(&req).await
|
||||
}
|
||||
|
||||
/// Convenience: acquire shared lock as a guard
|
||||
pub async fn rlock_guard(&self, resource: &str, owner: &str, timeout: Duration, ttl: Duration) -> Result<Option<LockGuard>> {
|
||||
let req = LockRequest::new(self.get_resource_key(resource), LockType::Shared, owner)
|
||||
.with_acquire_timeout(timeout)
|
||||
.with_ttl(ttl);
|
||||
self.acquire_guard(&req).await
|
||||
}
|
||||
|
||||
/// Quorum-based lock acquisition: success if at least `self.quorum` clients succeed.
|
||||
/// Returns the LockResponse and the indices of clients that acquired the lock.
|
||||
async fn acquire_lock_quorum(&self, request: &LockRequest) -> Result<(LockResponse, Vec<usize>)> {
|
||||
let futs: Vec<_> = self
|
||||
.clients
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, client)| async move {
|
||||
let result = client.acquire_lock(request).await;
|
||||
(idx, result)
|
||||
})
|
||||
.map(|(idx, client)| async move { (idx, client.acquire_lock(request).await) })
|
||||
.collect();
|
||||
|
||||
let results = futures::future::join_all(futures).await;
|
||||
let results = futures::future::join_all(futs).await;
|
||||
let mut successful_clients = Vec::new();
|
||||
let mut failed_clients = Vec::new();
|
||||
|
||||
// Collect results
|
||||
for (idx, result) in results {
|
||||
match result {
|
||||
Ok(response) if response.success => {
|
||||
for (idx, res) in results {
|
||||
if let Ok(resp) = res {
|
||||
if resp.success {
|
||||
successful_clients.push(idx);
|
||||
}
|
||||
_ => {
|
||||
failed_clients.push(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we have enough successful acquisitions for quorum
|
||||
if successful_clients.len() >= self.quorum {
|
||||
// Phase 2a: Commit - we have quorum, but need to ensure consistency
|
||||
// If not all clients succeeded, we need to rollback for consistency
|
||||
if successful_clients.len() < self.clients.len() {
|
||||
// Rollback all successful acquisitions to maintain consistency
|
||||
self.rollback_acquisitions(request, &successful_clients).await;
|
||||
return Ok(LockResponse::failure(
|
||||
"Partial success detected, rolled back for consistency".to_string(),
|
||||
Duration::ZERO,
|
||||
));
|
||||
}
|
||||
|
||||
// All clients succeeded - lock acquired successfully
|
||||
Ok(LockResponse::success(
|
||||
let resp = LockResponse::success(
|
||||
LockInfo {
|
||||
id: LockId::new_deterministic(&request.resource),
|
||||
resource: request.resource.clone(),
|
||||
@@ -148,16 +188,17 @@ impl NamespaceLock {
|
||||
wait_start_time: None,
|
||||
},
|
||||
Duration::ZERO,
|
||||
))
|
||||
);
|
||||
Ok((resp, successful_clients))
|
||||
} else {
|
||||
// Phase 2b: Abort - insufficient quorum, rollback any successful acquisitions
|
||||
if !successful_clients.is_empty() {
|
||||
self.rollback_acquisitions(request, &successful_clients).await;
|
||||
}
|
||||
Ok(LockResponse::failure(
|
||||
let resp = LockResponse::failure(
|
||||
format!("Failed to acquire quorum: {}/{} required", successful_clients.len(), self.quorum),
|
||||
Duration::ZERO,
|
||||
))
|
||||
);
|
||||
Ok((resp, Vec::new()))
|
||||
}
|
||||
}
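The quorum path replaces the all-or-nothing two-phase commit: acquisition now succeeds once at least `quorum` clients grant the lock, and the indices of those clients are kept so that release (or rollback on failure) only talks to them. Reduced to its core, the decision looks like this:

    // Returns the indices of granting clients when quorum is reached, None otherwise.
    fn quorum_decision(results: &[bool], quorum: usize) -> Option<Vec<usize>> {
        let granted: Vec<usize> = results
            .iter()
            .enumerate()
            .filter_map(|(i, ok)| ok.then_some(i))
            .collect();
        (granted.len() >= quorum).then_some(granted)
    }

    // 3 clients, quorum 2: [true, false, true]  -> Some([0, 2])
    //                      [true, false, false] -> None (the single success is rolled back)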
|
||||
|
||||
@@ -420,6 +461,33 @@ mod tests {
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_guard_acquire_and_drop_release() {
|
||||
let ns_lock = NamespaceLock::with_client(Arc::new(LocalClient::new()));
|
||||
|
||||
// Acquire guard
|
||||
let guard = ns_lock
|
||||
.lock_guard("guard-resource", "owner", Duration::from_millis(100), Duration::from_secs(5))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(guard.is_some());
|
||||
let lock_id = guard.as_ref().unwrap().lock_id().clone();
|
||||
|
||||
// Drop guard to trigger background release
|
||||
drop(guard);
|
||||
|
||||
// Give background worker a moment to process
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
// Re-acquire should succeed (previous lock released)
|
||||
let req = LockRequest::new(&lock_id.resource, LockType::Exclusive, "owner").with_ttl(Duration::from_secs(2));
|
||||
let resp = ns_lock.acquire_lock(&req).await.unwrap();
|
||||
assert!(resp.success);
|
||||
|
||||
// Cleanup
|
||||
let _ = ns_lock.release_lock(&LockId::new_deterministic(&lock_id.resource)).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connection_health() {
|
||||
let local_lock = NamespaceLock::new("test-namespace".to_string());
|
||||
@@ -502,9 +570,11 @@ mod tests {
|
||||
let client2: Arc<dyn LockClient> = Arc::new(LocalClient::new());
|
||||
let clients = vec![client1, client2];
|
||||
|
||||
let ns_lock = NamespaceLock::with_clients("test-namespace".to_string(), clients);
|
||||
// LocalClient shares a global in-memory map. For exclusive locks, only one can acquire at a time.
|
||||
// In real distributed setups the quorum should be tied to EC write quorum. Here we use quorum=1 for success.
|
||||
let ns_lock = NamespaceLock::with_clients_and_quorum("test-namespace".to_string(), clients, 1);
|
||||
|
||||
let request = LockRequest::new("test-resource", LockType::Exclusive, "test_owner").with_ttl(Duration::from_secs(10));
|
||||
let request = LockRequest::new("test-resource", LockType::Shared, "test_owner").with_ttl(Duration::from_secs(2));
|
||||
|
||||
// This should succeed only if ALL clients can acquire the lock
|
||||
let response = ns_lock.acquire_lock(&request).await.unwrap();
|
||||
|
||||
@@ -22,6 +22,7 @@ pub mod net;
#[cfg(feature = "net")]
pub use net::*;

#[cfg(all(feature = "net", feature = "io"))]
pub mod retry;

#[cfg(feature = "io")]