mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
lock: dedicate unlock worker to thread runtime; robust fallback in Drop (#446)
* lock: dedicate unlock worker to thread runtime; robust fallback in Drop * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Refactor logging in UNLOCK_TX error handling Removed redundant logging of lock_id in warning message. --------- Co-authored-by: houseme <housemecn@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -15,6 +15,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use std::thread;
|
||||
use tokio::runtime::Builder;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{client::LockClient, types::LockId};
|
||||
@@ -25,36 +27,43 @@ struct UnlockJob {
|
||||
clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UnlockRuntime {
|
||||
tx: mpsc::Sender<UnlockJob>,
|
||||
}
|
||||
|
||||
// Global unlock runtime with background worker
|
||||
static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| {
|
||||
// Global unlock runtime with background worker running on a dedicated thread-bound Tokio runtime.
|
||||
// This avoids depending on the application's Tokio runtime lifetimes/cancellation scopes.
|
||||
static UNLOCK_TX: Lazy<mpsc::Sender<UnlockJob>> = Lazy::new(|| {
|
||||
// Larger buffer to reduce contention during bursts
|
||||
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
|
||||
|
||||
// Spawn background worker when first used; assumes a Tokio runtime is available
|
||||
tokio::spawn(async move {
|
||||
while let Some(job) = rx.recv().await {
|
||||
// Best-effort release across clients; try all, success if any succeeds
|
||||
let mut any_ok = false;
|
||||
let lock_id = job.lock_id.clone();
|
||||
for client in job.clients.into_iter() {
|
||||
if client.release(&lock_id).await.unwrap_or(false) {
|
||||
any_ok = true;
|
||||
}
|
||||
}
|
||||
if !any_ok {
|
||||
tracing::warn!("LockGuard background release failed for {}", lock_id);
|
||||
} else {
|
||||
tracing::debug!("LockGuard background released {}", lock_id);
|
||||
}
|
||||
}
|
||||
});
|
||||
// Spawn a dedicated OS thread that owns its own Tokio runtime to process unlock jobs.
|
||||
thread::Builder::new()
|
||||
.name("rustfs-lock-unlocker".to_string())
|
||||
.spawn(move || {
|
||||
// A lightweight current-thread runtime is sufficient here.
|
||||
let rt = Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to build Tokio runtime for background unlock jobs (possible causes: resource exhaustion, thread limit, Tokio misconfiguration)");
|
||||
|
||||
UnlockRuntime { tx }
|
||||
rt.block_on(async move {
|
||||
while let Some(job) = rx.recv().await {
|
||||
// Best-effort release across clients; try all, success if any succeeds
|
||||
let mut any_ok = false;
|
||||
let lock_id = job.lock_id.clone();
|
||||
for client in job.clients.into_iter() {
|
||||
if client.release(&lock_id).await.unwrap_or(false) {
|
||||
any_ok = true;
|
||||
}
|
||||
}
|
||||
if !any_ok {
|
||||
tracing::warn!("LockGuard background release failed for {}", lock_id);
|
||||
} else {
|
||||
tracing::debug!("LockGuard background released {}", lock_id);
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.expect("failed to spawn unlock worker thread");
|
||||
|
||||
tx
|
||||
});
|
||||
|
||||
/// A RAII guard that releases the lock asynchronously when dropped.
|
||||
@@ -99,22 +108,32 @@ impl Drop for LockGuard {
|
||||
};
|
||||
|
||||
// Try a non-blocking send to avoid panics in Drop
|
||||
if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
|
||||
// Channel full or closed; best-effort fallback: spawn a detached task
|
||||
if let Err(err) = UNLOCK_TX.try_send(job) {
|
||||
// Channel full or closed; best-effort fallback using a dedicated thread runtime
|
||||
let lock_id = self.lock_id.clone();
|
||||
let clients = self.clients.clone();
|
||||
tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id);
|
||||
tracing::warn!(
|
||||
"LockGuard channel send failed ({}), spawning fallback unlock thread for {}",
|
||||
err,
|
||||
lock_id.clone()
|
||||
);
|
||||
|
||||
// If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts.
|
||||
let handle = tokio::spawn(async move {
|
||||
let futures_iter = clients.into_iter().map(|client| {
|
||||
let id = lock_id.clone();
|
||||
async move { client.release(&id).await.unwrap_or(false) }
|
||||
// Use a short-lived background thread to execute the async releases on its own runtime.
|
||||
let _ = thread::Builder::new()
|
||||
.name("rustfs-lock-unlock-fallback".to_string())
|
||||
.spawn(move || {
|
||||
let rt = Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to build fallback unlock runtime in LockGuard::drop fallback thread. This indicates resource exhaustion or misconfiguration (e.g., thread limits, Tokio runtime issues). Remediation: check system resource limits, ensure sufficient threads are available, and verify Tokio runtime configuration.");
|
||||
rt.block_on(async move {
|
||||
let futures_iter = clients.into_iter().map(|client| {
|
||||
let id = lock_id.clone();
|
||||
async move { client.release(&id).await.unwrap_or(false) }
|
||||
});
|
||||
let _ = futures::future::join_all(futures_iter).await;
|
||||
});
|
||||
});
|
||||
let _ = futures::future::join_all(futures_iter).await;
|
||||
});
|
||||
// Explicitly drop the JoinHandle to acknowledge detaching the task.
|
||||
std::mem::drop(handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user