fix(workers): clamp worker release count (#2122)

This commit is contained in:
安正超
2026-03-11 21:59:00 +08:00
committed by GitHub
parent c47dec8549
commit df57f0c033
2 changed files with 19 additions and 8 deletions

View File

@@ -29,5 +29,5 @@ documentation = "https://docs.rs/rustfs-workers/latest/rustfs_workers/"
workspace = true
[dependencies]
tokio = { workspace = true, features = ["sync"] }
tokio = { workspace = true, features = ["sync", "time", "macros"] }
tracing.workspace = true

View File

@@ -55,7 +55,7 @@ impl Workers {
pub async fn give(&self) {
let mut available = self.available.lock().await;
info!("worker give, {}", *available);
*available += 1; // Increase available slots
*available = (*available).saturating_add(1).min(self.limit); // avoid over-release beyond limit
self.notify.notify_one(); // Notify a waiting task
}
@@ -87,13 +87,14 @@ mod tests {
#[tokio::test]
async fn test_workers() {
let workers = Arc::new(Workers::new(5).unwrap());
let workers = Workers::new(5).unwrap();
for _ in 0..5 {
let workers = workers.clone();
tokio::spawn(async move {
workers.take().await;
sleep(Duration::from_secs(3)).await;
sleep(Duration::from_millis(50)).await;
workers.give().await;
});
}
@@ -101,10 +102,20 @@ mod tests {
workers.give().await;
}
// Sleep: wait for spawn task started
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_millis(20)).await;
workers.wait().await;
if workers.available().await != workers.limit {
unreachable!();
}
assert_eq!(workers.available().await, workers.limit);
}
#[tokio::test]
async fn test_workers_over_release_is_clamped() {
let workers = Workers::new(2).unwrap();
workers.take().await;
workers.give().await;
workers.give().await;
workers.give().await;
assert_eq!(workers.available().await, 2);
}
}