From 7a94363b389e8cdfebef00a2c54b703250e5e20c Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Thu, 8 May 2025 18:37:28 +0800 Subject: [PATCH] improve speed Signed-off-by: junxiang Mu <1948535941@qq.com> --- ecstore/src/disk/local.rs | 15 +++++---- ecstore/src/disk/os.rs | 9 ++--- ecstore/src/erasure.rs | 71 --------------------------------------- ecstore/src/file_meta.rs | 6 +++- ecstore/src/set_disk.rs | 57 +++++++++++++++++++++---------- ecstore/src/store.rs | 1 + ecstore/src/utils/fs.rs | 12 +++++++ 7 files changed, 70 insertions(+), 101 deletions(-) diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 45bea2d9..bd10ccf6 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -261,7 +261,7 @@ impl LocalDisk { #[tracing::instrument(level = "debug", skip(self))] async fn check_format_json(&self) -> Result { - let md = fs::metadata(&self.format_path).await.map_err(|e| match e.kind() { + let md = std::fs::metadata(&self.format_path).map_err(|e| match e.kind() { ErrorKind::NotFound => DiskError::DiskNotFound, ErrorKind::PermissionDenied => DiskError::FileAccessDenied, _ => { @@ -367,7 +367,7 @@ impl LocalDisk { Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] pub async fn delete_file( &self, base_path: &PathBuf, @@ -690,6 +690,7 @@ impl LocalDisk { } // write_all_private with check_path_length + #[tracing::instrument(level = "debug", skip_all)] pub async fn write_all_private( &self, volume: &str, @@ -1215,7 +1216,7 @@ impl DiskAPI for LocalDisk { Ok(data) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip_all)] async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()> { self.write_all_public(volume, path, data).await } @@ -1723,7 +1724,7 @@ impl DiskAPI for LocalDisk { Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn rename_data( &self, src_volume: 
&str, @@ -1734,7 +1735,7 @@ impl DiskAPI for LocalDisk { ) -> Result { let src_volume_dir = self.get_bucket_path(src_volume)?; if !skip_access_checks(src_volume) { - if let Err(e) = utils::fs::access(&src_volume_dir).await { + if let Err(e) = utils::fs::access_std(&src_volume_dir) { info!("access checks failed, src_volume_dir: {:?}, err: {}", src_volume_dir, e.to_string()); return Err(convert_access_error(e, DiskError::VolumeAccessDenied)); } @@ -1742,7 +1743,7 @@ impl DiskAPI for LocalDisk { let dst_volume_dir = self.get_bucket_path(dst_volume)?; if !skip_access_checks(dst_volume) { - if let Err(e) = utils::fs::access(&dst_volume_dir).await { + if let Err(e) = utils::fs::access_std(&dst_volume_dir) { info!("access checks failed, dst_volume_dir: {:?}, err: {}", dst_volume_dir, e.to_string()); return Err(convert_access_error(e, DiskError::VolumeAccessDenied)); } @@ -1915,7 +1916,7 @@ impl DiskAPI for LocalDisk { if let Some(src_file_path_parent) = src_file_path.parent() { if src_volume != super::RUSTFS_META_MULTIPART_BUCKET { - let _ = utils::fs::remove(src_file_path_parent).await; + let _ = utils::fs::remove_std(src_file_path_parent); } else { let _ = self .delete_file(&dst_volume_dir, &src_file_path_parent.to_path_buf(), true, false) diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index ae88611a..a6dfb15b 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -108,6 +108,7 @@ pub async fn read_dir(path: impl AsRef, count: i32) -> Result> Ok(volumes) } +#[tracing::instrument(level = "debug", skip_all)] pub async fn rename_all( src_file_path: impl AsRef, dst_file_path: impl AsRef, @@ -136,7 +137,7 @@ pub async fn reliable_rename( base_dir: impl AsRef, ) -> io::Result<()> { if let Some(parent) = dst_file_path.as_ref().parent() { - if !file_exists(parent).await { + if !file_exists(parent) { info!("reliable_rename reliable_mkdir_all parent: {:?}", parent); reliable_mkdir_all(parent, base_dir.as_ref()).await?; } @@ -144,7 +145,7 @@ pub async fn 
reliable_rename( let mut i = 0; loop { - if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await { + if let Err(e) = utils::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) { if os_is_not_exist(&e) && i == 0 { i += 1; continue; @@ -221,6 +222,6 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path> Ok(()) } -pub async fn file_exists(path: impl AsRef<Path>) -> bool { - fs::metadata(path.as_ref()).await.map(|_| true).unwrap_or(false) +pub fn file_exists(path: impl AsRef<Path>) -> bool { + std::fs::metadata(path.as_ref()).map(|_| true).unwrap_or(false) } diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 942f461b..ac0eee89 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -135,77 +135,6 @@ impl Erasure { } } task.await? - - // // let stream = ChunkedStream::new(body, self.block_size); - // let stream = ChunkedStream::new(body, total_size, self.block_size, false); - // let mut total: usize = 0; - // // let mut idx = 0; - // pin_mut!(stream); - - // // warn!("encode start..."); - - // loop { - // match stream.next().await { - // Some(result) => match result { - // Ok(data) => { - // total += data.len(); - - // // EOF - // if data.is_empty() { - // break; - // } - - // // idx += 1; - // // warn!("encode {} get data {:?}", data.len(), data.to_vec()); - - // let blocks = self.encode_data(data.as_ref())?; - - // // warn!( - // // "encode shard size: {}/{} from block_size {}, total_size {} ", - // // blocks[0].len(), - // // blocks.len(), - // // data.len(), - // // total_size - // // ); - - // let mut errs = Vec::new(); - - // for (i, w_op) in writers.iter_mut().enumerate() { - // if let Some(w) = w_op { - // match w.write(blocks[i].as_ref()).await { - // Ok(_) => errs.push(None), - // Err(e) => errs.push(Some(e)), - // } - // } else { - // errs.push(Some(Error::new(DiskError::DiskNotFound))); - // } - // } - - // let none_count = errs.iter().filter(|&x| x.is_none()).count(); - // if
none_count >= write_quorum { - // continue; - // } - - // if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { - // warn!("Erasure encode errs {:?}", &errs); - // return Err(err); - // } - // } - // Err(e) => { - // warn!("poll result err {:?}", &e); - // return Err(Error::msg(e.to_string())); - // } - // }, - // None => { - // // warn!("poll empty result"); - // break; - // } - // } - // } - - // let _ = close_bitrot_writers(writers).await?; - - // Ok(total) } pub async fn decode( diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index e4da3882..e374e456 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -58,10 +58,12 @@ impl FileMeta { } // isXL2V1Format + #[tracing::instrument(level = "debug", skip_all)] pub fn is_xl2_v1_format(buf: &[u8]) -> bool { !matches!(Self::check_xl2_v1(buf), Err(_e)) } + #[tracing::instrument(level = "debug", skip_all)] pub fn load(buf: &[u8]) -> Result { let mut xl = FileMeta::default(); xl.unmarshal_msg(buf)?; @@ -245,7 +247,7 @@ impl FileMeta { } } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip_all)] pub fn marshal_msg(&self) -> Result> { let mut wr = Vec::new(); @@ -363,6 +365,7 @@ impl FileMeta { } // shard_data_dir_count 查询 vid下data_dir的数量 + #[tracing::instrument(level = "debug", skip_all)] pub fn shard_data_dir_count(&self, vid: &Option, data_dir: &Option) -> usize { self.versions .iter() @@ -434,6 +437,7 @@ impl FileMeta { } // 添加版本 + #[tracing::instrument(level = "debug", skip_all)] pub fn add_version(&mut self, fi: FileInfo) -> Result<()> { let vid = fi.version_id; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index abf7422a..6e738783 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -284,10 +284,20 @@ impl SetDisks { // let mut ress = Vec::with_capacity(disks.len()); let mut errs = Vec::with_capacity(disks.len()); - for (i, disk) in disks.iter().enumerate() { - let mut file_info = 
file_infos[i].clone(); + let src_bucket = Arc::new(src_bucket.to_string()); + let src_object = Arc::new(src_object.to_string()); + let dst_bucket = Arc::new(dst_bucket.to_string()); + let dst_object = Arc::new(dst_object.to_string()); - futures.push(async move { + for (i, (disk, file_info)) in disks.iter().zip(file_infos.iter()).enumerate() { + let mut file_info = file_info.clone(); + let disk = disk.clone(); + let src_bucket = src_bucket.clone(); + let src_object = src_object.clone(); + let dst_object = dst_object.clone(); + let dst_bucket = dst_bucket.clone(); + + futures.push(tokio::spawn(async move { if file_info.erasure.index == 0 { file_info.erasure.index = i + 1; } @@ -297,12 +307,12 @@ impl SetDisks { } if let Some(disk) = disk { - disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object) + disk.rename_data(&src_bucket, &src_object, file_info, &dst_bucket, &dst_object) .await } else { Err(Error::new(DiskError::DiskNotFound)) } - }) + })); } let mut disk_versions = vec![None; disks.len()]; @@ -311,15 +321,13 @@ impl SetDisks { let results = join_all(futures).await; for (idx, result) in results.iter().enumerate() { - match result { + match result.as_ref().map_err(|_| Error::new(DiskError::Unexpected))? 
{ Ok(res) => { data_dirs[idx] = res.old_data_dir; disk_versions[idx].clone_from(&res.sign); - // ress.push(Some(res)); errs.push(None); } Err(e) => { - // ress.push(None); errs.push(Some(clone_err(e))); } } @@ -336,11 +344,14 @@ impl SetDisks { if let Some(disk) = disks[i].as_ref() { let fi = file_infos[i].clone(); let old_data_dir = data_dirs[i]; - futures.push(async move { + let disk = disk.clone(); + let src_bucket = src_bucket.clone(); + let src_object = src_object.clone(); + futures.push(tokio::spawn(async move { let _ = disk .delete_version( - src_bucket, - src_object, + &src_bucket, + &src_object, fi, false, DeleteOptions { @@ -354,7 +365,7 @@ impl SetDisks { debug!("rename_data delete_version err {:?}", e); e }); - }); + })); } } @@ -407,7 +418,7 @@ impl SetDisks { } #[allow(dead_code)] - #[tracing::instrument(level = "info", skip(self, disks))] + #[tracing::instrument(level = "debug", skip(self, disks))] async fn commit_rename_data_dir( &self, disks: &[Option], @@ -416,14 +427,17 @@ impl SetDisks { data_dir: &str, write_quorum: usize, ) -> Result<()> { - let file_path = format!("{}/{}", object, data_dir); + let file_path = Arc::new(format!("{}/{}", object, data_dir)); + let bucket = Arc::new(bucket.to_string()); let futures = disks.iter().map(|disk| { let file_path = file_path.clone(); - async move { + let bucket = bucket.clone(); + let disk = disk.clone(); + tokio::spawn(async move { if let Some(disk) = disk { match disk .delete( - bucket, + &bucket, &file_path, DeleteOptions { recursive: true, @@ -438,9 +452,16 @@ impl SetDisks { } else { Some(Error::new(DiskError::DiskNotFound)) } - } + }) }); - let errs: Vec> = join_all(futures).await; + let errs: Vec> = join_all(futures) + .await + .into_iter() + .map(|e| match e { + Ok(e) => e, + Err(_) => Some(Error::new(DiskError::Unexpected)), + }) + .collect(); if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { return Err(err); diff --git a/ecstore/src/store.rs 
b/ecstore/src/store.rs index 5a207577..d480e0ac 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -2549,6 +2549,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re check_multipart_object_args(bucket, object, upload_id) } +#[tracing::instrument(level = "debug")] fn check_put_object_args(bucket: &str, object: &str) -> Result<()> { if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() { return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string()))); diff --git a/ecstore/src/utils/fs.rs b/ecstore/src/utils/fs.rs index 9c0c591f..f60e28f0 100644 --- a/ecstore/src/utils/fs.rs +++ b/ecstore/src/utils/fs.rs @@ -106,6 +106,11 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> { Ok(()) } +pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> { + std::fs::metadata(path)?; + Ok(()) +} + pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> { fs::metadata(path).await } @@ -114,6 +119,7 @@ pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> { fs::create_dir_all(path.as_ref()).await } +#[tracing::instrument(level = "debug", skip_all)] pub async fn remove(path: impl AsRef<Path>) -> io::Result<()> { let meta = fs::metadata(path.as_ref()).await?; if meta.is_dir() { @@ -132,6 +138,7 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> { } } +#[tracing::instrument(level = "debug", skip_all)] pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> { let meta = std::fs::metadata(path.as_ref())?; if meta.is_dir() { @@ -158,6 +165,11 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result< fs::rename(from, to).await } +pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> { + std::fs::rename(from, to) +} + +#[tracing::instrument(level = "debug", skip_all)] pub async fn read_file(path: impl AsRef<Path>) -> io::Result<Vec<u8>> { fs::read(path.as_ref()).await }