diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index bd559e49..02ca900b 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -533,14 +533,21 @@ impl ShardReader {
let mut ress = Vec::with_capacity(reader_length);
for disk in self.readers.iter_mut() {
- if disk.is_none() {
- ress.push(None);
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
+ // if disk.is_none() {
+ // ress.push(None);
+ // errors.push(Some(Error::new(DiskError::DiskNotFound)));
+ // continue;
+ // }
- let disk: &mut BitrotReader = disk.as_mut().unwrap();
- futures.push(disk.read_at(self.offset, read_length));
+ // let disk: &mut BitrotReader = disk.as_mut().unwrap();
+ let offset = self.offset;
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.read_at(offset, read_length).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index 76da71f7..9f1cf580 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -194,25 +194,24 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
- if disk.is_none() {
- // ress.push(None);
- errs.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
- let disk = disk.as_ref().unwrap();
let mut file_info = file_infos[i].clone();
- if file_info.erasure.index == 0 {
- file_info.erasure.index = i + 1;
- }
+ futures.push(async move {
+ if file_info.erasure.index == 0 {
+ file_info.erasure.index = i + 1;
+ }
- if !file_info.is_valid() {
- // ress.push(None);
- errs.push(Some(Error::new(DiskError::FileCorrupt)));
- continue;
- }
+ if !file_info.is_valid() {
+ return Err(Error::new(DiskError::FileCorrupt));
+ }
- futures.push(disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object))
+ if let Some(disk) = disk {
+ disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)
+ .await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ })
}
let mut disk_versions = vec![None; disks.len()];
@@ -326,20 +325,22 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errs.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
- futures.push(disk.delete(
- bucket,
- &file_path,
- DeleteOptions {
- recursive: true,
- ..Default::default()
- },
- ));
+ let file_path = file_path.clone();
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.delete(
+ bucket,
+ &file_path,
+ DeleteOptions {
+ recursive: true,
+ ..Default::default()
+ },
+ )
+ .await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -367,12 +368,13 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errs.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
- let disk = disk.as_ref().unwrap();
- futures.push(disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths))
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ })
}
let results = join_all(futures).await;
@@ -401,12 +403,14 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errs.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
- let disk = disk.as_ref().unwrap();
- futures.push(disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta.clone()))
+ let meta = meta.clone();
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ })
}
let results = join_all(futures).await;
@@ -487,15 +491,15 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
- if disk.is_none() {
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
let mut file_info = files[i].clone();
file_info.erasure.index = i + 1;
- futures.push(disk.write_metadata(org_bucket, bucket, prefix, file_info));
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.write_metadata(org_bucket, bucket, prefix, file_info).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -961,25 +965,22 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- ress.push(FileInfo::default());
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
let opts = ReadOptions { read_data, healing };
futures.push(async move {
- if version_id.is_empty() {
- match disk.read_xl(bucket, object, read_data).await {
- Ok(info) => {
- let fi = file_info_from_raw(info, bucket, object, read_data).await?;
- Ok(fi)
+ if let Some(disk) = disk {
+ if version_id.is_empty() {
+ match disk.read_xl(bucket, object, read_data).await {
+ Ok(info) => {
+ let fi = file_info_from_raw(info, bucket, object, read_data).await?;
+ Ok(fi)
+ }
+ Err(err) => Err(err),
}
- Err(err) => Err(err),
+ } else {
+ disk.read_version(org_bucket, bucket, object, version_id, &opts).await
}
} else {
- disk.read_version(org_bucket, bucket, object, version_id, &opts).await
+ Err(Error::new(DiskError::DiskNotFound))
}
})
}
@@ -1023,14 +1024,13 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- ress.push(None);
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
- futures.push(disk.read_xl(bucket, object, read_data));
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.read_xl(bucket, object, read_data).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -1149,15 +1149,14 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- ress.push(None);
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
let req = req.clone();
- futures.push(disk.read_multiple(req));
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.read_multiple(req).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -1325,17 +1324,14 @@ impl SetDisks {
let mut ress = Vec::new();
for disk in disks.iter() {
- if disk.is_none() {
- ress.push(None);
- errs.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
let opts = opts.clone();
- // let mut wr = &mut wr;
- futures.push(disk.walk_dir(opts));
- // tokio::spawn(async move { disk.walk_dir(opts, wr).await });
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.walk_dir(opts).await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -1375,20 +1371,18 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
let file_path = file_path.clone();
let meta_file_path = format!("{}.meta", file_path);
futures.push(async move {
- disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
- .await?;
- disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
- .await
+ if let Some(disk) = disk {
+ disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
+ .await?;
+ disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
+ .await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
});
}
@@ -1419,13 +1413,15 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
- futures.push(disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()));
+ let file_path = file_path.clone();
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
+ .await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -1453,20 +1449,21 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
- if disk.is_none() {
- errors.push(Some(Error::new(DiskError::DiskNotFound)));
- continue;
- }
-
- let disk = disk.as_ref().unwrap();
- futures.push(disk.delete(
- bucket,
- prefix,
- DeleteOptions {
- recursive: true,
- ..Default::default()
- },
- ));
+ futures.push(async move {
+ if let Some(disk) = disk {
+ disk.delete(
+ bucket,
+ prefix,
+ DeleteOptions {
+ recursive: true,
+ ..Default::default()
+ },
+ )
+ .await
+ } else {
+ Err(Error::new(DiskError::DiskNotFound))
+ }
+ });
}
let results = join_all(futures).await;
@@ -1605,7 +1602,7 @@ impl SetDisks {
// TODO: 优化并发 可用数量中断
let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await;
// warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata);
- // warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
+ warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
let _min_disks = self.set_drive_count - self.default_parity_count;
@@ -1626,9 +1623,9 @@ impl SetDisks {
let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?;
// debug!("get_object_fileinfo pick fi {:?}", &fi);
- let online_disks: Vec