fix move_to_trash

This commit is contained in:
weisd
2025-03-20 00:23:10 +08:00
parent ff4769ca1e
commit a12d48595e
9 changed files with 193 additions and 162 deletions

View File

@@ -63,5 +63,6 @@
## 性能优化
- [ ] bitrot impl AsyncRead/AsyncWrite
- [ ] erasure 并发读写
- [ ] 完善删除逻辑, 并发处理,先移动到回收站,空间不足时清空回收站
- [x] 完善删除逻辑, 并发处理,先移动到回收站,
- [ ] 空间不足时清空回收站
- [ ] list_object 使用reader传输

View File

@@ -565,3 +565,13 @@ pub fn is_err_os_not_exist(err: &Error) -> bool {
false
}
}
/// Returns `true` when `err` represents an out-of-space condition:
/// either an underlying OS I/O error reporting "no space left on device"
/// (via `is_sys_err_no_space`) or an explicit `DiskError::DiskFull`.
/// Any other error kind yields `false`.
pub fn is_err_os_disk_full(err: &Error) -> bool {
    if let Some(io_err) = err.downcast_ref::<io::Error>() {
        return is_sys_err_no_space(io_err);
    }
    match err.downcast_ref::<DiskError>() {
        Some(disk_err) => *disk_err == DiskError::DiskFull,
        None => false,
    }
}

View File

