Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-12-29 18:19:46 +08:00
parent 874679e65d
commit dc3154b883

View File

@@ -8,10 +8,7 @@ use crate::{
};
use futures::future::join_all;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
spawn,
sync::{broadcast::Receiver as B_Receiver, RwLock},
};
use tokio::{spawn, sync::broadcast::Receiver as B_Receiver};
use tracing::{error, info};
type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
@@ -65,7 +62,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
let mut jobs: Vec<tokio::task::JoinHandle<std::result::Result<(), Error>>> = Vec::new();
let mut readers = Vec::with_capacity(opts.disks.len());
let fds = Arc::new(RwLock::new(opts.fallback_disks.clone()));
let fds = Arc::new(opts.fallback_disks.clone());
for disk in opts.disks.iter() {
let opdisk = disk.clone();
@@ -101,47 +98,40 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
while need_fallback {
let f_disk = loop {
let mut fds_w = fds_clone.write().await;
if fds_w.is_empty() {
break None;
}
if let Some(fd) = fds_w.remove(0) {
if fd.is_online().await {
break Some(fd);
}
}
};
if let Some(disk) = f_disk {
match disk
.as_ref()
.walk_dir(
WalkDirOptions {
bucket: opts_clone.bucket.clone(),
base_dir: opts_clone.path.clone(),
recursive: opts_clone.recursice,
report_notfound: opts_clone.report_not_found,
filter_prefix: opts_clone.filter_prefix.clone(),
forward_to: opts_clone.forward_to.clone(),
limit: opts_clone.per_disk_limit,
..Default::default()
},
&mut wr,
)
.await
{
Ok(_r) => {
need_fallback = false;
}
Err(err) => {
error!("walk dir2 err {:?}", &err);
let disk = match fds_clone.iter().find(|d| d.is_some()) {
Some(d) => {
if let Some(disk) = d.clone() {
disk
} else {
break;
}
}
} else {
break;
None => break,
};
match disk
.as_ref()
.walk_dir(
WalkDirOptions {
bucket: opts_clone.bucket.clone(),
base_dir: opts_clone.path.clone(),
recursive: opts_clone.recursice,
report_notfound: opts_clone.report_not_found,
filter_prefix: opts_clone.filter_prefix.clone(),
forward_to: opts_clone.forward_to.clone(),
limit: opts_clone.per_disk_limit,
..Default::default()
},
&mut wr,
)
.await
{
Ok(_r) => {
need_fallback = false;
}
Err(err) => {
error!("walk dir2 err {:?}", &err);
break;
}
}
}
@@ -295,7 +285,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
if let Some(partial_fn) = opts.partial.as_ref() {
partial_fn(MetaCacheEntries(top_entries), &errs).await;
}
break;
// break;
}
Ok(())
});