improve speed: replace per-call async fs operations with synchronous std::fs in hot paths and lower tracing instrumentation to debug level

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2025-05-08 18:37:28 +08:00
parent 29ddf4dbc8
commit 7a94363b38
7 changed files with 70 additions and 101 deletions

View File

@@ -261,7 +261,7 @@ impl LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn check_format_json(&self) -> Result<Metadata> {
let md = fs::metadata(&self.format_path).await.map_err(|e| match e.kind() {
let md = std::fs::metadata(&self.format_path).map_err(|e| match e.kind() {
ErrorKind::NotFound => DiskError::DiskNotFound,
ErrorKind::PermissionDenied => DiskError::FileAccessDenied,
_ => {
@@ -367,7 +367,7 @@ impl LocalDisk {
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn delete_file(
&self,
base_path: &PathBuf,
@@ -690,6 +690,7 @@ impl LocalDisk {
}
// write_all_private with check_path_length
#[tracing::instrument(level = "debug", skip_all)]
pub async fn write_all_private(
&self,
volume: &str,
@@ -1215,7 +1216,7 @@ impl DiskAPI for LocalDisk {
Ok(data)
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip_all)]
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
self.write_all_public(volume, path, data).await
}
@@ -1723,7 +1724,7 @@ impl DiskAPI for LocalDisk {
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn rename_data(
&self,
src_volume: &str,
@@ -1734,7 +1735,7 @@ impl DiskAPI for LocalDisk {
) -> Result<RenameDataResp> {
let src_volume_dir = self.get_bucket_path(src_volume)?;
if !skip_access_checks(src_volume) {
if let Err(e) = utils::fs::access(&src_volume_dir).await {
if let Err(e) = utils::fs::access_std(&src_volume_dir) {
info!("access checks failed, src_volume_dir: {:?}, err: {}", src_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
@@ -1742,7 +1743,7 @@ impl DiskAPI for LocalDisk {
let dst_volume_dir = self.get_bucket_path(dst_volume)?;
if !skip_access_checks(dst_volume) {
if let Err(e) = utils::fs::access(&dst_volume_dir).await {
if let Err(e) = utils::fs::access_std(&dst_volume_dir) {
info!("access checks failed, dst_volume_dir: {:?}, err: {}", dst_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
@@ -1915,7 +1916,7 @@ impl DiskAPI for LocalDisk {
if let Some(src_file_path_parent) = src_file_path.parent() {
if src_volume != super::RUSTFS_META_MULTIPART_BUCKET {
let _ = utils::fs::remove(src_file_path_parent).await;
let _ = utils::fs::remove_std(src_file_path_parent);
} else {
let _ = self
.delete_file(&dst_volume_dir, &src_file_path_parent.to_path_buf(), true, false)

View File

@@ -108,6 +108,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> Result<Vec<String>>
Ok(volumes)
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn rename_all(
src_file_path: impl AsRef<Path>,
dst_file_path: impl AsRef<Path>,
@@ -136,7 +137,7 @@ pub async fn reliable_rename(
base_dir: impl AsRef<Path>,
) -> io::Result<()> {
if let Some(parent) = dst_file_path.as_ref().parent() {
if !file_exists(parent).await {
if !file_exists(parent) {
info!("reliable_rename reliable_mkdir_all parent: {:?}", parent);
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
@@ -144,7 +145,7 @@ pub async fn reliable_rename(
let mut i = 0;
loop {
if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await {
if let Err(e) = utils::fs::rename_std(src_file_path.as_ref(), dst_file_path.as_ref()) {
if os_is_not_exist(&e) && i == 0 {
i += 1;
continue;
@@ -221,6 +222,6 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
Ok(())
}
/// Returns `true` if a filesystem entry exists at `path`.
///
/// Synchronous on purpose: a single `stat` syscall is cheaper than hopping
/// through the async runtime (this matches the commit's move to `std::fs`).
/// Note that `std::fs::metadata` follows symlinks, so a dangling symlink
/// reports `false`.
pub fn file_exists(path: impl AsRef<Path>) -> bool {
    // `metadata(..).is_ok()` replaces `.map(|_| true).unwrap_or(false)` —
    // identical behavior, idiomatic form.
    std::fs::metadata(path.as_ref()).is_ok()
}

View File

@@ -135,77 +135,6 @@ impl Erasure {
}
}
task.await?
// // let stream = ChunkedStream::new(body, self.block_size);
// let stream = ChunkedStream::new(body, total_size, self.block_size, false);
// let mut total: usize = 0;
// // let mut idx = 0;
// pin_mut!(stream);
// // warn!("encode start...");
// loop {
// match stream.next().await {
// Some(result) => match result {
// Ok(data) => {
// total += data.len();
// // EOF
// if data.is_empty() {
// break;
// }
// // idx += 1;
// // warn!("encode {} get data {:?}", data.len(), data.to_vec());
// let blocks = self.encode_data(data.as_ref())?;
// // warn!(
// // "encode shard size: {}/{} from block_size {}, total_size {} ",
// // blocks[0].len(),
// // blocks.len(),
// // data.len(),
// // total_size
// // );
// let mut errs = Vec::new();
// for (i, w_op) in writers.iter_mut().enumerate() {
// if let Some(w) = w_op {
// match w.write(blocks[i].as_ref()).await {
// Ok(_) => errs.push(None),
// Err(e) => errs.push(Some(e)),
// }
// } else {
// errs.push(Some(Error::new(DiskError::DiskNotFound)));
// }
// }
// let none_count = errs.iter().filter(|&x| x.is_none()).count();
// if none_count >= write_quorum {
// continue;
// }
// if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
// warn!("Erasure encode errs {:?}", &errs);
// return Err(err);
// }
// }
// Err(e) => {
// warn!("poll result err {:?}", &e);
// return Err(Error::msg(e.to_string()));
// }
// },
// None => {
// // warn!("poll empty result");
// break;
// }
// }
// }
// let _ = close_bitrot_writers(writers).await?;
// Ok(total)
}
pub async fn decode<W>(

View File

@@ -58,10 +58,12 @@ impl FileMeta {
}
// isXL2V1Format
/// Returns `true` when `buf` passes the XL v2/v1 header validation
/// performed by [`Self::check_xl2_v1`].
#[tracing::instrument(level = "debug", skip_all)]
pub fn is_xl2_v1_format(buf: &[u8]) -> bool {
    // `!matches!(x, Err(_))` is exactly `x.is_ok()` — clearer and what
    // clippy's redundant-pattern-matching guidance suggests.
    Self::check_xl2_v1(buf).is_ok()
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn load(buf: &[u8]) -> Result<FileMeta> {
let mut xl = FileMeta::default();
xl.unmarshal_msg(buf)?;
@@ -245,7 +247,7 @@ impl FileMeta {
}
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip_all)]
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut wr = Vec::new();
@@ -363,6 +365,7 @@ impl FileMeta {
}
// shard_data_dir_count: counts how many versions under `vid` reference the given `data_dir`
#[tracing::instrument(level = "debug", skip_all)]
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
self.versions
.iter()
@@ -434,6 +437,7 @@ impl FileMeta {
}
// Add a version
#[tracing::instrument(level = "debug", skip_all)]
pub fn add_version(&mut self, fi: FileInfo) -> Result<()> {
let vid = fi.version_id;

View File

@@ -284,10 +284,20 @@ impl SetDisks {
// let mut ress = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
let mut file_info = file_infos[i].clone();
let src_bucket = Arc::new(src_bucket.to_string());
let src_object = Arc::new(src_object.to_string());
let dst_bucket = Arc::new(dst_bucket.to_string());
let dst_object = Arc::new(dst_object.to_string());
futures.push(async move {
for (i, (disk, file_info)) in disks.iter().zip(file_infos.iter()).enumerate() {
let mut file_info = file_info.clone();
let disk = disk.clone();
let src_bucket = src_bucket.clone();
let src_object = src_object.clone();
let dst_object = dst_object.clone();
let dst_bucket = dst_bucket.clone();
futures.push(tokio::spawn(async move {
if file_info.erasure.index == 0 {
file_info.erasure.index = i + 1;
}
@@ -297,12 +307,12 @@ impl SetDisks {
}
if let Some(disk) = disk {
disk.rename_data(src_bucket, src_object, file_info, dst_bucket, dst_object)
disk.rename_data(&src_bucket, &src_object, file_info, &dst_bucket, &dst_object)
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
})
}));
}
let mut disk_versions = vec![None; disks.len()];
@@ -311,15 +321,13 @@ impl SetDisks {
let results = join_all(futures).await;
for (idx, result) in results.iter().enumerate() {
match result {
match result.as_ref().map_err(|_| Error::new(DiskError::Unexpected))? {
Ok(res) => {
data_dirs[idx] = res.old_data_dir;
disk_versions[idx].clone_from(&res.sign);
// ress.push(Some(res));
errs.push(None);
}
Err(e) => {
// ress.push(None);
errs.push(Some(clone_err(e)));
}
}
@@ -336,11 +344,14 @@ impl SetDisks {
if let Some(disk) = disks[i].as_ref() {
let fi = file_infos[i].clone();
let old_data_dir = data_dirs[i];
futures.push(async move {
let disk = disk.clone();
let src_bucket = src_bucket.clone();
let src_object = src_object.clone();
futures.push(tokio::spawn(async move {
let _ = disk
.delete_version(
src_bucket,
src_object,
&src_bucket,
&src_object,
fi,
false,
DeleteOptions {
@@ -354,7 +365,7 @@ impl SetDisks {
debug!("rename_data delete_version err {:?}", e);
e
});
});
}));
}
}
@@ -407,7 +418,7 @@ impl SetDisks {
}
#[allow(dead_code)]
#[tracing::instrument(level = "info", skip(self, disks))]
#[tracing::instrument(level = "debug", skip(self, disks))]
async fn commit_rename_data_dir(
&self,
disks: &[Option<DiskStore>],
@@ -416,14 +427,17 @@ impl SetDisks {
data_dir: &str,
write_quorum: usize,
) -> Result<()> {
let file_path = format!("{}/{}", object, data_dir);
let file_path = Arc::new(format!("{}/{}", object, data_dir));
let bucket = Arc::new(bucket.to_string());
let futures = disks.iter().map(|disk| {
let file_path = file_path.clone();
async move {
let bucket = bucket.clone();
let disk = disk.clone();
tokio::spawn(async move {
if let Some(disk) = disk {
match disk
.delete(
bucket,
&bucket,
&file_path,
DeleteOptions {
recursive: true,
@@ -438,9 +452,16 @@ impl SetDisks {
} else {
Some(Error::new(DiskError::DiskNotFound))
}
}
})
});
let errs: Vec<Option<Error>> = join_all(futures).await;
let errs: Vec<Option<Error>> = join_all(futures)
.await
.into_iter()
.map(|e| match e {
Ok(e) => e,
Err(_) => Some(Error::new(DiskError::Unexpected)),
})
.collect();
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
return Err(err);

View File

@@ -2549,6 +2549,7 @@ fn check_abort_multipart_args(bucket: &str, object: &str, upload_id: &str) -> Re
check_multipart_object_args(bucket, object, upload_id)
}
#[tracing::instrument(level = "debug")]
fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
if !is_meta_bucketname(bucket) && check_valid_bucket_name_strict(bucket).is_err() {
return Err(Error::new(StorageError::BucketNameInvalid(bucket.to_string())));

View File

@@ -106,6 +106,11 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
Ok(())
}
/// Synchronous access check: succeeds iff `path` can be stat'ed.
///
/// Blocking counterpart of `access`, for call sites that want to skip the
/// async runtime for this single syscall.
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
    // We only care whether the stat succeeds; discard the metadata itself.
    std::fs::metadata(path).map(|_| ())
}
/// Returns metadata for `path` (async).
///
/// NOTE(review): despite the name, `fs::metadata` FOLLOWS symlinks (stat
/// semantics); a true lstat would use `fs::symlink_metadata`. Confirm no
/// caller relies on symlink-aware behavior before renaming or changing this.
pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
fs::metadata(path).await
}
@@ -114,6 +119,7 @@ pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir_all(path.as_ref()).await
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
let meta = fs::metadata(path.as_ref()).await?;
if meta.is_dir() {
@@ -132,6 +138,7 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
}
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
let meta = std::fs::metadata(path.as_ref())?;
if meta.is_dir() {
@@ -158,6 +165,11 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
fs::rename(from, to).await
}
/// Synchronous rename of `from` to `to` via `std::fs`, for call sites that
/// want to avoid the async wrapper for this single syscall.
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
    let (src, dst) = (from.as_ref(), to.as_ref());
    std::fs::rename(src, dst)
}
/// Reads the entire contents of the file at `path` into memory (async).
///
/// Thin wrapper over `fs::read`; errors (not found, permission, etc.) are
/// propagated unchanged to the caller.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn read_file(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
fs::read(path.as_ref()).await
}