From df57f0c03304a00a42b6b5f8c880b2ab09860520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=AD=A3=E8=B6=85?= Date: Wed, 11 Mar 2026 21:59:00 +0800 Subject: [PATCH] fix(workers): clamp worker release count (#2122) --- crates/workers/Cargo.toml | 2 +- crates/workers/src/workers.rs | 25 ++++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/workers/Cargo.toml b/crates/workers/Cargo.toml index bc72c9e4..97c9aa8e 100644 --- a/crates/workers/Cargo.toml +++ b/crates/workers/Cargo.toml @@ -29,5 +29,5 @@ documentation = "https://docs.rs/rustfs-workers/latest/rustfs_workers/" workspace = true [dependencies] -tokio = { workspace = true, features = ["sync"] } +tokio = { workspace = true, features = ["sync", "time", "macros"] } tracing.workspace = true diff --git a/crates/workers/src/workers.rs b/crates/workers/src/workers.rs index eecd15dd..3b56444c 100644 --- a/crates/workers/src/workers.rs +++ b/crates/workers/src/workers.rs @@ -55,7 +55,7 @@ impl Workers { pub async fn give(&self) { let mut available = self.available.lock().await; info!("worker give, {}", *available); - *available += 1; // Increase available slots + *available = (*available).saturating_add(1).min(self.limit); // avoid over-release beyond limit self.notify.notify_one(); // Notify a waiting task } @@ -87,13 +87,14 @@ mod tests { #[tokio::test] async fn test_workers() { - let workers = Arc::new(Workers::new(5).unwrap()); + let workers = Workers::new(5).unwrap(); for _ in 0..5 { let workers = workers.clone(); tokio::spawn(async move { workers.take().await; - sleep(Duration::from_secs(3)).await; + sleep(Duration::from_millis(50)).await; + workers.give().await; }); } @@ -101,10 +102,20 @@ mod tests { workers.give().await; } // Sleep: wait for spawn task started - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_millis(20)).await; workers.wait().await; - if workers.available().await != workers.limit { - unreachable!(); - } + assert_eq!(workers.available().await, workers.limit); + } + + #[tokio::test] + async fn test_workers_over_release_is_clamped() { + let workers = Workers::new(2).unwrap(); + + workers.take().await; + workers.give().await; + workers.give().await; + workers.give().await; + + assert_eq!(workers.available().await, 2); } }