@@ -1,6 +1,6 @@
use super::error::{
is_err_file_not_found, is_err_file_version_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files,
os_is_not_exist, os_is_permission,
is_err_file_not_found, is_err_file_version_not_found, is_err_os_disk_full, is_sys_err_io, is_sys_err_not_empty,
is_sys_err_too_many_files, os_is_not_exist, os_is_permission,
};
use super::os::{is_root_disk, rename_all};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
@@ -35,11 +35,11 @@ use crate::set_disk::{
CHECK_PART_VOLUME_NOT_FOUND,
};
use crate::store_api::{BitrotAlgorithm, StorageAPI};
use crate::utils::fs::{access, lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::fs::{access, lstat, remove, remove_all, rename, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::os::get_info;
use crate::utils::path::{
self, clean, decode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH,
SLASH_SEPARATOR,
self, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, GLOBAL_DIR_SUFFIX,
GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR,
};
use crate::{
file_meta::FileMeta,
@@ -308,44 +308,46 @@ impl LocalDisk {
// })
// }
pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> {
pub async fn move_to_trash(&self, delete_path: &PathBuf, recursive: bool, immediate_purge: bool) -> Result<()> {
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
if let Some(parent) = trash_path.parent() {
if !parent.exists() {
fs::create_dir_all(parent).await?;
}
}
// debug!("move_to_trash from:{:?} to {:?}", &delete_path, &trash_path);
// TODO: 清空回收站
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file rename {:?} err {:?}", &delete_path, &err);
return Err(Error::from(err));
}
}
let err = if recursive {
rename_all(delete_path, trash_path, self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?)
.await
.err()
} else {
rename(&delete_path, &trash_path).await.map_err(Error::new).err()
};
if immediate_purge || delete_path.to_string_lossy().ends_with(path::SLASH_SEPARATOR) {
warn!("move_to_trash immediate_purge {:?}", &delete_path.to_string_lossy());
let trash_path2 = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
let _ = rename_all(
encode_dir_object(delete_path.to_string_lossy().as_ref()),
trash_path2,
self.get_bucket_path(super::RUSTFS_META_TMP_DELETED_BUCKET)?,
)
.await;
}
// TODO: 优化 FIXME: 先清空回收站吧,有时间再添加判断逻辑
if let Err(err) = {
if trash_path.is_dir() {
fs::remove_dir_all(&trash_path).await
} else {
fs::remove_file(&trash_path).await
}
} {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file remove trash {:?} err {:?}", &trash_path, &err);
return Err(Error::from(err));
if let Some(err) = err {
if is_err_os_disk_full(&err) {
if recursive {
remove_all(delete_path).await?;
} else {
remove(delete_path).await?;
}
}
return Ok(());
}
// TODO: immediate
// TODO: 异步通知 检测硬盘空间 清空回收站
Ok(())
}
@@ -1971,7 +1973,7 @@ impl DiskAPI for LocalDisk {
created: modtime,
})
}
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
utils::fs::access(&volume_dir)

View File

@@ -250,7 +250,7 @@ impl DiskAPI for Disk {
}
}
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.delete_paths(volume, paths).await,
Disk::Remote(remote_disk) => remote_disk.delete_paths(volume, paths).await,
@@ -412,7 +412,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>>;
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()>;
async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()>;
async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()>;
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()>;
async fn read_version(

View File

@@ -137,20 +137,12 @@ pub async fn reliable_rename(
base_dir: impl AsRef<Path>,
) -> io::Result<()> {
if let Some(parent) = dst_file_path.as_ref().parent() {
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
// need remove dst path
if let Err(err) = utils::fs::remove_all(dst_file_path.as_ref()).await {
if err.kind() != io::ErrorKind::NotFound {
info!(
"reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
err
);
if !file_exists(parent).await {
info!("reliable_rename reliable_mkdir_all parent: {:?}", parent);
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
}
let mut i = 0;
loop {
if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await {
@@ -158,13 +150,13 @@ pub async fn reliable_rename(
i += 1;
continue;
}
info!(
"reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
e
);
// info!(
// "reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
// src_file_path.as_ref(),
// dst_file_path.as_ref(),
// base_dir.as_ref(),
// e
// );
return Err(e);
}
@@ -229,3 +221,7 @@ pub async fn os_mkdir_all(dir_path: impl AsRef<Path>, base_dir: impl AsRef<Path>
Ok(())
}
/// Returns `true` if a filesystem entry (file or directory) exists at `path`.
///
/// Implemented by probing metadata; any error (not found, permission
/// denied, …) is treated as "does not exist" — callers only get a
/// boolean, never the underlying error kind.
pub async fn file_exists(path: impl AsRef<Path>) -> bool {
    // `.is_ok()` replaces the non-idiomatic `.map(|_| true).unwrap_or(false)`
    // chain; behavior is identical.
    fs::metadata(path.as_ref()).await.is_ok()
}

View File

@@ -565,9 +565,9 @@ impl DiskAPI for RemoteDisk {
Ok(volume_info)
}
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> {
info!("delete_paths");
let paths = paths.iter().map(|s| s.to_string()).collect::<Vec<String>>();
let paths = paths.to_owned();
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;

View File

@@ -453,7 +453,7 @@ impl SetDisks {
Ok(())
}
async fn cleanup_multipart_path(disks: &[Option<DiskStore>], paths: &[&str]) {
async fn cleanup_multipart_path(disks: &[Option<DiskStore>], paths: &[String]) {
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
@@ -479,6 +479,10 @@ impl SetDisks {
}
}
}
if errs.iter().any(|e| e.is_some()) {
warn!("cleanup_multipart_path errs {:?}", &errs);
}
}
async fn rename_part(
disks: &[Option<DiskStore>],
@@ -518,7 +522,7 @@ impl SetDisks {
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
warn!("rename_part errs {:?}", &errs);
Self::cleanup_multipart_path(disks, vec![dst_object, format!("{}.meta", dst_object).as_str()].as_slice()).await;
Self::cleanup_multipart_path(disks, &[dst_object.to_owned(), format!("{}.meta", dst_object)]).await;
return Err(err);
}
@@ -1490,92 +1494,92 @@ impl SetDisks {
// (ress, errs)
// }
async fn remove_object_part(
&self,
bucket: &str,
object: &str,
upload_id: &str,
data_dir: &str,
part_num: usize,
) -> Result<()> {
let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
let disks = self.disks.read().await;
// async fn remove_object_part(
// &self,
// bucket: &str,
// object: &str,
// upload_id: &str,
// data_dir: &str,
// part_num: usize,
// ) -> Result<()> {
// let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
// let disks = self.disks.read().await;
let disks = disks.clone();
// let disks = disks.clone();
let file_path = format!("{}/{}/part.{}", upload_id_path, data_dir, part_num);
// let file_path = format!("{}/{}/part.{}", upload_id_path, data_dir, part_num);
let mut futures = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
// let mut futures = Vec::with_capacity(disks.len());
// let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let file_path = file_path.clone();
let meta_file_path = format!("{}.meta", file_path);
// for disk in disks.iter() {
// let file_path = file_path.clone();
// let meta_file_path = format!("{}.meta", file_path);
futures.push(async move {
if let Some(disk) = disk {
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
.await?;
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
// futures.push(async move {
// if let Some(disk) = disk {
// disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
// .await?;
// disk.delete(RUSTFS_META_MULTIPART_BUCKET, &meta_file_path, DeleteOptions::default())
// .await
// } else {
// Err(Error::new(DiskError::DiskNotFound))
// }
// });
// }
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
// let results = join_all(futures).await;
// for result in results {
// match result {
// Ok(_) => {
// errors.push(None);
// }
// Err(e) => {
// errors.push(Some(e));
// }
// }
// }
Ok(())
}
async fn remove_part_meta(&self, bucket: &str, object: &str, upload_id: &str, data_dir: &str, part_num: usize) -> Result<()> {
let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
let disks = self.disks.read().await;
// Ok(())
// }
// async fn remove_part_meta(&self, bucket: &str, object: &str, upload_id: &str, data_dir: &str, part_num: usize) -> Result<()> {
// let upload_id_path = Self::get_upload_id_dir(bucket, object, upload_id);
// let disks = self.disks.read().await;
let disks = disks.clone();
// let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
// let disks = disks.clone();
// // let disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
let file_path = format!("{}/{}/part.{}.meta", upload_id_path, data_dir, part_num);
// let file_path = format!("{}/{}/part.{}.meta", upload_id_path, data_dir, part_num);
let mut futures = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
// let mut futures = Vec::with_capacity(disks.len());
// let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let file_path = file_path.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
.await
} else {
Err(Error::new(DiskError::DiskNotFound))
}
});
}
// for disk in disks.iter() {
// let file_path = file_path.clone();
// futures.push(async move {
// if let Some(disk) = disk {
// disk.delete(RUSTFS_META_MULTIPART_BUCKET, &file_path, DeleteOptions::default())
// .await
// } else {
// Err(Error::new(DiskError::DiskNotFound))
// }
// });
// }
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
// let results = join_all(futures).await;
// for result in results {
// match result {
// Ok(_) => {
// errors.push(None);
// }
// Err(e) => {
// errors.push(Some(e));
// }
// }
// }
Ok(())
}
// Ok(())
// }
// #[tracing::instrument(skip(self))]
pub async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> {
@@ -4847,29 +4851,49 @@ impl StorageAPI for SetDisks {
}
}
let mut parts = Vec::with_capacity(curr_fi.parts.len());
// TODO: 优化 cleanupMultipartPath
for p in curr_fi.parts.iter() {
let _ = self
.remove_part_meta(
bucket,
object,
upload_id,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
p.number,
)
.await;
parts.push(path_join_buf(&[
&upload_id_path,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
format!("part.{}.meta", p.number).as_str(),
]));
if !fi.parts.iter().any(|v| v.number == p.number) {
let _ = self
.remove_object_part(
bucket,
object,
upload_id,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
p.number,
)
.await;
parts.push(path_join_buf(&[
&upload_id_path,
curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
format!("part.{}", p.number).as_str(),
]));
}
// let _ = self
// .remove_part_meta(
// bucket,
// object,
// upload_id,
// curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
// p.number,
// )
// .await;
// if !fi.parts.iter().any(|v| v.number == p.number) {
// let _ = self
// .remove_object_part(
// bucket,
// object,
// upload_id,
// curr_fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str(),
// p.number,
// )
// .await;
// }
}
{
let disks = self.get_disks_internal().await;
Self::cleanup_multipart_path(&disks, &parts).await;
}
let (online_disks, versions, op_old_dir) = Self::rename_data(

View File

@@ -1073,8 +1073,7 @@ impl Node for NodeService {
async fn delete_paths(&self, request: Request<DeletePathsRequest>) -> Result<Response<DeletePathsResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let paths = request.paths.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
match disk.delete_paths(&request.volume, &paths).await {
match disk.delete_paths(&request.volume, &request.paths).await {
Ok(_) => Ok(tonic::Response::new(DeletePathsResponse {
success: true,
error: None,

View File

@@ -7,7 +7,6 @@ use iam::sys::Args;
use s3s::access::{S3Access, S3AccessContext};
use s3s::{dto::*, s3_error, S3Error, S3ErrorCode, S3Request, S3Result};
use std::collections::HashMap;
use tracing::info;
#[allow(dead_code)]
#[derive(Default, Clone)]
@@ -71,16 +70,16 @@ impl S3Access for FS {
// /// + [`cx.extensions_mut()`](S3AccessContext::extensions_mut)
async fn check(&self, cx: &mut S3AccessContext<'_>) -> S3Result<()> {
// 上层验证了 ak/sk
info!(
"s3 check uri: {:?}, method: {:?} path: {:?}, s3_op: {:?}, cred: {:?}, headers:{:?}",
cx.uri(),
cx.method(),
cx.s3_path(),
cx.s3_op().name(),
cx.credentials(),
cx.headers(),
// cx.extensions_mut(),
);
// info!(
// "s3 check uri: {:?}, method: {:?} path: {:?}, s3_op: {:?}, cred: {:?}, headers:{:?}",
// cx.uri(),
// cx.method(),
// cx.s3_path(),
// cx.s3_op().name(),
// cx.credentials(),
// cx.headers(),
// // cx.extensions_mut(),
// );
let Some(input_cred) = cx.credentials() else {
return Err(s3_error!(UnauthorizedAccess, "Signature is required"));