diff --git a/TODO.md b/TODO.md index 3519bd92..9c8e4653 100644 --- a/TODO.md +++ b/TODO.md @@ -63,5 +63,6 @@ ## 性能优化 - [ ] bitrot impl AsyncRead/AsyncWrite - [ ] erasure 并发读写 -- [ ] 完善删除逻辑, 并发处理,先移动到回收站,空间不足时清空回收站 +- [x] 完善删除逻辑, 并发处理,先移动到回收站, +- [ ] 空间不足时清空回收站 - [ ] list_object 使用reader传输 \ No newline at end of file diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index a98c9d58..bdd99b84 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -565,3 +565,13 @@ pub fn is_err_os_not_exist(err: &Error) -> bool { false } } + +pub fn is_err_os_disk_full(err: &Error) -> bool { + if let Some(os_err) = err.downcast_ref::() { + is_sys_err_no_space(os_err) + } else if let Some(e) = err.downcast_ref::() { + e == &DiskError::DiskFull + } else { + false + } +} diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 69b1717c..ee1a9a21 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1,6 +1,6 @@ use super::error::{ - is_err_file_not_found, is_err_file_version_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files, - os_is_not_exist, os_is_permission, + is_err_file_not_found, is_err_file_version_not_found, is_err_os_disk_full, is_sys_err_io, is_sys_err_not_empty, + is_sys_err_too_many_files, os_is_not_exist, os_is_permission, }; use super::os::{is_root_disk, rename_all}; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; @@ -35,11 +35,11 @@ use crate::set_disk::{ CHECK_PART_VOLUME_NOT_FOUND, }; use crate::store_api::{BitrotAlgorithm, StorageAPI}; -use crate::utils::fs::{access, lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; +use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY}; use crate::utils::os::get_info; use crate::utils::path::{ - self, clean, decode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, - SLASH_SEPARATOR, + self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX, + GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, }; use crate::{ file_meta::FileMeta, @@ -308,44 +308,46 @@ impl LocalDisk { // }) // } - pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> { + pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> { let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; if let Some(parent) = trash_path.parent() { if !parent.exists() { fs::create_dir_all(parent).await?; } } - // debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path); - // TODO: 清空回收站 - if let Err(err) = fs::rename(&delete_path, &trash_path).await { - match err.kind() { - ErrorKind::NotFound => (), - _ => { - warn!("delete_file rename {:?} err {:?}", &delete_path, &err); - return Err(Error::from(err)); - } - } + + let err = if recursive { + rename_all(delete_path, trash_path, self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?) + .await + .err() + } else { + rename(&delete_path, &trash_path).await.map_err(Error::new).err() + }; + + if immediate_purge || delete_path.to_string_lossy().ends_with(path::SLASH_SEPARATOR) { + warn!("move_to_trash immediate_purge {:?}", &delete_path.to_string_lossy()); + let trash_path2 = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + let _ = rename_all( + encode_dir_object(delete_path.to_string_lossy().as_ref()), + trash_path2, + self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?, + ) + .await; } - // TODO: 优化 FIXME: 先清空回收站吧,有时间再添加判断逻辑 - - if let Err(err) = { - if trash_path.is_dir() { - fs::remove_dir_all(&trash_path).await - } else { - fs::remove_file(&trash_path).await - } - } { - match err.kind() { - ErrorKind::NotFound => (), - _ => { - warn!("delete_file remove trash {:?} err {:?}", &trash_path, &err); - return Err(Error::from(err)); + if let Some(err) = err { + if is_err_os_disk_full(&err) { + if recursive { + remove_all(delete_path).await?; + } else { + remove(delete_path).await?; } } + + return Ok(()); } - // TODO: immediate + // TODO: 异步通知 检测硬盘空间 清空回收站 Ok(()) } @@ -1971,7 +1973,7 @@ impl DiskAPI for LocalDisk { created: modtime, }) } - async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> { + async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> { let volume_dir = self.get_bucket_path(volume)?; if !skip_access_checks(volume) { utils::fs::access(&volume_dir) diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 8d737e92..380dc43a 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -250,7 +250,7 @@ impl DiskAPI for Disk { } } - async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> { + async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> { match self { Disk::Local(local_disk) => local_disk.delete_paths(volume, paths).await, Disk::Remote(remote_disk) => remote_disk.delete_paths(volume, paths).await, @@ -412,7 +412,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { versions: Vec, opts: DeleteOptions, ) -> Result>>; - async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()>; + async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()>; async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()>; async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()>; async fn read_version( diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index 175052cb..bd77480d 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -137,20 +137,12 @@ pub async fn reliable_rename( base_dir: impl AsRef, ) -> io::Result<()> { if let Some(parent) = dst_file_path.as_ref().parent() { - reliable_mkdir_all(parent, base_dir.as_ref()).await?; - } - // need remove dst path - if let Err(err) = utils::fs::remove_all(dst_file_path.as_ref()).await { - if err.kind() != io::ErrorKind::NotFound { - info!( - "reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}", - src_file_path.as_ref(), - dst_file_path.as_ref(), - base_dir.as_ref(), - err - ); + if !file_exists(parent).await { + info!("reliable_rename reliable_mkdir_all parent: {:?}", parent); + reliable_mkdir_all(parent, base_dir.as_ref()).await?; } } + let mut i = 0; loop { if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await { @@ -158,13 +150,13 @@ pub async fn reliable_rename( i += 1; continue; } - info!( - "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}", - src_file_path.as_ref(), - dst_file_path.as_ref(), - base_dir.as_ref(), - e - ); + // info!( + // "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}", + // src_file_path.as_ref(), + // dst_file_path.as_ref(), + // base_dir.as_ref(), + // e + // ); return Err(e); } @@ -229,3 +221,7 @@ pub async fn os_mkdir_all(dir_path: impl AsRef, base_dir: impl AsRef Ok(()) } + +pub async fn file_exists(path: impl AsRef) -> bool { + fs::metadata(path.as_ref()).await.map(|_| true).unwrap_or(false) +} diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 43f02832..7211e34f 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -565,9 +565,9 @@ impl DiskAPI for RemoteDisk { Ok(volume_info) } - async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> { + async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> { info!("delete_paths"); - let paths = paths.iter().map(|s| s.to_string()).collect::>(); + let paths = paths.to_owned(); let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 157881b9..c163bd36 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -453,7 +453,7 @@ impl SetDisks { Ok(()) } - async fn cleanup_multipart_path(disks: &[Option], paths: &[&str]) { + async fn cleanup_multipart_path(disks: &[Option], paths: &[String]) { let mut futures = Vec::with_capacity(disks.len()); let mut errs = Vec::with_capacity(disks.len()); @@ -479,6 +479,10 @@ impl SetDisks { } } } + + if errs.iter().any(|e| e.is_some()) { + warn!("cleanup_multipart_path errs {:?}", &errs); + } } async fn rename_part( disks: &[Option], @@ -518,7 +522,7 @@ impl SetDisks { if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) { warn!("rename_part errs {:?}", &errs); - Self::cleanup_multipart_path(disks, vec![dst_object, format!("{}.meta", dst_object).as_str()].as_slice()).await; + Self::cleanup_multipart_path(disks, &[dst_object.to_owned(), format!("{}.meta", dst_object)]).await; return Err(err); } @@ -1490,92 +1494,92 @@ impl SetDisks { // (ress, errs) // } - async fn remove_object_part( - &self, - bucket: &str, - object: &str, - upload_id: &str, - data_dir: &str, - part_num: usize, - ) -> Result<()> { - let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); - let disks = self.disks.read().await; + // async fn remove_object_part( + // &self, + // bucket: &str, + // object: &str, + // upload_id: &str, + // data_dir: &str, + // part_num: usize, + // ) -> Result<()> { + // let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); + // let disks = self.disks.read().await; - let disks = disks.clone(); + // let disks = disks.clone(); - let file_path = format!("{}/{}/part.{}", upload_id_path, data_dir, part_num); + // let file_path = format!("{}/{}/part.{}", upload_id_path, data_dir, part_num); - let mut futures = Vec::with_capacity(disks.len()); - let mut errors = Vec::with_capacity(disks.len()); + // let mut futures = Vec::with_capacity(disks.len()); + // let mut errors = Vec::with_capacity(disks.len()); - for disk in disks.iter() { - let file_path = file_path.clone(); - let meta_file_path = format!("{}.meta", file_path); + // for disk in disks.iter() { + // let file_path = file_path.clone(); + // let meta_file_path = format!("{}.meta", file_path); - futures.push(async move { - if let Some(disk) = disk { - disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) - .await?; - disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default()) - .await - } else { - Err(Error::new(DiskError::DiskNotFound)) - } - }); - } + // futures.push(async move { + // if let Some(disk) = disk { + // disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) + // .await?; + // disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default()) + // .await + // } else { + // Err(Error::new(DiskError::DiskNotFound)) + // } + // }); + // } - let results = join_all(futures).await; - for result in results { - match result { - Ok(_) => { - errors.push(None); - } - Err(e) => { - errors.push(Some(e)); - } - } - } + // let results = join_all(futures).await; + // for result in results { + // match result { + // Ok(_) => { + // errors.push(None); + // } + // Err(e) => { + // errors.push(Some(e)); + // } + // } + // } - Ok(()) - } - async fn remove_part_meta(&self, bucket: &str, object: &str, upload_id: &str, data_dir: &str, part_num: usize) -> Result<()> { - let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); - let disks = self.disks.read().await; + // Ok(()) + // } + // async fn remove_part_meta(&self, bucket: &str, object: &str, upload_id: &str, data_dir: &str, part_num: usize) -> Result<()> { + // let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id); + // let disks = self.disks.read().await; - let disks = disks.clone(); - // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution); + // let disks = disks.clone(); + // // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution); - let file_path = format!("{}/{}/part.{}.meta", upload_id_path, data_dir, part_num); + // let file_path = format!("{}/{}/part.{}.meta", upload_id_path, data_dir, part_num); - let mut futures = Vec::with_capacity(disks.len()); - let mut errors = Vec::with_capacity(disks.len()); + // let mut futures = Vec::with_capacity(disks.len()); + // let mut errors = Vec::with_capacity(disks.len()); - for disk in disks.iter() { - let file_path = file_path.clone(); - futures.push(async move { - if let Some(disk) = disk { - disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) - .await - } else { - Err(Error::new(DiskError::DiskNotFound)) - } - }); - } + // for disk in disks.iter() { + // let file_path = file_path.clone(); + // futures.push(async move { + // if let Some(disk) = disk { + // disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default()) + // .await + // } else { + // Err(Error::new(DiskError::DiskNotFound)) + // } + // }); + // } - let results = join_all(futures).await; - for result in results { - match result { - Ok(_) => { - errors.push(None); - } - Err(e) => { - errors.push(Some(e)); - } - } - } + // let results = join_all(futures).await; + // for result in results { + // match result { + // Ok(_) => { + // errors.push(None); + // } + // Err(e) => { + // errors.push(Some(e)); + // } + // } + // } - Ok(()) - } + // Ok(()) + // } // #[tracing::instrument(skip(self))] pub async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> { @@ -4847,29 +4851,49 @@ impl StorageAPI for SetDisks { } } + let mut parts = Vec::with_capacity(curr_fi.parts.len()); // TODO: 优化 cleanupMultipartPath for p in curr_fi.parts.iter() { - let _ = self - .remove_part_meta( - bucket, - object, - upload_id, - curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), - p.number, - ) - .await; + parts.push(path_join_buf(&[ + &upload_id_path, + curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), + format!("part.{}.meta", p.number).as_str(), + ])); if !fi.parts.iter().any(|v| v.number == p.number) { - let _ = self - .remove_object_part( - bucket, - object, - upload_id, - curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), - p.number, - ) - .await; + parts.push(path_join_buf(&[ + &upload_id_path, + curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), + format!("part.{}", p.number).as_str(), + ])); } + + // let _ = self + // .remove_part_meta( + // bucket, + // object, + // upload_id, + // curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), + // p.number, + // ) + // .await; + + // if !fi.parts.iter().any(|v| v.number == p.number) { + // let _ = self + // .remove_object_part( + // bucket, + // object, + // upload_id, + // curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(), + // p.number, + // ) + // .await; + // } + } + + { + let disks = self.get_disks_internal().await; + Self::cleanup_multipart_path(&disks, &parts).await; } let (online_disks, versions, op_old_dir) = Self::rename_data( diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 1aeb64b2..7c944ee1 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -1073,8 +1073,7 @@ impl Node for NodeService { async fn delete_paths(&self, request: Request) -> Result, Status> { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { - let paths = request.paths.iter().map(|s| s.as_str()).collect::>(); - match disk.delete_paths(&request.volume, &paths).await { + match disk.delete_paths(&request.volume, &request.paths).await { Ok(_) => Ok(tonic::Response::new(DeletePathsResponse { success: true, error: None, diff --git a/rustfs/src/storage/access.rs b/rustfs/src/storage/access.rs index 06844c64..81a744bd 100644 --- a/rustfs/src/storage/access.rs +++ b/rustfs/src/storage/access.rs @@ -7,7 +7,6 @@ use iam::sys::Args; use s3s::access::{S3Access, S3AccessContext}; use s3s::{dto::*, s3_error, S3Error, S3ErrorCode, S3Request, S3Result}; use std::collections::HashMap; -use tracing::info; #[allow(dead_code)] #[derive(Default, Clone)] @@ -71,16 +70,16 @@ impl S3Access for FS { // /// + [`cx.extensions_mut()`](S3AccessContext::extensions_mut) async fn check(&self, cx: &mut S3AccessContext<'_>) -> S3Result<()> { // 上层验证了 ak/sk - info!( - "s3 check uri: {:?}, method: {:?} path: {:?}, s3_op: {:?}, cred: {:?}, headers:{:?}", - cx.uri(), - cx.method(), - cx.s3_path(), - cx.s3_op().name(), - cx.credentials(), - cx.headers(), - // cx.extensions_mut(), - ); + // info!( + // "s3 check uri: {:?}, method: {:?} path: {:?}, s3_op: {:?}, cred: {:?}, headers:{:?}", + // cx.uri(), + // cx.method(), + // cx.s3_path(), + // cx.s3_op().name(), + // cx.credentials(), + // cx.headers(), + // // cx.extensions_mut(), + // ); let Some(input_cred) = cx.credentials() else { return Err(s3_error!(UnauthorizedAccess, "Signature is required"));