fix:#159, fix:#160

This commit is contained in:
weisd
2024-12-11 16:05:29 +08:00
parent e80ca05a2e
commit 7d2c9b06de
4 changed files with 166 additions and 161 deletions

View File

@@ -533,14 +533,21 @@ impl ShardReader {
let mut ress = Vec::with_capacity(reader_length);
for disk in self.readers.iter_mut() {
if disk.is_none() {
ress.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
// if disk.is_none() {
// ress.push(None);
// errors.push(Some(Error::new(DiskError::DiskNotFound)));
// continue;
// }
let disk: &mut BitrotReader = disk.as_mut().unwrap();
futures.push(disk.read_at(self.offset, read_length));
// let disk: &mut BitrotReader = disk.as_mut().unwrap();
let offset = self.offset;
futures.push(async move {
if let Some(disk) = disk {
disk.read_at(offset, read_length).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;

View File

@@ -194,25 +194,24 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
if disk.is_none() {
// ress.push(None);
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let mut file_info = file_infos[i].clone();
if file_info.erasure.index == 0 {
file_info.erasure.index = i + 1;
}
futures.push(async move {
if file_info.erasure.index == 0 {
file_info.erasure.index = i + 1;
}
if !file_info.is_valid() {
// ress.push(None);
errs.push(Some(Error::new(DiskError::FileCorrupt)));
continue;
}
if !file_info.is_valid() {
return Err(Error::new(DiskError::FileCorrupt));
}
futures.push(disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object))
if let Some(disk) = disk {
disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}
let mut disk_versions = vec![None; disks.len()];
@@ -326,20 +325,22 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.delete(
bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
));
let file_path = file_path.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.delete(
bucket,
&file_path,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -367,12 +368,13 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths))
futures.push(async move {
if let Some(disk) = disk {
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}
let results = join_all(futures).await;
@@ -401,12 +403,14 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta.clone()))
let meta = meta.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}
let results = join_all(futures).await;
@@ -487,15 +491,15 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
if disk.is_none() {
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let mut file_info = files[i].clone();
file_info.erasure.index = i + 1;
futures.push(disk.write_metadata(org_bucket, bucket, prefix, file_info));
futures.push(async move {
if let Some(disk) = disk {
disk.write_metadata(org_bucket, bucket, prefix, file_info).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -961,25 +965,22 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
ress.push(FileInfo::default());
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let opts = ReadOptions { read_data, healing };
futures.push(async move {
if version_id.is_empty() {
match disk.read_xl(bucket, object, read_data).await {
Ok(info) => {
let fi = file_info_from_raw(info, bucket, object, read_data).await?;
Ok(fi)
if let Some(disk) = disk {
if version_id.is_empty() {
match disk.read_xl(bucket, object, read_data).await {
Ok(info) => {
let fi = file_info_from_raw(info, bucket, object, read_data).await?;
Ok(fi)
}
Err(err) => Err(err),
}
Err(err) => Err(err),
} else {
disk.read_version(org_bucket, bucket, object, version_id, &opts).await
}
} else {
disk.read_version(org_bucket, bucket, object, version_id, &opts).await
Err(Error::new(DiskError::DiskNotFound))
}
})
}
@@ -1023,14 +1024,13 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
ress.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.read_xl(bucket, object, read_data));
futures.push(async move {
if let Some(disk) = disk {
disk.read_xl(bucket, object, read_data).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -1149,15 +1149,14 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
ress.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let req = req.clone();
futures.push(disk.read_multiple(req));
futures.push(async move {
if let Some(disk) = disk {
disk.read_multiple(req).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -1325,17 +1324,14 @@ impl SetDisks {
let mut ress = Vec::new();
for disk in disks.iter() {
if disk.is_none() {
ress.push(None);
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let opts = opts.clone();
// let mut wr = &mut wr;
futures.push(disk.walk_dir(opts));
// tokio::spawn(async move { disk.walk_dir(opts, wr).await });
futures.push(async move {
if let Some(disk) = disk {
disk.walk_dir(opts).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -1375,20 +1371,18 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
let file_path = file_path.clone();
let meta_file_path = format!("{}.meta", file_path);
futures.push(async move {
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
.await?;
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
.await
if let Some(disk) = disk {
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
.await?;
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
@@ -1419,13 +1413,15 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()));
let file_path = file_path.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -1453,20 +1449,21 @@ impl SetDisks {
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.delete(
bucket,
prefix,
DeleteOptions {
recursive: true,
..Default::default()
},
));
futures.push(async move {
if let Some(disk) = disk {
disk.delete(
bucket,
prefix,
DeleteOptions {
recursive: true,
..Default::default()
},
)
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -1605,7 +1602,7 @@ impl SetDisks {
// TODO: 优化并发 可用数量中断
let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await;
// warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata);
// warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
let _min_disks = self.set_drive_count - self.default_parity_count;
@@ -1626,9 +1623,9 @@ impl SetDisks {
let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?;
// debug!("get_object_fileinfo pick fi {:?}", &fi);
let online_disks: Vec<Option<DiskStore>> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect();
// let online_disks: Vec<Option<DiskStore>> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect();
Ok((fi, parts_metadata, online_disks))
Ok((fi, parts_metadata, op_online_disks))
}
#[allow(clippy::too_many_arguments)]
@@ -1763,12 +1760,14 @@ impl SetDisks {
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.update_metadata(bucket, object, fi.clone(), opts))
let fi = fi.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.update_metadata(bucket, object, fi, opts).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}
let results = join_all(futures).await;
@@ -3722,12 +3721,14 @@ impl StorageAPI for SetDisks {
// let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
continue;
}
let disk = disk.as_ref().unwrap();
futures.push(disk.delete_versions(bucket, vers.clone(), DeleteOptions::default()));
let vers = vers.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.delete_versions(bucket, vers, DeleteOptions::default()).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;
@@ -3960,19 +3961,16 @@ impl StorageAPI for SetDisks {
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);
for disk in disks.iter() {
if disk.is_none() {
if let Some(disk) = disk {
// let writer = disk.append_file(RUSTFS_META_TMP_BUCKET, &tmp_part_path).await?;
let filewriter = disk
.create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, data.content_length)
.await?;
let writer = new_bitrot_filewriter(filewriter, DEFAULT_BITROT_ALGO, erasure.shard_size(erasure.block_size));
writers.push(Some(writer));
} else {
writers.push(None);
continue;
}
let disk = disk.as_ref().unwrap().clone();
// let writer = disk.append_file(RUSTFS_META_TMP_BUCKET, &tmp_part_path).await?;
let filewriter = disk
.create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, data.content_length)
.await?;
let writer = new_bitrot_filewriter(filewriter, DEFAULT_BITROT_ALGO, erasure.shard_size(erasure.block_size));
writers.push(Some(writer));
}
let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size);

View File

@@ -191,13 +191,13 @@ pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) ->
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
datas.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
}
let disk = disk.as_ref().unwrap();
futures.push(load_format_erasure(disk, heal));
futures.push(async move {
if let Some(disk) = disk {
load_format_erasure(disk, heal).await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
let results = join_all(futures).await;

View File

@@ -18,8 +18,8 @@ fi
export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"
# DATA_DIR_ARG="./target/volume/test{0...4}"
DATA_DIR_ARG="./target/volume/test"
DATA_DIR_ARG="./target/volume/test{0...4}"
# DATA_DIR_ARG="./target/volume/test"
if [ -n "$1" ]; then
DATA_DIR_ARG="$1"