mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3561,6 +3561,7 @@ version = "0.0.1"
|
||||
dependencies = [
|
||||
"common",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -11,4 +11,5 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
common.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, Notify};
|
||||
use tracing::info;
|
||||
|
||||
pub struct Workers {
|
||||
available: Mutex<usize>, // 可用的工作槽
|
||||
@@ -23,18 +24,23 @@ impl Workers {
|
||||
|
||||
// 让一个作业获得执行的机会
|
||||
pub async fn take(&self) {
|
||||
let mut available = self.available.lock().await;
|
||||
while *available == 0 {
|
||||
// 等待直到有可用槽
|
||||
self.notify.notified().await;
|
||||
available = self.available.lock().await;
|
||||
loop {
|
||||
let mut available = self.available.lock().await;
|
||||
info!("worker take, {}", *available);
|
||||
if *available == 0 {
|
||||
drop(available);
|
||||
self.notify.notified().await;
|
||||
} else {
|
||||
*available -= 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
*available -= 1; // 减少可用槽
|
||||
}
|
||||
|
||||
// 让一个作业释放其机会
|
||||
pub async fn give(&self) {
|
||||
let mut available = self.available.lock().await;
|
||||
info!("worker give, {}", *available);
|
||||
*available += 1; // 增加可用槽
|
||||
self.notify.notify_one(); // 通知一个等待的任务
|
||||
}
|
||||
@@ -68,15 +74,17 @@ mod tests {
|
||||
async fn test_workers() {
|
||||
let workers = Arc::new(Workers::new(5).unwrap());
|
||||
|
||||
for _ in 0..4 {
|
||||
for _ in 0..5 {
|
||||
let workers = workers.clone();
|
||||
tokio::spawn(async move {
|
||||
workers.take().await;
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
workers.give().await;
|
||||
});
|
||||
}
|
||||
|
||||
for _ in 0..5 {
|
||||
workers.give().await;
|
||||
}
|
||||
// Sleep: wait for spawn task started
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
workers.wait().await;
|
||||
|
||||
@@ -164,6 +164,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
|
||||
let entry = match r.peek().await {
|
||||
Ok(res) => {
|
||||
if let Some(entry) = res {
|
||||
info!("read entry disk: {}, name: {}", i, entry.name);
|
||||
entry
|
||||
} else {
|
||||
// eof
|
||||
@@ -195,7 +196,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
|
||||
|
||||
// If no current, add it.
|
||||
if current.name.is_empty() {
|
||||
top_entries.insert(i, Some(entry.clone()));
|
||||
top_entries[i] = Some(entry.clone());
|
||||
current = entry;
|
||||
agree += 1;
|
||||
|
||||
@@ -203,14 +204,14 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
|
||||
}
|
||||
// If exact match, we agree.
|
||||
if let Ok((_, true)) = current.matches(&entry, true) {
|
||||
top_entries.insert(i, Some(entry));
|
||||
top_entries[i] = Some(entry);
|
||||
agree += 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
// If only the name matches we didn't agree, but add it for resolution.
|
||||
if entry.name == current.name {
|
||||
top_entries.insert(i, Some(entry));
|
||||
top_entries[i] = Some(entry);
|
||||
|
||||
continue;
|
||||
}
|
||||
@@ -220,9 +221,9 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
|
||||
}
|
||||
// We got a new, better current.
|
||||
// Clear existing entries.
|
||||
top_entries.clear();
|
||||
top_entries = vec![None; top_entries.len()];
|
||||
agree += 1;
|
||||
top_entries.insert(i, Some(entry.clone()));
|
||||
top_entries[i] = Some(entry.clone());
|
||||
current = entry;
|
||||
}
|
||||
|
||||
@@ -282,6 +283,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
|
||||
}
|
||||
}
|
||||
|
||||
info!("read entry should heal: {}", current.name);
|
||||
if let Some(partial_fn) = opts.partial.as_ref() {
|
||||
partial_fn(MetaCacheEntries(top_entries), &errs).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user