From e8685c977fd3ce2697a1d421dacbbe25cbd09af0 Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 12 Jul 2024 16:45:31 +0800 Subject: [PATCH] test complete_multipart_upload --- ecstore/src/disk.rs | 63 +++++++++++++++++++++++++----- ecstore/src/disk_api.rs | 11 +++++- ecstore/src/store.rs | 12 +++++- ecstore/src/store_api.rs | 80 ++++++++++++++++++++++++++++++++++++-- ecstore/src/store_init.rs | 3 ++ rustfs/src/storage/ecfs.rs | 17 ++++++++ 6 files changed, 170 insertions(+), 16 deletions(-) diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index dced1d28..e83fe70d 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -11,11 +11,11 @@ use path_absolutize::Absolutize; use time::OffsetDateTime; use tokio::fs::{self, File}; use tokio::io::ErrorKind; -use tracing::debug; +use tracing::{debug, warn}; use uuid::Uuid; use crate::{ - disk_api::{DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo}, + disk_api::{DeleteOptions, DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo}, endpoint::{Endpoint, Endpoints}, file_meta::FileMeta, format::FormatV3, @@ -196,7 +196,7 @@ impl LocalDisk { Ok(()) } - pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf) -> Result<()> { + pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> { if is_root_path(base_path) || is_root_path(delete_path) { return Ok(()); } @@ -205,12 +205,26 @@ impl LocalDisk { return Ok(()); } + if recursive { + let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + fs::create_dir_all(&trash_path).await?; + fs::rename(&delete_path, &trash_path).await?; + + // TODO: immediate + + return Ok(()); + } + if delete_path.is_dir() { fs::remove_dir(delete_path).await?; } else { fs::remove_file(delete_path).await?; } + if let Some(dir_path) = delete_path.parent() { + Box::pin(self.delete_file(base_path, &PathBuf::from(dir_path), false, false)).await?; + } + Ok(()) } @@ -363,6 +377,31 @@ impl DiskAPI for LocalDisk { Ok(()) } + async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { + let vol_path = self.get_bucket_path(&volume)?; + if !skip_access_checks(&volume) { + check_volume_exists(&vol_path).await?; + } + + let fpath = self.get_object_path(&volume, &path)?; + + self.delete_file(&vol_path, &fpath, opt.recursive, opt.immediate).await?; + + // if opt.recursive { + // let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + // fs::create_dir_all(&trash_path).await?; + // fs::rename(&fpath, &trash_path).await?; + + // // TODO: immediate + + // return Ok(()); + // } + + // fs::remove_file(fpath).await?; + + Ok(()) + } + async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { if !skip_access_checks(&src_volume) { let vol_path = self.get_bucket_path(&src_volume)?; @@ -427,8 +466,6 @@ impl DiskAPI for LocalDisk { async fn append_file(&self, volume: &str, path: &str) -> Result { let p = self.get_object_path(&volume, &path)?; - debug!("append_file start {} {:?}", self.id(), &p); - if let Some(dir_path) = p.parent() { fs::create_dir_all(&dir_path).await?; } @@ -524,7 +561,7 @@ impl DiskAPI for LocalDisk { if src_volume != RUSTFS_META_MULTIPART_BUCKET { fs::remove_dir(&src_file_path.parent().unwrap()).await?; } else { - self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap())) + self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()), true, false) .await?; } @@ -600,7 +637,7 @@ impl DiskAPI for LocalDisk { _org_volume: &str, volume: &str, path: &str, - version_id: Uuid, + version_id: &str, opts: &ReadOptions, ) -> Result { let file_path = self.get_object_path(volume, path)?; @@ -628,10 +665,16 @@ impl DiskAPI for LocalDisk { let mut found = 0; for v in req.files.iter() { - let fpath = self.get_object_path(&req.bucket, format!("{}/{}", req.prefix, v).as_str())?; - let mut res = ReadMultipleResp::default(); + let fpath = self.get_object_path(&req.bucket, format!("{}/{}", &req.prefix, v).as_str())?; + let mut res = ReadMultipleResp { + bucket: req.bucket.clone(), + prefix: req.prefix.clone(), + file: v.clone(), + ..Default::default() + }; + // if req.metadata_only {} - match read_file_all(fpath).await { + match read_file_all(&fpath).await { Ok((data, meta)) => { found += 1; diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index 14652fcd..7ea8ba34 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -13,6 +13,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { fn is_local(&self) -> bool; fn id(&self) -> Uuid; + async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>; async fn read_all(&self, volume: &str, path: &str) -> Result; async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()>; async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; @@ -37,13 +38,20 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { org_volume: &str, volume: &str, path: &str, - version_id: Uuid, + version_id: &str, opts: &ReadOptions, ) -> Result; async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result; async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } +#[derive(Debug, Clone, Default)] +pub struct DeleteOptions { + pub recursive: bool, + pub immediate: bool, +} + +#[derive(Debug, Clone)] pub struct ReadMultipleReq { pub bucket: String, pub prefix: String, @@ -54,6 +62,7 @@ pub struct ReadMultipleReq { pub max_results: usize, } +#[derive(Debug, Clone)] pub struct ReadMultipleResp { pub bucket: String, pub prefix: String, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 50187782..96061bd6 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -131,8 +131,16 @@ impl StorageAPI for ECStore { let reader = PutObjReader::new(StreamingBlob::from(body), content_len); - self.put_object(RUSTFS_META_BUCKET, &file_path, reader, &ObjectOptions { max_parity: true }) - .await?; + self.put_object( + RUSTFS_META_BUCKET, + &file_path, + reader, + &ObjectOptions { + max_parity: true, + ..Default::default() + }, + ) + .await?; // TODO: toObjectErr diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index b4dd1fef..59eb2d05 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -23,6 +23,7 @@ pub struct FileInfo { pub data: Option>, pub fresh: bool, // indicates this is a first time call to write FileInfo. pub parts: Vec, + pub is_latest: bool, } impl FileInfo { @@ -50,6 +51,46 @@ impl FileInfo { Ok(buf) } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: FileInfo = rmp_serde::from_slice(&buf)?; + Ok(t) + } + + pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime) { + let part = ObjectPartInfo { + number: num, + size: part_size, + mod_time, + }; + + for p in self.parts.iter_mut() { + if p.number == num { + *p = part; + return; + } + } + + self.parts.push(part); + + self.parts.sort_by(|a, b| a.number.cmp(&b.number)); + } + + pub fn into_object_info(&self, bucket: &str, object: &str, versioned: bool) -> ObjectInfo { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + is_dir: object.starts_with("/"), + parity_blocks: self.erasure.parity_blocks, + data_blocks: self.erasure.data_blocks, + version_id: self.version_id, + deleted: self.deleted, + mod_time: self.mod_time, + size: self.size, + parts: self.parts.clone(), + is_latest: self.is_latest, + } + } } impl Default for FileInfo { @@ -66,6 +107,7 @@ impl Default for FileInfo { name: Default::default(), volume: Default::default(), parts: Default::default(), + is_latest: Default::default(), } } } @@ -194,10 +236,20 @@ impl PutObjReader { } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct ObjectOptions { // Use the maximum parity (N/2), used when saving server configuration files pub max_parity: bool, + pub mod_time: OffsetDateTime, +} + +impl Default for ObjectOptions { + fn default() -> Self { + Self { + max_parity: Default::default(), + mod_time: OffsetDateTime::UNIX_EPOCH, + } + } } pub struct BucketOptions {} @@ -218,9 +270,31 @@ pub struct PartInfo { pub size: usize, } -pub struct CompletePart {} +pub struct CompletePart { + pub part_num: usize, +} -pub struct ObjectInfo {} +impl From for CompletePart { + fn from(value: s3s::dto::CompletedPart) -> Self { + Self { + part_num: value.part_number.unwrap_or_default() as usize, + } + } +} + +pub struct ObjectInfo { + pub bucket: String, + pub name: String, + pub is_dir: bool, + pub parity_blocks: usize, + pub data_blocks: usize, + pub version_id: Uuid, + pub deleted: bool, + pub mod_time: OffsetDateTime, + pub size: usize, + pub parts: Vec, + pub is_latest: bool, +} #[async_trait::async_trait] pub trait StorageAPI { diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index dbf2e5ce..fdcf2565 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -275,4 +275,7 @@ pub enum ErasureError { #[error("first disk wiat")] FirstDiskWait, + + #[error("invalid part id {0}")] + InvalidPart(usize), } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 4da4858b..364e9ae5 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use ecstore::disk_api::DiskError; use ecstore::store_api::BucketOptions; +use ecstore::store_api::CompletePart; use ecstore::store_api::MakeBucketOptions; use ecstore::store_api::MultipartUploadResult; use ecstore::store_api::ObjectOptions; @@ -311,6 +312,22 @@ impl S3 for FS { // mc cp step 5 + let Some(multipart_upload) = multipart_upload else { return Err(s3_error!(InvalidPart)) }; + + let opts = &ObjectOptions::default(); + + let mut uploaded_parts = Vec::new(); + + for part in multipart_upload.parts.into_iter().flatten() { + uploaded_parts.push(CompletePart::from(part)); + } + + try_!( + self.store + .complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, opts) + .await + ); + let output = CompleteMultipartUploadOutput { bucket: Some(bucket), key: Some(key),