From 7d2c9b06de2c883e2135b232af1f5954384f34e0 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 11 Dec 2024 16:05:29 +0800 Subject: [PATCH] fix:#159, fix:#160 --- ecstore/src/erasure.rs | 21 ++- ecstore/src/set_disk.rs | 288 +++++++++++++++++++------------------- ecstore/src/store_init.rs | 14 +- scripts/run.sh | 4 +- 4 files changed, 166 insertions(+), 161 deletions(-) diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index bd559e49..02ca900b 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -533,14 +533,21 @@ impl ShardReader { let mut ress = Vec::with_capacity(reader_length); for disk in self.readers.iter_mut() { - if disk.is_none() { - ress.push(None); - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } + // if disk.is_none() { + // ress.push(None); + // errors.push(Some(Error::new(DiskError::DiskNotFound))); + // continue; + // } - let disk: &mut BitrotReader = disk.as_mut().unwrap(); - futures.push(disk.read_at(self.offset, read_length)); + // let disk: &mut BitrotReader = disk.as_mut().unwrap(); + let offset = self.offset; + futures.push(async move { + if let Some(disk) = disk { + disk.read_at(offset, read_length).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 76da71f7..9f1cf580 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -194,25 +194,24 @@ impl SetDisks { let mut errs = Vec::with_capacity(disks.len()); for (i, disk) in disks.iter().enumerate() { - if disk.is_none() { - // ress.push(None); - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - let disk = disk.as_ref().unwrap(); let mut file_info = file_infos[i].clone(); - if file_info.erasure.index == 0 { - file_info.erasure.index = i + 1; - } + futures.push(async move { + if file_info.erasure.index == 0 { + file_info.erasure.index = i + 1; + } - if !file_info.is_valid() { - // ress.push(None); - errs.push(Some(Error::new(DiskError::FileCorrupt))); - continue; - } + if !file_info.is_valid() { + return Err(Error::new(DiskError::FileCorrupt)); + } - futures.push(disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)) + if let Some(disk) = disk { + disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object) + .await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }) } let mut disk_versions = vec![None; disks.len()]; @@ -326,20 +325,22 @@ impl SetDisks { let mut errs = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete( - bucket, - &file_path, - DeleteOptions { - recursive: true, - ..Default::default() - }, - )); + let file_path = file_path.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.delete( + bucket, + &file_path, + DeleteOptions { + recursive: true, + ..Default::default() + }, + ) + .await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -367,12 +368,13 @@ impl SetDisks { let mut errs = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths)) + futures.push(async move { + if let Some(disk) = disk { + disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }) } let results = join_all(futures).await; @@ -401,12 +403,14 @@ impl SetDisks { let mut errs = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - let disk = disk.as_ref().unwrap(); - futures.push(disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta.clone())) + let meta = meta.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.rename_part(src_bucket, src_object, dst_bucket, dst_object, meta).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }) } let results = join_all(futures).await; @@ -487,15 +491,15 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for (i, disk) in disks.iter().enumerate() { - if disk.is_none() { - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); let mut file_info = files[i].clone(); file_info.erasure.index = i + 1; - futures.push(disk.write_metadata(org_bucket, bucket, prefix, file_info)); + futures.push(async move { + if let Some(disk) = disk { + disk.write_metadata(org_bucket, bucket, prefix, file_info).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -961,25 +965,22 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - ress.push(FileInfo::default()); - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); let opts = ReadOptions { read_data, healing }; futures.push(async move { - if version_id.is_empty() { - match disk.read_xl(bucket, object, read_data).await { - Ok(info) => { - let fi = file_info_from_raw(info, bucket, object, read_data).await?; - Ok(fi) + if let Some(disk) = disk { + if version_id.is_empty() { + match disk.read_xl(bucket, object, read_data).await { + Ok(info) => { + let fi = file_info_from_raw(info, bucket, object, read_data).await?; + Ok(fi) + } + Err(err) => Err(err), } - Err(err) => Err(err), + } else { + disk.read_version(org_bucket, bucket, object, version_id, &opts).await } } else { - disk.read_version(org_bucket, bucket, object, version_id, &opts).await + Err(Error::new(DiskError::DiskNotFound)) } }) } @@ -1023,14 +1024,13 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - ress.push(None); - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); - futures.push(disk.read_xl(bucket, object, read_data)); + futures.push(async move { + if let Some(disk) = disk { + disk.read_xl(bucket, object, read_data).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -1149,15 +1149,14 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - ress.push(None); - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); let req = req.clone(); - futures.push(disk.read_multiple(req)); + futures.push(async move { + if let Some(disk) = disk { + disk.read_multiple(req).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -1325,17 +1324,14 @@ impl SetDisks { let mut ress = Vec::new(); for disk in disks.iter() { - if disk.is_none() { - ress.push(None); - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); let opts = opts.clone(); - // let mut wr = &mut wr; - futures.push(disk.walk_dir(opts)); - // tokio::spawn(async move { disk.walk_dir(opts, wr).await }); + futures.push(async move { + if let Some(disk) = disk { + disk.walk_dir(opts).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -1375,20 +1371,18 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); let file_path = file_path.clone(); let meta_file_path = format!("{}.meta", file_path); futures.push(async move { - disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) - .await?; - disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default()) - .await + if let Some(disk) = disk { + disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) + .await?; + disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default()) + .await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } }); } @@ -1419,13 +1413,15 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())); + let file_path = file_path.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) + .await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -1453,20 +1449,21 @@ impl SetDisks { let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errors.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete( - bucket, - prefix, - DeleteOptions { - recursive: true, - ..Default::default() - }, - )); + futures.push(async move { + if let Some(disk) = disk { + disk.delete( + bucket, + prefix, + DeleteOptions { + recursive: true, + ..Default::default() + }, + ) + .await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -1605,7 +1602,7 @@ impl SetDisks { // TODO: 优化并发 可用数量中断 let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await; // warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata); - // warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs); + warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs); let _min_disks = self.set_drive_count - self.default_parity_count; @@ -1626,9 +1623,9 @@ impl SetDisks { let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?; // debug!("get_object_fileinfo pick fi {:?}", &fi); - let online_disks: Vec> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect(); + // let online_disks: Vec> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect(); - Ok((fi, parts_metadata, online_disks)) + Ok((fi, parts_metadata, op_online_disks)) } #[allow(clippy::too_many_arguments)] @@ -1763,12 +1760,14 @@ impl SetDisks { let mut errs = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - errs.push(Some(Error::new(DiskError::DiskNotFound))); - continue; - } - let disk = disk.as_ref().unwrap(); - futures.push(disk.update_metadata(bucket, object, fi.clone(), opts)) + let fi = fi.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.update_metadata(bucket, object, fi, opts).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }) } let results = join_all(futures).await; @@ -3722,12 +3721,14 @@ impl StorageAPI for SetDisks { // let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - continue; - } - - let disk = disk.as_ref().unwrap(); - futures.push(disk.delete_versions(bucket, vers.clone(), DeleteOptions::default())); + let vers = vers.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.delete_versions(bucket, vers, DeleteOptions::default()).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; @@ -3960,19 +3961,16 @@ impl StorageAPI for SetDisks { let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); for disk in disks.iter() { - if disk.is_none() { + if let Some(disk) = disk { + // let writer = disk.append_file(RUSTFS_META_TMP_BUCKET, &tmp_part_path).await?; + let filewriter = disk + .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, data.content_length) + .await?; + let writer = new_bitrot_filewriter(filewriter, DEFAULT_BITROT_ALGO, erasure.shard_size(erasure.block_size)); + writers.push(Some(writer)); + } else { writers.push(None); - continue; } - let disk = disk.as_ref().unwrap().clone(); - - // let writer = disk.append_file(RUSTFS_META_TMP_BUCKET, &tmp_part_path).await?; - let filewriter = disk - .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, data.content_length) - .await?; - let writer = new_bitrot_filewriter(filewriter, DEFAULT_BITROT_ALGO, erasure.shard_size(erasure.block_size)); - - writers.push(Some(writer)); } let mut erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks, fi.erasure.block_size); diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index b08182bc..1b154c16 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -191,13 +191,13 @@ pub async fn load_format_erasure_all(disks: &[Option], heal: bool) -> let mut errors = Vec::with_capacity(disks.len()); for disk in disks.iter() { - if disk.is_none() { - datas.push(None); - errors.push(Some(Error::new(DiskError::DiskNotFound))); - } - - let disk = disk.as_ref().unwrap(); - futures.push(load_format_erasure(disk, heal)); + futures.push(async move { + if let Some(disk) = disk { + load_format_erasure(disk, heal).await + } else { + Err(Error::new(DiskError::DiskNotFound)) + } + }); } let results = join_all(futures).await; diff --git a/scripts/run.sh b/scripts/run.sh index 4f1942d1..5b1326e6 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -18,8 +18,8 @@ fi export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB" -# DATA_DIR_ARG="./target/volume/test{0...4}" -DATA_DIR_ARG="./target/volume/test" +DATA_DIR_ARG="./target/volume/test{0...4}" +# DATA_DIR_ARG="./target/volume/test" if [ -n "$1" ]; then DATA_DIR_ARG="$1"