From 5b85bf7a001e23de66f7c2f216b7b7db5847c41f Mon Sep 17 00:00:00 2001 From: loverustfs <155562731+loverustfs@users.noreply.github.com> Date: Fri, 22 Aug 2025 16:51:56 +0800 Subject: [PATCH] lock: dedicate unlock worker to thread runtime; robust fallback in Drop (#446) * lock: dedicate unlock worker to thread runtime; robust fallback in Drop * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/lock/src/guard.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Refactor logging in UNLOCK_TX error handling Removed redundant logging of lock_id in warning message. --------- Co-authored-by: houseme Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/lock/src/guard.rs | 95 ++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 38 deletions(-) diff --git a/crates/lock/src/guard.rs b/crates/lock/src/guard.rs index f680840f..7f728cca 100644 --- a/crates/lock/src/guard.rs +++ b/crates/lock/src/guard.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use once_cell::sync::Lazy; +use std::thread; +use tokio::runtime::Builder; use tokio::sync::mpsc; use crate::{client::LockClient, types::LockId}; @@ -25,36 +27,43 @@ struct UnlockJob { clients: Vec<Arc<dyn LockClient>>, // cloned Arcs; cheap and shares state } -#[derive(Debug)] -struct UnlockRuntime { - tx: mpsc::Sender<UnlockJob>, -} - -// Global unlock runtime with background worker -static UNLOCK_RUNTIME: Lazy<UnlockRuntime> = Lazy::new(|| { +// Global unlock runtime with background worker running on a dedicated thread-bound Tokio runtime. +// This avoids depending on the application's Tokio runtime lifetimes/cancellation scopes.
+static UNLOCK_TX: Lazy<mpsc::Sender<UnlockJob>> = Lazy::new(|| { // Larger buffer to reduce contention during bursts let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192); - // Spawn background worker when first used; assumes a Tokio runtime is available - tokio::spawn(async move { - while let Some(job) = rx.recv().await { - // Best-effort release across clients; try all, success if any succeeds - let mut any_ok = false; - let lock_id = job.lock_id.clone(); - for client in job.clients.into_iter() { - if client.release(&lock_id).await.unwrap_or(false) { - any_ok = true; - } - } - if !any_ok { - tracing::warn!("LockGuard background release failed for {}", lock_id); - } else { - tracing::debug!("LockGuard background released {}", lock_id); - } - } - }); + // Spawn a dedicated OS thread that owns its own Tokio runtime to process unlock jobs. + thread::Builder::new() + .name("rustfs-lock-unlocker".to_string()) + .spawn(move || { + // A lightweight current-thread runtime is sufficient here. + let rt = Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build Tokio runtime for background unlock jobs (possible causes: resource exhaustion, thread limit, Tokio misconfiguration)"); - UnlockRuntime { tx } + rt.block_on(async move { + while let Some(job) = rx.recv().await { + // Best-effort release across clients; try all, success if any succeeds + let mut any_ok = false; + let lock_id = job.lock_id.clone(); + for client in job.clients.into_iter() { + if client.release(&lock_id).await.unwrap_or(false) { + any_ok = true; + } + } + if !any_ok { + tracing::warn!("LockGuard background release failed for {}", lock_id); + } else { + tracing::debug!("LockGuard background released {}", lock_id); + } + } + }); + }) + .expect("failed to spawn unlock worker thread"); + + tx }); /// A RAII guard that releases the lock asynchronously when dropped.
@@ -99,22 +108,32 @@ impl Drop for LockGuard { }; // Try a non-blocking send to avoid panics in Drop - if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) { - // Channel full or closed; best-effort fallback: spawn a detached task + if let Err(err) = UNLOCK_TX.try_send(job) { + // Channel full or closed; best-effort fallback using a dedicated thread runtime let lock_id = self.lock_id.clone(); let clients = self.clients.clone(); - tracing::warn!("LockGuard channel send failed ({}), spawning fallback unlock task for {}", err, lock_id); + tracing::warn!( + "LockGuard channel send failed ({}), spawning fallback unlock thread for {}", + err, + lock_id.clone() + ); - // If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts. - let handle = tokio::spawn(async move { - let futures_iter = clients.into_iter().map(|client| { - let id = lock_id.clone(); - async move { client.release(&id).await.unwrap_or(false) } + // Use a short-lived background thread to execute the async releases on its own runtime. + let _ = thread::Builder::new() + .name("rustfs-lock-unlock-fallback".to_string()) + .spawn(move || { + let rt = Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to build fallback unlock runtime in LockGuard::drop fallback thread. This indicates resource exhaustion or misconfiguration (e.g., thread limits, Tokio runtime issues). Remediation: check system resource limits, ensure sufficient threads are available, and verify Tokio runtime configuration."); + rt.block_on(async move { + let futures_iter = clients.into_iter().map(|client| { + let id = lock_id.clone(); + async move { client.release(&id).await.unwrap_or(false) } + }); + let _ = futures::future::join_all(futures_iter).await; + }); }); - let _ = futures::future::join_all(futures_iter).await; - }); - // Explicitly drop the JoinHandle to acknowledge detaching the task. - std::mem::drop(handle); } } }