From 4afd8872d06a89c8d7c6ff9b4b3043cfbc87de4d Mon Sep 17 00:00:00 2001
From: weisd
Date: Thu, 11 Jul 2024 17:57:59 +0800
Subject: [PATCH] todo:readMultipleFiles

---
 ecstore/src/disk.rs        | 116 ++++++++++++++++---------------------
 ecstore/src/disk_api.rs    |  35 +++++++++++
 ecstore/src/erasure.rs     |   3 +-
 ecstore/src/sets.rs        |  16 ++++-
 ecstore/src/store.rs       |  18 +++++-
 ecstore/src/store_api.rs   |  54 +++++++++++++++--
 ecstore/src/store_init.rs  |   1 -
 rustfs/src/storage/ecfs.rs |   6 +-
 8 files changed, 172 insertions(+), 77 deletions(-)

diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs
index 2b2966dd..dced1d28 100644
--- a/ecstore/src/disk.rs
+++ b/ecstore/src/disk.rs
@@ -15,7 +15,7 @@ use tracing::debug;
 use uuid::Uuid;
 
 use crate::{
-    disk_api::{DiskAPI, DiskError, FileWriter, ReadOptions, VolumeInfo},
+    disk_api::{DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo},
     endpoint::{Endpoint, Endpoints},
     file_meta::FileMeta,
     format::FormatV3,
@@ -623,73 +623,57 @@ impl DiskAPI for LocalDisk {
         Ok(RawFileInfo { buf })
     }
+
+    async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
+        let mut results = Vec::new();
+        let mut found = 0;
+
+        for v in req.files.iter() {
+            let fpath = self.get_object_path(&req.bucket, format!("{}/{}", req.prefix, v).as_str())?;
+            let mut res = ReadMultipleResp::default();
+            // if req.metadata_only {}
+            match read_file_all(fpath).await {
+                Ok((data, meta)) => {
+                    found += 1;
+
+                    if req.max_size > 0 && data.len() > req.max_size {
+                        res.exists = true;
+                        res.error = format!("max size ({}) exceeded: {}", req.max_size, data.len());
+                        results.push(res);
+                        break;
+                    }
+
+                    res.exists = true;
+                    res.data = data;
+                    res.mod_time = match meta.modified() {
+                        Ok(md) => OffsetDateTime::from(md),
+                        Err(_) => return Err(Error::msg("modified time is not supported on this platform")),
+                    };
+                    results.push(res);
+
+                    if req.max_results > 0 && found >= req.max_results {
+                        break;
+                    }
+                }
+                Err(e) => {
+                    if !(DiskError::is_err(&e, &DiskError::FileNotFound) || DiskError::is_err(&e, &DiskError::VolumeNotFound)) {
+                        res.exists = true;
+                        res.error = e.to_string();
+                    }
+
+                    if req.abort404 && !res.exists {
+                        results.push(res);
+                        break;
+                    }
+
+                    results.push(res);
+                }
+            }
+        }
+
+        Ok(results)
+    }
 }
 
-// pub async fn copy_bytes<S, W>(mut stream: S, writer: &mut W) -> Result<u64>
-// where
-//     S: Stream<Item = Result<Bytes, StdError>> + Unpin,
-//     W: AsyncWrite + Unpin,
-// {
-//     let mut nwritten: u64 = 0;
-//     while let Some(result) = stream.next().await {
-//         let bytes = match result {
-//             Ok(x) => x,
-//             Err(e) => return Err(Error::new(e)),
-//         };
-//         writer.write_all(&bytes).await?;
-//         nwritten += bytes.len() as u64;
-//     }
-//     writer.flush().await?;
-//     Ok(nwritten)
-// }
-
-// pub struct RemoteDisk {}
-
-// impl RemoteDisk {
-//     pub fn new(_ep: &Endpoint, _health_check: bool) -> Result<Self> {
-//         Ok(Self {})
-//     }
-// }
-
-// pub(crate) struct FileWriter<'a> {
-//     tmp_path: PathBuf,
-//     dest_path: &'a Path,
-//     writer: BufWriter<File>,
-//     clean_tmp: bool,
-// }
-
-// impl<'a> FileWriter<'a> {
-//     pub(crate) fn tmp_path(&self) -> &Path {
-//         &self.tmp_path
-//     }
-
-//     pub(crate) fn dest_path(&self) -> &'a Path {
-//         self.dest_path
-//     }
-
-//     pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
-//         &mut self.writer
-//     }
-
-//     pub(crate) async fn done(mut self) -> Result<()> {
-//         if let Some(final_dir_path) = self.dest_path().parent() {
-//             fs::create_dir_all(&final_dir_path).await?;
-//         }
-
-//         fs::rename(&self.tmp_path, self.dest_path()).await?;
-//         self.clean_tmp = false;
-//         Ok(())
-//     }
-// }
-
-// impl<'a> Drop for FileWriter<'a> {
-//     fn drop(&mut self) {
-//         if self.clean_tmp {
-//             let _ = std::fs::remove_file(&self.tmp_path);
-//         }
-//     }
-// }
-
 #[cfg(test)]
 mod test {
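
The new `read_multiple` walks the requested files in order, reports per-file errors inline instead of failing the whole batch, and short-circuits on `max_size`, `max_results`, and (via `abort404`) the first missing file; `metadata_only` is accepted but not honored yet (the `// if req.metadata_only {}` stub). A minimal caller sketch using the request/response types introduced in disk_api.rs below; the function name and all bucket/file values are illustrative, and since this revision leaves `ReadMultipleResp.bucket`, `.prefix`, and `.file` unset, results are matched to inputs by position:

```rust
use anyhow::Result;
use ecstore::disk_api::{DiskAPI, ReadMultipleReq};

async fn load_parts<D: DiskAPI>(disk: &D) -> Result<()> {
    let req = ReadMultipleReq {
        bucket: "testbucket".to_string(),
        prefix: "uploads/upload-id".to_string(),
        files: vec!["part.1".to_string(), "part.2".to_string()],
        max_size: 1 << 20,    // refuse any single file larger than 1 MiB
        metadata_only: false,
        abort404: true,       // stop at the first missing file
        max_results: 0,       // 0 = no cap
    };

    // One response per file examined, in request order.
    for (i, res) in disk.read_multiple(req).await?.into_iter().enumerate() {
        if !res.exists {
            println!("file #{i}: missing or unreadable: {}", res.error);
            continue;
        }
        println!("file #{i}: {} bytes, modified {}", res.data.len(), res.mod_time);
    }
    Ok(())
}
```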
diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs
index b3c0be12..14652fcd 100644
--- a/ecstore/src/disk_api.rs
+++ b/ecstore/src/disk_api.rs
@@ -41,6 +41,41 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
         opts: &ReadOptions,
     ) -> Result<RawFileInfo>;
     async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
+    async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
+}
+
+pub struct ReadMultipleReq {
+    pub bucket: String,
+    pub prefix: String,
+    pub files: Vec<String>,
+    pub max_size: usize,
+    pub metadata_only: bool,
+    pub abort404: bool,
+    pub max_results: usize,
+}
+
+pub struct ReadMultipleResp {
+    pub bucket: String,
+    pub prefix: String,
+    pub file: String,
+    pub exists: bool,
+    pub error: String,
+    pub data: Vec<u8>,
+    pub mod_time: OffsetDateTime,
+}
+
+impl Default for ReadMultipleResp {
+    fn default() -> Self {
+        Self {
+            bucket: Default::default(),
+            prefix: Default::default(),
+            file: Default::default(),
+            exists: Default::default(),
+            error: Default::default(),
+            data: Default::default(),
+            mod_time: OffsetDateTime::UNIX_EPOCH,
+        }
+    }
 }
 
 pub struct VolumeInfo {
diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index 1d01c620..904b9c43 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -1,5 +1,4 @@
 use anyhow::anyhow;
-use anyhow::Error;
 use anyhow::Result;
 use bytes::Bytes;
 use futures::{Stream, StreamExt};
@@ -7,7 +6,7 @@ use reed_solomon_erasure::galois_8::ReedSolomon;
 use s3s::StdError;
 use tokio::io::AsyncWrite;
 use tokio::io::AsyncWriteExt;
-use tracing::debug;
+// use tracing::debug;
 use uuid::Uuid;
 
 use crate::chunk_stream::ChunkedStream;
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 8a64dab0..f718de5f 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -7,7 +7,8 @@ use crate::{
     format::{DistributionAlgoVersion, FormatV3},
     set_disk::SetDisks,
     store_api::{
-        BucketInfo, BucketOptions, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
+        BucketInfo, BucketOptions, CompletePart, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo,
+        PutObjReader, StorageAPI,
     },
     utils::hash,
 };
@@ -149,4 +150,17 @@ impl StorageAPI for Sets {
     async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
         self.get_disks_by_key(object).new_multipart_upload(bucket, object, opts).await
     }
+
+    async fn complete_multipart_upload(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        uploaded_parts: Vec<CompletePart>,
+        opts: &ObjectOptions,
+    ) -> Result<ObjectInfo> {
+        self.get_disks_by_key(object)
+            .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
+            .await
+    }
 }
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index e7bf2622..50187782 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -14,7 +14,8 @@ use crate::{
     peer::{PeerS3Client, S3PeerSys},
     sets::Sets,
     store_api::{
-        BucketInfo, BucketOptions, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
+        BucketInfo, BucketOptions, CompletePart, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo,
+        PutObjReader, StorageAPI,
     },
     store_init, utils,
 };
@@ -177,4 +178,19 @@ impl StorageAPI for ECStore {
         }
         unimplemented!()
     }
+    async fn complete_multipart_upload(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        uploaded_parts: Vec<CompletePart>,
+        opts: &ObjectOptions,
+    ) -> Result<ObjectInfo> {
+        if self.single_pool() {
+            return self.pools[0]
+                .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
+                .await;
+        }
+        unimplemented!()
+    }
 }
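
Both new `complete_multipart_upload` impls are thin dispatch layers: `ECStore` forwards to its only pool (multi-pool routing is still `unimplemented!()`), and `Sets` routes by object key via `get_disks_by_key`. That routine is not part of this patch; the sketch below shows only the general technique (a stable hash of the key reduced modulo the set count) with a stand-in hasher, not the crate's real `utils::hash`/`DistributionAlgoVersion` logic:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for key-to-set routing: any caller hashing the same object key
/// lands on the same erasure set, so every operation on one object
/// (new_multipart_upload, put_object_part, complete_multipart_upload)
/// is served by the same group of disks.
fn set_index_for_key(object: &str, set_count: usize) -> usize {
    let mut h = DefaultHasher::new();
    object.hash(&mut h);
    (h.finish() as usize) % set_count
}
```

That stability is the property this patch relies on: the completion request hashes the same `object` as the earlier part uploads, so it reaches the `SetDisks` instance that actually holds the parts.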
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 57f595ed..b4dd1fef 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -1,12 +1,15 @@
 use anyhow::Result;
+use rmp_serde::Serializer;
 use s3s::dto::StreamingBlob;
+use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use uuid::Uuid;
 
 pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
 pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
 
-#[derive(Debug, Clone)]
+// #[derive(Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
 pub struct FileInfo {
     pub name: String,
     pub volume: String,
@@ -19,6 +22,7 @@ pub struct FileInfo {
     pub size: usize,
     pub data: Option<Vec<u8>>,
     pub fresh: bool, // indicates this is a first-time call to write FileInfo.
+    pub parts: Vec<ObjectPartInfo>,
 }
 
 impl FileInfo {
@@ -38,6 +42,14 @@ impl FileInfo {
 
         self.erasure.data_blocks
     }
+
+    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
+        let mut buf = Vec::new();
+
+        self.serialize(&mut Serializer::new(&mut buf))?;
+
+        Ok(buf)
+    }
 }
 
 impl Default for FileInfo {
@@ -53,6 +65,7 @@ impl Default for FileInfo {
             fresh: Default::default(),
             name: Default::default(),
             volume: Default::default(),
+            parts: Default::default(),
         }
     }
 }
@@ -100,11 +113,32 @@ impl FileInfo {
     }
 }
 
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+pub struct ObjectPartInfo {
+    // pub etag: Option<String>,
+    pub number: usize,
+    pub size: usize,
+    // pub actual_size: usize,
+    pub mod_time: OffsetDateTime,
+    // pub index: Option<Vec<u8>>,
+    // pub checksums: Option<HashMap<String, String>>,
+}
+
+impl Default for ObjectPartInfo {
+    fn default() -> Self {
+        Self {
+            number: Default::default(),
+            size: Default::default(),
+            mod_time: OffsetDateTime::UNIX_EPOCH,
+        }
+    }
+}
+
 pub struct RawFileInfo {
     pub buf: Vec<u8>,
 }
 
-#[derive(Debug, Default, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
 // ErasureInfo holds erasure coding and bitrot related information.
 pub struct ErasureInfo {
     // Algorithm is the String representation of erasure-coding-algorithm
@@ -123,7 +157,7 @@ pub struct ErasureInfo {
     pub checksums: Vec<ChecksumInfo>,
 }
 
-#[derive(Debug, Default, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
 // ChecksumInfo - carries checksums of individual scattered parts per disk.
 pub struct ChecksumInfo {
     pub part_number: usize,
     pub algorithm: BitrotAlgorithm,
     pub hash: Vec<u8>,
 }
 
-#[derive(Debug, Default, Clone)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
 // BitrotAlgorithm specifies an algorithm used for bitrot protection.
 pub enum BitrotAlgorithm {
     // SHA256 represents the SHA-256 hash function
@@ -184,6 +218,10 @@ pub struct PartInfo {
     pub size: usize,
 }
 
+pub struct CompletePart {}
+
+pub struct ObjectInfo {}
+
 #[async_trait::async_trait]
 pub trait StorageAPI {
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
@@ -200,4 +238,12 @@ pub trait StorageAPI {
         opts: &ObjectOptions,
     ) -> Result<PartInfo>;
     async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult>;
+    async fn complete_multipart_upload(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        uploaded_parts: Vec<CompletePart>,
+        opts: &ObjectOptions,
+    ) -> Result<ObjectInfo>;
 }
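
The serde-related derive changes (`Serialize`, `Deserialize`, `PartialEq`) exist to support the new `marshal_msg`, which encodes a `FileInfo` to MessagePack through `rmp-serde`, presumably so file metadata (including the new `parts` list) can be persisted and compared. A round-trip sketch: `marshal_msg` comes from this patch, while the `rmp_serde::from_slice` decode call and the `time` crate's `serde` feature (required for the `OffsetDateTime` fields) are assumptions about the crate setup.

```rust
use ecstore::store_api::{FileInfo, ObjectPartInfo};

fn roundtrip() -> anyhow::Result<()> {
    let mut fi = FileInfo::default();
    fi.name = "object-1".to_string();
    fi.parts.push(ObjectPartInfo {
        number: 1,
        size: 5 << 20, // a 5 MiB part
        ..Default::default()
    });

    let buf = fi.marshal_msg()?;                       // MessagePack bytes
    let back: FileInfo = rmp_serde::from_slice(&buf)?; // decode side
    assert_eq!(fi, back); // PartialEq was derived precisely for checks like this
    Ok(())
}
```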
diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs
index 3697ebdb..dbf2e5ce 100644
--- a/ecstore/src/store_init.rs
+++ b/ecstore/src/store_init.rs
@@ -1,4 +1,3 @@
-use bytes::Bytes;
 use futures::future::join_all;
 use uuid::Uuid;
 
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index baf6afb2..4da4858b 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -302,13 +302,15 @@ impl S3 for FS {
         req: S3Request<CompleteMultipartUploadInput>,
     ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
         let CompleteMultipartUploadInput {
-            // multipart_upload,
+            multipart_upload,
             bucket,
             key,
-            // upload_id,
+            upload_id,
             ..
         } = req.input;
 
+        // mc cp step 5: complete the multipart upload
+
         let output = CompleteMultipartUploadOutput {
             bucket: Some(bucket),
             key: Some(key),
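
The handler now destructures `multipart_upload` and `upload_id` instead of ignoring them, but as of this patch it still builds a stubbed `CompleteMultipartUploadOutput` without calling into the store. A sketch of where the wiring presumably goes next, given the new `StorageAPI::complete_multipart_upload`: `CompletePart {}` is still an empty placeholder, so the field mapping hinted at below is an assumption about its eventual shape, and the store call stays commented because the `FS` struct's store handle and the `ObjectOptions` construction are not visible in this hunk.

```rust
// Inside complete_multipart_upload, after destructuring req.input:
let completed = multipart_upload
    .and_then(|mu| mu.parts) // s3s: Option<CompletedMultipartUpload> -> Option<Vec<CompletedPart>>
    .unwrap_or_default();

let uploaded_parts: Vec<CompletePart> = completed
    .iter()
    .map(|_p| CompletePart {}) // eventually: carry _p.part_number and _p.e_tag
    .collect();

// let info = store
//     .complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, &opts)
//     .await
//     .map_err(|e| s3s::s3_error!(InternalError, "{}", e))?;
```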