diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs index 098724bd..a2535ce2 100644 --- a/ecstore/src/cache_value/metacache_set.rs +++ b/ecstore/src/cache_value/metacache_set.rs @@ -8,10 +8,7 @@ use crate::{ }; use futures::future::join_all; use std::{future::Future, pin::Pin, sync::Arc}; -use tokio::{ - spawn, - sync::{broadcast::Receiver as B_Receiver, RwLock}, -}; +use tokio::{spawn, sync::broadcast::Receiver as B_Receiver}; use tracing::{error, info}; type AgreedFn = Box Pin + Send>> + Send + 'static>; @@ -65,7 +62,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - let mut jobs: Vec>> = Vec::new(); let mut readers = Vec::with_capacity(opts.disks.len()); - let fds = Arc::new(RwLock::new(opts.fallback_disks.clone())); + let fds = Arc::new(opts.fallback_disks.clone()); for disk in opts.disks.iter() { let opdisk = disk.clone(); @@ -101,47 +98,40 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - } while need_fallback { - let f_disk = loop { - let mut fds_w = fds_clone.write().await; - if fds_w.is_empty() { - break None; - } - - if let Some(fd) = fds_w.remove(0) { - if fd.is_online().await { - break Some(fd); - } - } - }; - - if let Some(disk) = f_disk { - match disk - .as_ref() - .walk_dir( - WalkDirOptions { - bucket: opts_clone.bucket.clone(), - base_dir: opts_clone.path.clone(), - recursive: opts_clone.recursice, - report_notfound: opts_clone.report_not_found, - filter_prefix: opts_clone.filter_prefix.clone(), - forward_to: opts_clone.forward_to.clone(), - limit: opts_clone.per_disk_limit, - ..Default::default() - }, - &mut wr, - ) - .await - { - Ok(_r) => { - need_fallback = false; - } - Err(err) => { - error!("walk dir2 err {:?}", &err); + let disk = match fds_clone.iter().find(|d| d.is_some()) { + Some(d) => { + if let Some(disk) = d.clone() { + disk + } else { break; } } - } else { - break; + None => break, + }; + match disk + .as_ref() + .walk_dir( + WalkDirOptions { + bucket: opts_clone.bucket.clone(), + base_dir: opts_clone.path.clone(), + recursive: opts_clone.recursice, + report_notfound: opts_clone.report_not_found, + filter_prefix: opts_clone.filter_prefix.clone(), + forward_to: opts_clone.forward_to.clone(), + limit: opts_clone.per_disk_limit, + ..Default::default() + }, + &mut wr, + ) + .await + { + Ok(_r) => { + need_fallback = false; + } + Err(err) => { + error!("walk dir2 err {:?}", &err); + break; + } } } @@ -295,7 +285,7 @@ pub async fn list_path_raw(mut rx: B_Receiver, opts: ListPathRawOptions) - if let Some(partial_fn) = opts.partial.as_ref() { partial_fn(MetaCacheEntries(top_entries), &errs).await; } - break; + // break; } Ok(()) });