test complete_multipart_upload

Author: weisd
Date: 2024-07-12 16:45:31 +08:00
parent a487dcdfa5
commit e8685c977f
6 changed files with 170 additions and 16 deletions

View File

@@ -11,11 +11,11 @@ use path_absolutize::Absolutize;
use time::OffsetDateTime;
use tokio::fs::{self, File};
use tokio::io::ErrorKind;
use tracing::debug;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::{
disk_api::{DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo},
disk_api::{DeleteOptions, DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo},
endpoint::{Endpoint, Endpoints},
file_meta::FileMeta,
format::FormatV3,
@@ -196,7 +196,7 @@ impl LocalDisk {
Ok(())
}
pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf) -> Result<()> {
pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> {
if is_root_path(base_path) || is_root_path(delete_path) {
return Ok(());
}
@@ -205,12 +205,26 @@ impl LocalDisk {
return Ok(());
}
if recursive {
let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
fs::create_dir_all(&trash_path).await?;
fs::rename(&delete_path, &trash_path).await?;
// TODO: immediate
return Ok(());
}
if delete_path.is_dir() {
fs::remove_dir(delete_path).await?;
} else {
fs::remove_file(delete_path).await?;
}
if let Some(dir_path) = delete_path.parent() {
Box::pin(self.delete_file(base_path, &PathBuf::from(dir_path), false, false)).await?;
}
Ok(())
}
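delete_file now takes recursive and immediate flags: a recursive delete renames the whole subtree into a fresh UUID-named directory under RUSTFS_META_TMP_DELETED_BUCKET in a single move (actual purging is left as the TODO on immediate), while a non-recursive delete removes the single entry and then recurses into its parent to tidy up. A minimal caller-side sketch; disk, bucket_path and object_path are assumptions, not code from this commit:

// hedged sketch; `bucket_path` and `object_path` are assumed PathBuf values
// recursive = true: one rename into the trash bucket, no immediate purge yet
disk.delete_file(&bucket_path, &object_path, true, false).await?;
// recursive = false: remove just this entry, then try to clean up the parent directory
disk.delete_file(&bucket_path, &object_path, false, false).await?;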
@@ -363,6 +377,31 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
let vol_path = self.get_bucket_path(&volume)?;
if !skip_access_checks(&volume) {
check_volume_exists(&vol_path).await?;
}
let fpath = self.get_object_path(&volume, &path)?;
self.delete_file(&vol_path, &fpath, opt.recursive, opt.immediate).await?;
// if opt.recursive {
// let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
// fs::create_dir_all(&trash_path).await?;
// fs::rename(&fpath, &trash_path).await?;
// // TODO: immediate
// return Ok(());
// }
// fs::remove_file(fpath).await?;
Ok(())
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
if !skip_access_checks(&src_volume) {
let vol_path = self.get_bucket_path(&src_volume)?;
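The trait-level delete now funnels those flags through DeleteOptions, and the old inline trash/remove code is left commented out above for reference. A hedged usage sketch; the disk handle and upload_id_path are assumptions, not code from this commit:

// drop a whole multipart upload directory in one call; `upload_id_path` is an assumed String
disk.delete(
    RUSTFS_META_MULTIPART_BUCKET,
    &upload_id_path,
    DeleteOptions { recursive: true, immediate: false },
)
.await?;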
@@ -427,8 +466,6 @@ impl DiskAPI for LocalDisk {
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
let p = self.get_object_path(&volume, &path)?;
debug!("append_file start {} {:?}", self.id(), &p);
if let Some(dir_path) = p.parent() {
fs::create_dir_all(&dir_path).await?;
}
@@ -524,7 +561,7 @@ impl DiskAPI for LocalDisk {
if src_volume != RUSTFS_META_MULTIPART_BUCKET {
fs::remove_dir(&src_file_path.parent().unwrap()).await?;
} else {
self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()))
self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()), true, false)
.await?;
}
@@ -600,7 +637,7 @@ impl DiskAPI for LocalDisk {
_org_volume: &str,
volume: &str,
path: &str,
version_id: Uuid,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
let file_path = self.get_object_path(volume, path)?;
@@ -628,10 +665,16 @@ impl DiskAPI for LocalDisk {
let mut found = 0;
for v in req.files.iter() {
let fpath = self.get_object_path(&req.bucket, format!("{}/{}", req.prefix, v).as_str())?;
let mut res = ReadMultipleResp::default();
let fpath = self.get_object_path(&req.bucket, format!("{}/{}", &req.prefix, v).as_str())?;
let mut res = ReadMultipleResp {
bucket: req.bucket.clone(),
prefix: req.prefix.clone(),
file: v.clone(),
..Default::default()
};
// if req.metadata_only {}
match read_file_all(fpath).await {
match read_file_all(&fpath).await {
Ok((data, meta)) => {
found += 1;

View File

@@ -13,6 +13,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
fn id(&self) -> Uuid;
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
@@ -37,13 +38,20 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
org_volume: &str,
volume: &str,
path: &str,
version_id: Uuid,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo>;
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Clone, Default)]
pub struct DeleteOptions {
pub recursive: bool,
pub immediate: bool,
}
#[derive(Debug, Clone)]
pub struct ReadMultipleReq {
pub bucket: String,
pub prefix: String,
@@ -54,6 +62,7 @@ pub struct ReadMultipleReq {
pub max_results: usize,
}
#[derive(Debug, Clone)]
pub struct ReadMultipleResp {
pub bucket: String,
pub prefix: String,

View File

@@ -131,8 +131,16 @@ impl StorageAPI for ECStore {
let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
self.put_object(RUSTFS_META_BUCKET, &file_path, reader, &ObjectOptions { max_parity: true })
.await?;
self.put_object(
RUSTFS_META_BUCKET,
&file_path,
reader,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
.await?;
// TODO: toObjectErr

View File

@@ -23,6 +23,7 @@ pub struct FileInfo {
pub data: Option<Vec<u8>>,
pub fresh: bool, // indicates this is a first time call to write FileInfo.
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
}
impl FileInfo {
@@ -50,6 +51,46 @@ impl FileInfo {
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: FileInfo = rmp_serde::from_slice(&buf)?;
Ok(t)
}
pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime) {
let part = ObjectPartInfo {
number: num,
size: part_size,
mod_time,
};
for p in self.parts.iter_mut() {
if p.number == num {
*p = part;
return;
}
}
self.parts.push(part);
self.parts.sort_by(|a, b| a.number.cmp(&b.number));
}
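add_object_part is idempotent per part number: re-uploading a part replaces its earlier entry, and the list is re-sorted so later completion logic can walk parts in order. A minimal sketch with illustrative sizes:

use time::OffsetDateTime;

let mut fi = FileInfo::default();
fi.add_object_part(2, 5 * 1024 * 1024, OffsetDateTime::now_utc());
fi.add_object_part(1, 5 * 1024 * 1024, OffsetDateTime::now_utc());
// retrying part 1 overwrites the previous entry instead of appending a duplicate
fi.add_object_part(1, 6 * 1024 * 1024, OffsetDateTime::now_utc());
assert_eq!(fi.parts.iter().map(|p| p.number).collect::<Vec<_>>(), vec![1, 2]);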
pub fn into_object_info(&self, bucket: &str, object: &str, versioned: bool) -> ObjectInfo {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
is_dir: object.starts_with("/"),
parity_blocks: self.erasure.parity_blocks,
data_blocks: self.erasure.data_blocks,
version_id: self.version_id,
deleted: self.deleted,
mod_time: self.mod_time,
size: self.size,
parts: self.parts.clone(),
is_latest: self.is_latest,
}
}
}
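into_object_info copies the erasure layout, version id and part list from the on-disk FileInfo into the caller-facing ObjectInfo; it takes &self, so the FileInfo stays usable afterwards. A hedged sketch:

// `fi` is an assumed FileInfo read back from the object's metadata
let oi = fi.into_object_info("mybucket", "photos/cat.jpg", false);
assert_eq!(oi.bucket, "mybucket");
assert_eq!(oi.parts.len(), fi.parts.len());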
impl Default for FileInfo {
@@ -66,6 +107,7 @@ impl Default for FileInfo {
name: Default::default(),
volume: Default::default(),
parts: Default::default(),
is_latest: Default::default(),
}
}
}
@@ -194,10 +236,20 @@ impl PutObjReader {
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
pub max_parity: bool,
pub mod_time: OffsetDateTime,
}
impl Default for ObjectOptions {
fn default() -> Self {
Self {
max_parity: Default::default(),
mod_time: OffsetDateTime::UNIX_EPOCH,
}
}
}
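ObjectOptions can no longer derive Default because OffsetDateTime has no Default impl, so the commit adds a manual one that pins mod_time to the Unix epoch; callers keep struct-update syntax, as in the reshaped put_object call earlier in this commit. For illustration:

// only override what you need; the rest comes from the manual Default impl
let opts = ObjectOptions {
    max_parity: true,
    ..Default::default()
};
assert_eq!(opts.mod_time, OffsetDateTime::UNIX_EPOCH);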
pub struct BucketOptions {}
@@ -218,9 +270,31 @@ pub struct PartInfo {
pub size: usize,
}
pub struct CompletePart {}
pub struct CompletePart {
pub part_num: usize,
}
pub struct ObjectInfo {}
impl From<s3s::dto::CompletedPart> for CompletePart {
fn from(value: s3s::dto::CompletedPart) -> Self {
Self {
part_num: value.part_number.unwrap_or_default() as usize,
}
}
}
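part_number is optional in the s3s DTO, so the conversion falls back to 0 when a client omits it. The CompleteMultipartUpload handler below builds the list with an explicit loop; an equivalent iterator form, shown only for illustration:

let uploaded_parts: Vec<CompletePart> = multipart_upload
    .parts
    .into_iter()
    .flatten()
    .map(CompletePart::from)
    .collect();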
pub struct ObjectInfo {
pub bucket: String,
pub name: String,
pub is_dir: bool,
pub parity_blocks: usize,
pub data_blocks: usize,
pub version_id: Uuid,
pub deleted: bool,
pub mod_time: OffsetDateTime,
pub size: usize,
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
}
#[async_trait::async_trait]
pub trait StorageAPI {

View File

@@ -275,4 +275,7 @@ pub enum ErasureError {
#[error("first disk wiat")]
FirstDiskWait,
#[error("invalid part id {0}")]
InvalidPart(usize),
}
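The new InvalidPart variant carries the offending part number and renders it through its thiserror attribute, e.g.:

let err = ErasureError::InvalidPart(3);
assert_eq!(err.to_string(), "invalid part id 3");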

View File

@@ -2,6 +2,7 @@ use std::fmt::Debug;
use ecstore::disk_api::DiskError;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
use ecstore::store_api::ObjectOptions;
@@ -311,6 +312,22 @@ impl S3 for FS {
// mc cp step 5
let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) };
let opts = &ObjectOptions::default();
let mut uploaded_parts = Vec::new();
for part in multipart_upload.parts.into_iter().flatten() {
uploaded_parts.push(CompletePart::from(part));
}
try_!(
self.store
.complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, opts)
.await
);
let output = CompleteMultipartUploadOutput {
bucket: Some(bucket),
key: Some(key),