todo:readMultipleFiles

weisd
2024-07-11 17:57:59 +08:00
parent d0d82adc85
commit 4afd8872d0
8 changed files with 172 additions and 77 deletions

View File

@@ -15,7 +15,7 @@ use tracing::debug;
use uuid::Uuid;
use crate::{
    disk_api::{DiskAPI, DiskError, FileWriter, ReadOptions, VolumeInfo},
    disk_api::{DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, VolumeInfo},
    endpoint::{Endpoint, Endpoints},
    file_meta::FileMeta,
    format::FormatV3,
@@ -623,73 +623,57 @@ impl DiskAPI for LocalDisk {
        Ok(RawFileInfo { buf })
    }
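    /// Reads several files that share `req.bucket`/`req.prefix` in one call and
    /// returns one `ReadMultipleResp` per requested name. A missing file is reported
    /// through `exists == false` instead of failing the whole batch; `max_size`,
    /// `max_results`, and `abort404` let the caller bound the result or stop the scan early.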
    async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
        let mut results = Vec::new();
        let mut found = 0;
        for v in req.files.iter() {
            let fpath = self.get_object_path(&req.bucket, format!("{}/{}", req.prefix, v).as_str())?;
            let mut res = ReadMultipleResp::default();
            // if req.metadata_only {}
            match read_file_all(fpath).await {
                Ok((data, meta)) => {
                    found += 1;
                    if req.max_size > 0 && data.len() > req.max_size {
                        res.exists = true;
                        res.error = format!("max size ({}) exceeded: {}", req.max_size, data.len());
                        results.push(res);
                        break;
                    }
                    res.exists = true;
                    res.data = data;
                    res.mod_time = match meta.modified() {
                        Ok(md) => OffsetDateTime::from(md),
                        Err(_) => return Err(Error::msg("modified time is not supported on this platform")),
                    };
                    results.push(res);
                    if req.max_results > 0 && found >= req.max_results {
                        break;
                    }
                }
                Err(e) => {
                    if !(DiskError::is_err(&e, &DiskError::FileNotFound) || DiskError::is_err(&e, &DiskError::VolumeNotFound)) {
                        res.exists = true;
                        res.error = e.to_string();
                    }
                    if req.abort404 && !res.exists {
                        results.push(res);
                        break;
                    }
                    results.push(res);
                }
            }
        }
        Ok(results)
    }
}
// pub async fn copy_bytes<S, W>(mut stream: S, writer: &mut W) -> Result<u64>
// where
// S: Stream<Item = Result<Bytes, StdError>> + Unpin,
// W: AsyncWrite + Unpin,
// {
// let mut nwritten: u64 = 0;
// while let Some(result) = stream.next().await {
// let bytes = match result {
// Ok(x) => x,
// Err(e) => return Err(Error::new(e)),
// };
// writer.write_all(&bytes).await?;
// nwritten += bytes.len() as u64;
// }
// writer.flush().await?;
// Ok(nwritten)
// }
// pub struct RemoteDisk {}
// impl RemoteDisk {
// pub fn new(_ep: &Endpoint, _health_check: bool) -> Result<Self> {
// Ok(Self {})
// }
// }
// pub(crate) struct FileWriter<'a> {
// tmp_path: PathBuf,
// dest_path: &'a Path,
// writer: BufWriter<File>,
// clean_tmp: bool,
// }
// impl<'a> FileWriter<'a> {
// pub(crate) fn tmp_path(&self) -> &Path {
// &self.tmp_path
// }
// pub(crate) fn dest_path(&self) -> &'a Path {
// self.dest_path
// }
// pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
// &mut self.writer
// }
// pub(crate) async fn done(mut self) -> Result<()> {
// if let Some(final_dir_path) = self.dest_path().parent() {
// fs::create_dir_all(&final_dir_path).await?;
// }
// fs::rename(&self.tmp_path, self.dest_path()).await?;
// self.clean_tmp = false;
// Ok(())
// }
// }
// impl<'a> Drop for FileWriter<'a> {
// fn drop(&mut self) {
// if self.clean_tmp {
// let _ = std::fs::remove_file(&self.tmp_path);
// }
// }
// }
#[cfg(test)]
mod test {

View File

@@ -41,6 +41,41 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
        opts: &ReadOptions,
    ) -> Result<FileInfo>;
    async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
    async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
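/// Request for `DiskAPI::read_multiple`: each name in `files` is read relative to
/// `bucket`/`prefix`; the remaining fields bound how much data comes back and when
/// the scan should stop early.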
pub struct ReadMultipleReq {
    pub bucket: String,
    pub prefix: String,
    pub files: Vec<String>,
    pub max_size: usize,
    pub metadata_only: bool,
    pub abort404: bool,
    pub max_results: usize,
}
pub struct ReadMultipleResp {
    pub bucket: String,
    pub prefix: String,
    pub file: String,
    pub exists: bool,
    pub error: String,
    pub data: Vec<u8>,
    pub mod_time: OffsetDateTime,
}
impl Default for ReadMultipleResp {
    fn default() -> Self {
        Self {
            bucket: Default::default(),
            prefix: Default::default(),
            file: Default::default(),
            exists: Default::default(),
            error: Default::default(),
            data: Default::default(),
            mod_time: OffsetDateTime::UNIX_EPOCH,
        }
    }
}
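// A minimal caller-side sketch (not part of this commit; `disk` stands for any value
// implementing `DiskAPI`): build one request for a group of related files and inspect
// `exists`/`error` per entry instead of treating a missing file as a hard failure.
//
// let req = ReadMultipleReq {
//     bucket: "mybucket".to_string(),
//     prefix: "myobject/uploads".to_string(),
//     files: vec!["part.1".to_string(), "part.2".to_string()],
//     max_size: 1 << 20,
//     metadata_only: false,
//     abort404: true,
//     max_results: 0,
// };
// for resp in disk.read_multiple(req).await? {
//     if resp.exists && resp.error.is_empty() {
//         // resp.data and resp.mod_time are valid here
//     }
// }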
pub struct VolumeInfo {

View File

@@ -1,5 +1,4 @@
use anyhow::anyhow;
use anyhow::Error;
use anyhow::Result;
use bytes::Bytes;
use futures::{Stream, StreamExt};
@@ -7,7 +6,7 @@ use reed_solomon_erasure::galois_8::ReedSolomon;
use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::debug;
// use tracing::debug;
use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;

View File

@@ -7,7 +7,8 @@ use crate::{
    format::{DistributionAlgoVersion, FormatV3},
    set_disk::SetDisks,
    store_api::{
        BucketInfo, BucketOptions, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
        BucketInfo, BucketOptions, CompletePart, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo,
        PutObjReader, StorageAPI,
    },
    utils::hash,
};
@@ -149,4 +150,17 @@ impl StorageAPI for Sets {
    async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
        self.get_disks_by_key(object).new_multipart_upload(bucket, object, opts).await
    }
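    // Routes completion to the set that owns `object` (same key hashing as the other
    // multipart calls) and defers the actual work to `SetDisks`.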
    async fn complete_multipart_upload(
        &self,
        bucket: &str,
        object: &str,
        upload_id: &str,
        uploaded_parts: Vec<CompletePart>,
        opts: &ObjectOptions,
    ) -> Result<ObjectInfo> {
        self.get_disks_by_key(object)
            .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
            .await
    }
}

View File

@@ -14,7 +14,8 @@ use crate::{
    peer::{PeerS3Client, S3PeerSys},
    sets::Sets,
    store_api::{
        BucketInfo, BucketOptions, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
        BucketInfo, BucketOptions, CompletePart, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo,
        PutObjReader, StorageAPI,
    },
    store_init, utils,
};
@@ -177,4 +178,19 @@ impl StorageAPI for ECStore {
        }
        unimplemented!()
    }
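    // Completion mirrors the other multipart entry points here: with a single pool it
    // delegates directly; multi-pool routing is still left as unimplemented.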
    async fn complete_multipart_upload(
        &self,
        bucket: &str,
        object: &str,
        upload_id: &str,
        uploaded_parts: Vec<CompletePart>,
        opts: &ObjectOptions,
    ) -> Result<ObjectInfo> {
        if self.single_pool() {
            return self.pools[0]
                .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
                .await;
        }
        unimplemented!()
    }
}

View File

@@ -1,12 +1,15 @@
use anyhow::Result;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
#[derive(Debug, Clone)]
// #[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct FileInfo {
    pub name: String,
    pub volume: String,
@@ -19,6 +22,7 @@ pub struct FileInfo {
    pub size: usize,
    pub data: Option<Vec<u8>>,
    pub fresh: bool, // indicates this is a first time call to write FileInfo.
    pub parts: Vec<ObjectPartInfo>,
}
impl FileInfo {
@@ -38,6 +42,14 @@ impl FileInfo {
        self.erasure.data_blocks
    }
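    /// Serializes this `FileInfo` to MessagePack bytes via `rmp_serde`.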
    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.serialize(&mut Serializer::new(&mut buf))?;
        Ok(buf)
    }
}
impl Default for FileInfo {
@@ -53,6 +65,7 @@ impl Default for FileInfo {
            fresh: Default::default(),
            name: Default::default(),
            volume: Default::default(),
            parts: Default::default(),
        }
    }
}
@@ -100,11 +113,32 @@ impl FileInfo {
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ObjectPartInfo {
    // pub etag: Option<String>,
    pub number: usize,
    pub size: usize,
    // pub actual_size: usize,
    pub mod_time: OffsetDateTime,
    // pub index: Option<Vec<u8>>,
    // pub checksums: Option<std::collections::HashMap<String, String>>,
}
impl Default for ObjectPartInfo {
    fn default() -> Self {
        Self {
            number: Default::default(),
            size: Default::default(),
            mod_time: OffsetDateTime::UNIX_EPOCH,
        }
    }
}
pub struct RawFileInfo {
    pub buf: Vec<u8>,
}
#[derive(Debug, Default, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
// ErasureInfo holds erasure coding and bitrot related information.
pub struct ErasureInfo {
// Algorithm is the String representation of erasure-coding-algorithm
@@ -123,7 +157,7 @@ pub struct ErasureInfo {
    pub checksums: Vec<ChecksumInfo>,
}
#[derive(Debug, Default, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
// ChecksumInfo - carries checksums of individual scattered parts per disk.
pub struct ChecksumInfo {
    pub part_number: usize,
@@ -131,7 +165,7 @@ pub struct ChecksumInfo {
    pub hash: Vec<u8>,
}
#[derive(Debug, Default, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
// BitrotAlgorithm specifies an algorithm used for bitrot protection.
pub enum BitrotAlgorithm {
    // SHA256 represents the SHA-256 hash function
@@ -184,6 +218,10 @@ pub struct PartInfo {
    pub size: usize,
}
pub struct CompletePart {}
pub struct ObjectInfo {}
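// CompletePart and ObjectInfo are empty placeholders at this point; the new
// complete_multipart_upload signatures only thread them through until real fields land.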
#[async_trait::async_trait]
pub trait StorageAPI {
    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
@@ -200,4 +238,12 @@ pub trait StorageAPI {
        opts: &ObjectOptions,
    ) -> Result<PartInfo>;
    async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult>;
    async fn complete_multipart_upload(
        &self,
        bucket: &str,
        object: &str,
        upload_id: &str,
        uploaded_parts: Vec<CompletePart>,
        opts: &ObjectOptions,
    ) -> Result<ObjectInfo>;
}

View File

@@ -1,4 +1,3 @@
use bytes::Bytes;
use futures::future::join_all;
use uuid::Uuid;

View File

@@ -302,13 +302,15 @@ impl S3 for FS {
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let CompleteMultipartUploadInput {
            // multipart_upload,
            multipart_upload,
            bucket,
            key,
            // upload_id,
            upload_id,
            ..
        } = req.input;
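        // multipart_upload and upload_id are now destructured as well, presumably so
        // this handler can hand them to the store's complete_multipart_upload once
        // that call is wired in.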
        // mc cp step 5
        let output = CompleteMultipartUploadOutput {
            bucket: Some(bucket),
            key: Some(key),