Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-12-29 20:25:23 +08:00
parent dc3154b883
commit 2f5f352f9a
4 changed files with 26 additions and 14 deletions

1
Cargo.lock generated
View File

@@ -3561,6 +3561,7 @@ version = "0.0.1"
dependencies = [
"common",
"tokio",
"tracing",
]
[[package]]

View File

@@ -11,4 +11,5 @@ workspace = true
[dependencies]
common.workspace = true
tokio.workspace = true
tokio.workspace = true
tracing.workspace = true

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tracing::info;
pub struct Workers {
available: Mutex<usize>, // 可用的工作槽
@@ -23,18 +24,23 @@ impl Workers {
// 让一个作业获得执行的机会
pub async fn take(&self) {
let mut available = self.available.lock().await;
while *available == 0 {
// 等待直到有可用槽
self.notify.notified().await;
available = self.available.lock().await;
loop {
let mut available = self.available.lock().await;
info!("worker take, {}", *available);
if *available == 0 {
drop(available);
self.notify.notified().await;
} else {
*available -= 1;
break;
}
}
*available -= 1; // 减少可用槽
}
// 让一个作业释放其机会
pub async fn give(&self) {
let mut available = self.available.lock().await;
info!("worker give, {}", *available);
*available += 1; // 增加可用槽
self.notify.notify_one(); // 通知一个等待的任务
}
@@ -68,15 +74,17 @@ mod tests {
async fn test_workers() {
let workers = Arc::new(Workers::new(5).unwrap());
for _ in 0..4 {
for _ in 0..5 {
let workers = workers.clone();
tokio::spawn(async move {
workers.take().await;
sleep(Duration::from_secs(3)).await;
workers.give().await;
});
}
for _ in 0..5 {
workers.give().await;
}
// Sleep: wait for spawn task started
sleep(Duration::from_secs(1)).await;
workers.wait().await;

View File

@@ -164,6 +164,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
let entry = match r.peek().await {
Ok(res) => {
if let Some(entry) = res {
info!("read entry disk: {}, name: {}", i, entry.name);
entry
} else {
// eof
@@ -195,7 +196,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
// If no current, add it.
if current.name.is_empty() {
top_entries.insert(i, Some(entry.clone()));
top_entries[i] = Some(entry.clone());
current = entry;
agree += 1;
@@ -203,14 +204,14 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
// If exact match, we agree.
if let Ok((_, true)) = current.matches(&entry, true) {
top_entries.insert(i, Some(entry));
top_entries[i] = Some(entry);
agree += 1;
continue;
}
// If only the name matches we didn't agree, but add it for resolution.
if entry.name == current.name {
top_entries.insert(i, Some(entry));
top_entries[i] = Some(entry);
continue;
}
@@ -220,9 +221,9 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
// We got a new, better current.
// Clear existing entries.
top_entries.clear();
top_entries = vec![None; top_entries.len()];
agree += 1;
top_entries.insert(i, Some(entry.clone()));
top_entries[i] = Some(entry.clone());
current = entry;
}
@@ -282,6 +283,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
}
info!("read entry should heal: {}", current.name);
if let Some(partial_fn) = opts.partial.as_ref() {
partial_fn(MetaCacheEntries(top_entries), &errs).await;
}