diff --git a/Cargo.lock b/Cargo.lock index 0916ae0f..b90be216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3561,6 +3561,7 @@ version = "0.0.1" dependencies = [ "common", "tokio", + "tracing", ] [[package]] diff --git a/common/workers/Cargo.toml b/common/workers/Cargo.toml index fa918688..f5b6f25f 100644 --- a/common/workers/Cargo.toml +++ b/common/workers/Cargo.toml @@ -11,4 +11,5 @@ workspace = true [dependencies] common.workspace = true -tokio.workspace = true \ No newline at end of file +tokio.workspace = true +tracing.workspace = true diff --git a/common/workers/src/workers.rs b/common/workers/src/workers.rs index 85be5efb..e33ebd75 100644 --- a/common/workers/src/workers.rs +++ b/common/workers/src/workers.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use tokio::sync::{Mutex, Notify}; +use tracing::info; pub struct Workers { available: Mutex, // 可用的工作槽 @@ -23,18 +24,23 @@ impl Workers { // 让一个作业获得执行的机会 pub async fn take(&self) { - let mut available = self.available.lock().await; - while *available == 0 { - // 等待直到有可用槽 - self.notify.notified().await; - available = self.available.lock().await; + loop { + let mut available = self.available.lock().await; + info!("worker take, {}", *available); + if *available == 0 { + drop(available); + self.notify.notified().await; + } else { + *available -= 1; + break; + } } - *available -= 1; // 减少可用槽 } // 让一个作业释放其机会 pub async fn give(&self) { let mut available = self.available.lock().await; + info!("worker give, {}", *available); *available += 1; // 增加可用槽 self.notify.notify_one(); // 通知一个等待的任务 } @@ -68,15 +74,17 @@ mod tests { async fn test_workers() { let workers = Arc::new(Workers::new(5).unwrap()); - for _ in 0..4 { + for _ in 0..5 { let workers = workers.clone(); tokio::spawn(async move { workers.take().await; sleep(Duration::from_secs(3)).await; - workers.give().await; }); } + for _ in 0..5 { + workers.give().await; + } // Sleep: wait for spawn task started sleep(Duration::from_secs(1)).await; workers.wait().await; diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs index a2535ce2..1f553565 100644 --- a/ecstore/src/cache_value/metacache_set.rs +++ b/ecstore/src/cache_value/metacache_set.rs @@ -164,6 +164,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - let entry = match r.peek().await { Ok(res) => { if let Some(entry) = res { + info!("read entry disk: {}, name: {}", i, entry.name); entry } else { // eof @@ -195,7 +196,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - // If no current, add it. if current.name.is_empty() { - top_entries.insert(i, Some(entry.clone())); + top_entries[i] = Some(entry.clone()); current = entry; agree += 1; @@ -203,14 +204,14 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } // If exact match, we agree. if let Ok((_, true)) = current.matches(&entry, true) { - top_entries.insert(i, Some(entry)); + top_entries[i] = Some(entry); agree += 1; continue; } // If only the name matches we didn't agree, but add it for resolution. if entry.name == current.name { - top_entries.insert(i, Some(entry)); + top_entries[i] = Some(entry); continue; } @@ -220,9 +221,9 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } // We got a new, better current. // Clear existing entries. - top_entries.clear(); + top_entries = vec![None; top_entries.len()]; agree += 1; - top_entries.insert(i, Some(entry.clone())); + top_entries[i] = Some(entry.clone()); current = entry; } @@ -282,6 +283,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } } + info!("read entry should heal: {}", current.name); if let Some(partial_fn) = opts.partial.as_ref() { partial_fn(MetaCacheEntries(top_entries), &errs).await; }