diff --git a/Cargo.lock b/Cargo.lock
index 704d1422..1e6e755c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -313,9 +313,11 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "base64-simd",
  "bytes",
  "crc32fast",
  "futures",
+ "hex-simd",
  "lazy_static",
  "netif",
  "path-absolutize",
@@ -325,6 +327,7 @@ dependencies = [
  "s3s",
  "serde",
  "serde_json",
+ "sha2",
  "siphasher",
  "thiserror",
  "time",
diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml
index 8b7a5d17..5b227e4e 100644
--- a/ecstore/Cargo.toml
+++ b/ecstore/Cargo.toml
@@ -33,6 +33,9 @@
 tokio-util = { version = "0.7.11", features = ["io"] }
 s3s = "0.10.0"
 crc32fast = "1.4.2"
 siphasher = "1.0.1"
+base64-simd = "0.8.0"
+sha2 = "0.10.8"
+hex-simd = "0.8.0"
 
 [dev-dependencies]
diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs
index 3f0ba828..e708a2a7 100644
--- a/ecstore/src/disk.rs
+++ b/ecstore/src/disk.rs
@@ -20,7 +20,7 @@
 use tracing::debug;
 use uuid::Uuid;
 
 use crate::{
-    disk_api::{DiskAPI, DiskError, VolumeInfo},
+    disk_api::{DiskAPI, DiskError, ReadOptions, VolumeInfo},
     endpoint::{Endpoint, Endpoints},
     file_meta::FileMeta,
     format::{DistributionAlgoVersion, FormatV3},
@@ -217,6 +217,48 @@ impl LocalDisk {
         Ok(())
     }
+
+    async fn read_raw(
+        &self,
+        bucket: &str,
+        volume_dir: impl AsRef<Path>,
+        path: impl AsRef<Path>,
+        read_data: bool,
+    ) -> Result<(Vec<u8>, OffsetDateTime)> {
+        let meta_path = path.as_ref().join(Path::new(STORAGE_FORMAT_FILE));
+        if read_data {
+            self.read_all_data(bucket, volume_dir, meta_path).await
+        } else {
+            self.read_metadata_with_dmtime(meta_path).await
+        }
+    }
+
+    async fn read_metadata_with_dmtime(&self, path: impl AsRef<Path>) -> Result<(Vec<u8>, OffsetDateTime)> {
+        let (data, meta) = read_file_all(path).await?;
+
+        let modtime = match meta.modified() {
+            Ok(md) => OffsetDateTime::from(md),
+            Err(_) => return Err(Error::msg("modified time is not supported on this platform")),
+        };
+
+        Ok((data, modtime))
+    }
+
+    async fn read_all_data(
+        &self,
+        bucket: &str,
+        volume_dir: impl AsRef<Path>,
+        path: impl AsRef<Path>,
+    ) -> Result<(Vec<u8>, OffsetDateTime)> {
+        let (data, meta) = read_file_all(path).await?;
+
+        let modtime = match meta.modified() {
+            Ok(md) => OffsetDateTime::from(md),
+            Err(_) => return Err(Error::msg("modified time is not supported on this platform")),
+        };
+
+        Ok((data, modtime))
+    }
 }
 
 fn is_root_path(path: &PathBuf) -> bool {
@@ -245,6 +287,14 @@ pub async fn read_file_exists(path: impl AsRef<Path>) -> Result<(Vec<u8>, Option<Metadata>)>
     Ok((data, meta))
 }
 
+pub async fn write_all_internal(p: impl AsRef<Path>, data: impl AsRef<[u8]>) -> Result<()> {
+    // create top dir if not exists
+    fs::create_dir_all(&p.as_ref().parent().unwrap_or_else(|| Path::new("."))).await?;
+
+    fs::write(&p, data).await?;
+    Ok(())
+}
+
 pub async fn read_file_all(path: impl AsRef<Path>) -> Result<(Vec<u8>, Metadata)> {
     let p = path.as_ref();
     let meta = read_file_metadata(&path).await?;
@@ -307,10 +357,8 @@ impl DiskAPI for LocalDisk {
     async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
         let p = self.get_object_path(&volume, &path)?;
 
-        // create top dir if not exists
-        fs::create_dir_all(&p.parent().unwrap_or_else(|| Path::new("."))).await?;
+        write_all_internal(p, data).await?;
 
-        fs::write(&p, data).await?;
         Ok(())
     }
@@ -418,7 +466,7 @@
         }
 
         if !dst_buf.is_empty() {
-            meta = match FileMeta::unmarshal(dst_buf) {
+            meta = match FileMeta::unmarshal(&dst_buf) {
                 Ok(m) => m,
                 Err(e) => FileMeta::new(),
             }
@@ -492,6 +540,44 @@
             created: modtime,
         })
     }
+
+    async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
+        let p = self.get_object_path(&volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str())?;
+
+        let mut meta = FileMeta::new();
+        if !fi.fresh {
+            let (buf, _) = read_file_exists(&p).await?;
+            if !buf.is_empty() {
+                meta = FileMeta::unmarshal(&buf)?;
+            }
+        }
+
+        meta.add_version(fi)?;
+
+        let fm_data = meta.marshal_msg()?;
+
+        write_all_internal(p, fm_data).await?;
+
+        Ok(())
+    }
+
+    async fn read_version(
+        &self,
+        org_volume: &str,
+        volume: &str,
+        path: &str,
+        version_id: &str,
+        opts: ReadOptions,
+    ) -> Result<FileInfo> {
+        let file_path = self.get_object_path(volume, path)?;
+        let file_dir = self.get_bucket_path(volume)?;
+
+        let read_data = opts.read_data;
+
+        let (data, _) = self.read_raw(volume, file_dir, file_path, read_data).await?;
+
+        unimplemented!()
+    }
 }
 
 // pub async fn copy_bytes(mut stream: S, writer: &mut W) -> Result
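
Note: `write_all_internal` factors the create-parent-dirs-then-write pattern out of `write_all` so that `write_metadata` can reuse it. A minimal standalone sketch of the same pattern with plain `tokio::fs` (the path and payload are illustrative, not from this diff; `STORAGE_FORMAT_FILE` plays the metadata-file role above):

    use std::path::Path;

    use anyhow::Result;
    use tokio::fs;

    // Same pattern as write_all_internal: create any missing parent
    // directories, then write the whole buffer in one call.
    async fn write_all(p: impl AsRef<Path>, data: impl AsRef<[u8]>) -> Result<()> {
        let parent = p.as_ref().parent().unwrap_or_else(|| Path::new("."));
        fs::create_dir_all(parent).await?;
        fs::write(&p, data).await?;
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Illustrative object-metadata path.
        write_all("/tmp/demo-volume/object/xl.meta", b"payload").await?;
        Ok(())
    }
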
diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs
index 3995a4cf..7503acd1 100644
--- a/ecstore/src/disk_api.rs
+++ b/ecstore/src/disk_api.rs
@@ -27,6 +27,16 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
     async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
     async fn make_volume(&self, volume: &str) -> Result<()>;
     async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo>;
+
+    async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()>;
+    async fn read_version(
+        &self,
+        org_volume: &str,
+        volume: &str,
+        path: &str,
+        version_id: &str,
+        opts: ReadOptions,
+    ) -> Result<FileInfo>;
 }
 
 pub struct VolumeInfo {
@@ -34,10 +44,19 @@
     pub name: String,
     pub created: OffsetDateTime,
 }
 
+pub struct ReadOptions {
+    pub read_data: bool,
+    // pub healing: bool,
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum DiskError {
     #[error("file not found")]
     FileNotFound,
+
+    #[error("file version not found")]
+    FileVersionNotFound,
+
     #[error("disk not found")]
     DiskNotFound,
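
For orientation, a hedged sketch of how a caller would drive the two new `DiskAPI` methods together. The helper below is hypothetical and not part of this diff, `read_version` is still `unimplemented!()` in `LocalDisk`, and the empty `version_id` is a placeholder:

    use anyhow::Result;
    use ecstore::disk_api::{DiskAPI, ReadOptions};
    use ecstore::store_api::FileInfo;

    // Hypothetical round trip: persist a fresh FileInfo, then load the version
    // back without its inline data (read_data: false skips the data payload).
    async fn roundtrip(disk: &impl DiskAPI, volume: &str, path: &str) -> Result<FileInfo> {
        let mut fi = FileInfo::default();
        fi.fresh = true; // first write for this object: skip reading existing metadata

        disk.write_metadata(volume, volume, path, fi).await?;
        disk.read_version(volume, volume, path, "", ReadOptions { read_data: false })
            .await
    }
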
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 8795b14d..bbaa929b 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -1,19 +1,24 @@
 use std::sync::Arc;
 
 use anyhow::{Error, Result};
-
 use futures::{future::join_all, AsyncWrite, StreamExt};
 use time::OffsetDateTime;
 use tracing::debug;
 use uuid::Uuid;
 
 use crate::{
-    disk::{self, DiskStore, RUSTFS_META_TMP_BUCKET},
+    disk::{self, DiskStore, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET},
     endpoint::PoolEndpoints,
     erasure::Erasure,
     format::{DistributionAlgoVersion, FormatV3},
-    store_api::{BucketInfo, BucketOptions, FileInfo, MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
-    utils::hash,
+    store_api::{
+        BucketInfo, BucketOptions, FileInfo, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader,
+        StorageAPI,
+    },
+    utils::{
+        crypto::{base64_decode, base64_encode, hex, sha256},
+        hash,
+    },
 };
 
 const DEFAULT_INLINE_BLOCKS: usize = 128 * 1024;
@@ -142,6 +147,61 @@ impl Sets {
     }
 }
 
+async fn write_unique_file_info(
+    disks: &Vec<Option<DiskStore>>,
+    org_bucket: &str,
+    bucket: &str,
+    prefix: &str,
+    files: &Vec<FileInfo>,
+    // write_quorum: usize,
+) -> Vec<Option<Error>> {
+    let mut futures = Vec::with_capacity(disks.len());
+
+    for (i, disk) in disks.iter().enumerate() {
+        let disk = disk.as_ref().unwrap();
+        let mut file_info = files[i].clone();
+        file_info.erasure.index = i + 1;
+        futures.push(async move { disk.write_metadata(org_bucket, bucket, prefix, file_info).await })
+    }
+
+    let mut errors = Vec::with_capacity(disks.len());
+
+    let results = join_all(futures).await;
+    for result in results {
+        match result {
+            Ok(_) => {
+                errors.push(None);
+            }
+            Err(e) => {
+                errors.push(Some(e));
+            }
+        }
+    }
+    errors
+}
+
+fn get_upload_id_dir(bucket: &str, object: &str, upload_id: &str) -> String {
+    let upload_uuid = match base64_decode(upload_id.as_bytes()) {
+        Ok(res) => {
+            let decoded_str = String::from_utf8(res).expect("Failed to convert decoded bytes to a UTF-8 string");
+            let parts: Vec<&str> = decoded_str.splitn(2, '.').collect();
+            if parts.len() == 2 {
+                parts[1].to_string()
+            } else {
+                upload_id.to_string()
+            }
+        }
+        Err(_) => upload_id.to_string(),
+    };
+
+    format!("{}/{}", get_multipart_sha_dir(bucket, object), upload_uuid)
+}
+
+fn get_multipart_sha_dir(bucket: &str, object: &str) -> String {
+    let path = format!("{}/{}", bucket, object);
+    hex(sha256(path.as_bytes()).as_ref())
+}
+
 // #[derive(Debug)]
 // pub struct Objects {
 //     pub endpoints: Vec,
@@ -162,7 +222,7 @@ impl StorageAPI for Sets {
         unimplemented!()
     }
 
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> {
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<()> {
         let disks = self.get_disks_by_key(object);
 
         let mut parity_drives = self.partiy_count;
@@ -265,6 +325,71 @@
         Ok(())
     }
+
+    async fn put_object_part(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        part_id: usize,
+        data: PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<PartInfo> {
+        let upload_path = get_upload_id_dir(bucket, object, upload_id);
+
+        // TODO: checkUploadIDExists
+
+        unimplemented!()
+    }
+
+    async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
+        let disks = self.get_disks_by_key(object);
+
+        let mut parity_drives = self.partiy_count;
+        if opts.max_parity {
+            parity_drives = disks.len() / 2;
+        }
+
+        let data_drives = disks.len() - parity_drives;
+        let mut write_quorum = data_drives;
+        if data_drives == parity_drives {
+            write_quorum += 1
+        }
+
+        let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
+
+        fi.data_dir = Uuid::new_v4();
+        fi.fresh = true;
+
+        let parts_metadata = vec![fi.clone(); disks.len()];
+
+        let (shuffle_disks, mut shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
+
+        for fi in shuffle_parts_metadata.iter_mut() {
+            fi.mod_time = OffsetDateTime::now_utc();
+        }
+
+        let upload_uuid = format!("{}x{}", Uuid::new_v4(), fi.mod_time);
+
+        let upload_id = base64_encode(format!("{}.{}", "globalDeploymentID", upload_uuid).as_bytes());
+
+        let upload_path = get_upload_id_dir(bucket, object, upload_uuid.as_str());
+
+        let errs = write_unique_file_info(
+            &shuffle_disks,
+            bucket,
+            RUSTFS_META_MULTIPART_BUCKET,
+            upload_path.as_str(),
+            &shuffle_parts_metadata,
+        )
+        .await;
+
+        debug!("write_unique_file_info errs: {:?}", &errs);
+        // TODO: reduceWriteQuorumErrs
+        // evalDisks
+
+        Ok(MultipartUploadResult { upload_id })
+    }
 }
 
 // shuffle the order
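
The upload id produced by `new_multipart_upload` is `base64("<deployment-id>.<uuid>x<mod-time>")`, and `get_upload_id_dir` recovers the part after the first `.` to locate the upload directory under the sha256 prefix. A small round-trip check of that scheme (the uuid and deployment id are made up; `base64_simd` is the crate the crypto helpers wrap):

    fn main() {
        // Encode side, as in new_multipart_upload: "<deployment-id>.<upload-uuid>".
        let upload_uuid = "4f9c9f7a-0b1e-4c8e-9d2a-1f2e3d4c5b6ax2024-01-01"; // illustrative
        let raw = format!("{}.{}", "globalDeploymentID", upload_uuid);
        let upload_id = base64_simd::URL_SAFE_NO_PAD.encode_to_string(raw.as_bytes());

        // Decode side, as in get_upload_id_dir: split once on '.' and keep the tail.
        let decoded = base64_simd::URL_SAFE_NO_PAD.decode_to_vec(upload_id.as_bytes()).unwrap();
        let decoded = String::from_utf8(decoded).unwrap();
        let recovered = decoded.splitn(2, '.').nth(1).unwrap();

        assert_eq!(recovered, upload_uuid);
    }
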
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 3655a6bd..f66c2916 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -14,7 +14,9 @@ use crate::{
     endpoint::EndpointServerPools,
     peer::{PeerS3Client, S3PeerSys},
     sets::Sets,
-    store_api::{BucketInfo, BucketOptions, MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
+    store_api::{
+        BucketInfo, BucketOptions, MakeBucketOptions, MultipartUploadResult, ObjectOptions, PartInfo, PutObjReader, StorageAPI,
+    },
     store_init, utils,
 };
@@ -128,7 +130,7 @@ impl StorageAPI for ECStore {
         let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
 
-        self.put_object(RUSTFS_META_BUCKET, &file_path, reader, ObjectOptions { max_parity: true })
+        self.put_object(RUSTFS_META_BUCKET, &file_path, reader, &ObjectOptions { max_parity: true })
             .await?;
 
         // TODO: toObjectErr
@@ -140,17 +142,39 @@
         Ok(info)
     }
 
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()> {
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<()> {
         // checkPutObjectArgs
 
         let object = utils::path::encode_dir_object(object);
 
         if self.single_pool() {
-            println!("put_object single_pool");
-            self.pools[0].put_object(bucket, object.as_str(), data, opts).await?;
-            return Ok(());
+            return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
         }
 
         unimplemented!()
     }
+
+    async fn put_object_part(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        part_id: usize,
+        data: PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<PartInfo> {
+        if self.single_pool() {
+            return self.pools[0]
+                .put_object_part(bucket, object, upload_id, part_id, data, opts)
+                .await;
+        }
+        unimplemented!()
+    }
+
+    async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult> {
+        if self.single_pool() {
+            return self.pools[0].new_multipart_upload(bucket, object, opts).await;
+        }
+        unimplemented!()
+    }
 }
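
One step worth spelling out from `Sets::new_multipart_upload` above: with `max_parity` set, parity is half the disk count, so data and parity drives tie, and the write quorum is bumped by one to break the tie. A minimal check of that arithmetic:

    // Write-quorum rule used by Sets::new_multipart_upload.
    fn write_quorum(disk_count: usize, parity_drives: usize) -> usize {
        let data_drives = disk_count - parity_drives;
        if data_drives == parity_drives {
            data_drives + 1
        } else {
            data_drives
        }
    }

    fn main() {
        assert_eq!(write_quorum(4, 2), 3); // max_parity on 4 disks: 2 data, 2 parity
        assert_eq!(write_quorum(6, 2), 4); // data > parity: the data drives suffice
    }
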
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 999aa492..b24548c7 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -21,6 +21,7 @@ pub struct FileInfo {
     pub mod_time: OffsetDateTime,
     pub size: usize,
     pub data: Vec<u8>,
+    pub fresh: bool, // indicates this is a first-time call to write FileInfo.
 }
 
 impl FileInfo {
@@ -40,6 +41,7 @@ impl Default for FileInfo {
             mod_time: OffsetDateTime::UNIX_EPOCH,
             size: Default::default(),
             data: Default::default(),
+            fresh: Default::default(),
         }
     }
 }
@@ -157,10 +159,30 @@ pub struct BucketInfo {
     pub name: String,
     pub created: OffsetDateTime,
 }
 
+pub struct MultipartUploadResult {
+    pub upload_id: String,
+}
+
+pub struct PartInfo {
+    pub part_num: usize,
+    pub last_mod: OffsetDateTime,
+    pub size: usize,
+}
+
 #[async_trait::async_trait]
 pub trait StorageAPI {
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
     async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: ObjectOptions) -> Result<()>;
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<()>;
+    async fn put_object_part(
+        &self,
+        bucket: &str,
+        object: &str,
+        upload_id: &str,
+        part_id: usize,
+        data: PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<PartInfo>;
+    async fn new_multipart_upload(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<MultipartUploadResult>;
 }
diff --git a/ecstore/src/utils/crypto.rs b/ecstore/src/utils/crypto.rs
new file mode 100644
index 00000000..5a7f86b3
--- /dev/null
+++ b/ecstore/src/utils/crypto.rs
@@ -0,0 +1,25 @@
+pub fn base64_encode(input: &[u8]) -> String {
+    base64_simd::URL_SAFE_NO_PAD.encode_to_string(input)
+}
+
+pub fn base64_decode(input: &[u8]) -> Result<Vec<u8>, base64_simd::Error> {
+    base64_simd::URL_SAFE_NO_PAD.decode_to_vec(input)
+}
+
+pub fn hex(data: impl AsRef<[u8]>) -> String {
+    hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower)
+}
+
+#[cfg(not(all(feature = "openssl", not(windows))))]
+pub fn sha256(data: &[u8]) -> impl AsRef<[u8; 32]> {
+    use sha2::{Digest, Sha256};
+    <Sha256 as Digest>::digest(data)
+}
+
+#[cfg(all(feature = "openssl", not(windows)))]
+pub fn sha256(data: &[u8]) -> impl AsRef<[u8]> {
+    use openssl::hash::{Hasher, MessageDigest};
+    let mut h = Hasher::new(MessageDigest::sha256()).unwrap();
+    h.update(data).unwrap();
+    h.finish().unwrap()
+}
diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs
index dc9cae50..20d8332a 100644
--- a/ecstore/src/utils/mod.rs
+++ b/ecstore/src/utils/mod.rs
@@ -1,3 +1,4 @@
+pub mod crypto;
 pub mod hash;
 pub mod net;
 pub mod path;
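
These wrappers pick URL-safe, unpadded base64 (safe to embed upload ids in paths and query strings) and lowercase hex. A quick sanity check of the exposed behavior (the input bytes are illustrative; SHA-256 of "abc" is the standard FIPS 180-2 test vector):

    use ecstore::utils::crypto::{base64_decode, base64_encode, hex, sha256};

    fn main() {
        // URL-safe, no-padding base64 round trip.
        let encoded = base64_encode(b"deployment.uuid");
        assert_eq!(base64_decode(encoded.as_bytes()).unwrap(), b"deployment.uuid");

        // Lowercase hex of SHA-256("abc"), as used by get_multipart_sha_dir.
        assert_eq!(
            hex(sha256(b"abc").as_ref()),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }
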
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index ef2bd1a2..5c2ecdb4 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -3,6 +3,7 @@ use std::fmt::Debug;
 use ecstore::disk_api::DiskError;
 use ecstore::store_api::BucketOptions;
 use ecstore::store_api::MakeBucketOptions;
+use ecstore::store_api::MultipartUploadResult;
 use ecstore::store_api::ObjectOptions;
 use ecstore::store_api::PutObjReader;
 use ecstore::store_api::StorageAPI;
@@ -96,6 +97,14 @@ impl S3 for FS {
     async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
         let input = req.input;
 
+        if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
+            if DiskError::is_err(&e, &DiskError::VolumeNotFound) {
+                return Err(s3_error!(NoSuchBucket));
+            } else {
+                return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
+            }
+        }
+
         let output = GetBucketLocationOutput::default();
 
         Ok(S3Response::new(output))
     }
@@ -198,7 +207,7 @@
         let reader = PutObjReader::new(body.into(), content_length as usize);
 
-        try_!(self.store.put_object(&bucket, &key, reader, ObjectOptions::default()).await);
+        try_!(self.store.put_object(&bucket, &key, reader, &ObjectOptions::default()).await);
 
         // self.store.put_object(bucket, object, data, opts);
 
@@ -211,11 +220,22 @@
     async fn create_multipart_upload(
         &self,
         req: S3Request<CreateMultipartUploadInput>,
     ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
-        let input = req.input;
+        let CreateMultipartUploadInput { bucket, key, .. } = req.input;
 
         // mc cp step 3
 
-        let output = CreateMultipartUploadOutput { ..Default::default() };
+        let MultipartUploadResult { upload_id, .. } = try_!(
+            self.store
+                .new_multipart_upload(&bucket, &key, &ObjectOptions::default())
+                .await
+        );
+
+        let output = CreateMultipartUploadOutput {
+            bucket: Some(bucket),
+            key: Some(key),
+            upload_id: Some(upload_id),
+            ..Default::default()
+        };
 
         Ok(S3Response::new(output))
     }
@@ -226,9 +246,15 @@
         let UploadPartInput {
             body,
             upload_id,
             part_number,
+            content_length,
             ..
         } = req.input;
 
+        let body = body.ok_or_else(|| s3_error!(IncompleteBody))?;
+        let content_length = content_length.ok_or_else(|| s3_error!(IncompleteBody))?;
+
+        // mc cp step 4
+
         let output = UploadPartOutput { ..Default::default() };
 
         Ok(S3Response::new(output))
     }
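
`get_bucket_location` now maps a `VolumeNotFound` from the store onto S3's `NoSuchBucket`; other handlers that hit the store will want the same translation rather than repeating the `if/else`. A hypothetical helper (not in this diff) sketching that factoring, built only from the calls already used above:

    use ecstore::disk_api::DiskError;
    use s3s::{s3_error, S3Error, S3ErrorCode};

    // Hypothetical: centralize the storage-error -> S3-error mapping that
    // get_bucket_location performs inline above.
    fn to_s3_error(e: anyhow::Error) -> S3Error {
        if DiskError::is_err(&e, &DiskError::VolumeNotFound) {
            s3_error!(NoSuchBucket)
        } else {
            S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e))
        }
    }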