From e484002d4a0807d0eec3e633f16286f59295746f Mon Sep 17 00:00:00 2001 From: weisd Date: Sat, 28 Dec 2024 22:07:03 +0800 Subject: [PATCH] fix: #174 etag for multipart upload --- ecstore/src/set_disk.rs | 77 +++++++++++++++++++++++++++++++++---- ecstore/src/store_api.rs | 8 ++-- ecstore/src/store_err.rs | 5 +++ rustfs/src/grpc.rs | 4 +- rustfs/src/main.rs | 1 - rustfs/src/storage/ecfs.rs | 2 + rustfs/src/storage/error.rs | 9 +++++ 7 files changed, 92 insertions(+), 14 deletions(-) diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 615f1a37..3ddf13a7 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -59,6 +59,7 @@ use crate::{ heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING}, store_api::ListObjectVersionsInfo, }; +use bytesize::ByteSize; use chrono::Utc; use futures::future::join_all; use glob::Pattern; @@ -69,6 +70,7 @@ use lock::{ LockApi, }; use madmin::heal_commands::{HealDriveInfo, HealResultItem}; +use md5::{Digest as Md5Digest, Md5}; use rand::{ thread_rng, {seq::SliceRandom, Rng}, @@ -2352,7 +2354,7 @@ impl SetDisks { parts_metadata[index].data_dir = Some(dst_data_dir); parts_metadata[index].add_object_part( part.number, - part.etag.clone(), + part.e_tag.clone(), part.size, part.mod_time, part.actual_size, @@ -4050,7 +4052,7 @@ impl StorageAPI for SetDisks { } let part_info = ObjectPartInfo { - etag: Some(etag.clone()), + e_tag: Some(etag.clone()), number: part_id, size: w_size, mod_time: Some(OffsetDateTime::now_utc()), @@ -4431,7 +4433,7 @@ impl StorageAPI for SetDisks { return Err(Error::new(ErasureError::InvalidPart(part_id))); } - fi.add_object_part(part.number, part.etag.clone(), part.size, part.mod_time, part.actual_size); + fi.add_object_part(part.number, part.e_tag.clone(), part.size, part.mod_time, part.actual_size); } let (shuffle_disks, mut parts_metadatas) = Self::shuffle_disks_and_parts_metadata_by_index(&disks, &files_metas, &fi); @@ -4440,23 +4442,45 @@ impl StorageAPI for SetDisks { fi.parts = Vec::with_capacity(uploaded_parts.len()); - let mut total_size: usize = 0; + let mut object_size: usize = 0; + let mut object_actual_size: usize = 0; for (i, p) in uploaded_parts.iter().enumerate() { let has_part = curr_fi.parts.iter().find(|v| v.number == p.part_num); if has_part.is_none() { // error!("complete_multipart_upload has_part.is_none() {:?}", has_part); - return Err(Error::new(ErasureError::InvalidPart(p.part_num))); + return Err(Error::new(StorageError::InvalidPart( + p.part_num, + "".to_owned(), + p.e_tag.clone().unwrap_or_default(), + ))); } let ext_part = &curr_fi.parts[i]; + if p.e_tag != ext_part.e_tag { + return Err(Error::new(StorageError::InvalidPart( + p.part_num, + ext_part.e_tag.clone().unwrap_or_default(), + p.e_tag.clone().unwrap_or_default(), + ))); + } + // TODO: crypto - total_size += ext_part.size; + if (i < uploaded_parts.len() - 1) && !is_min_allowed_part_size(ext_part.size) { + return Err(Error::new(StorageError::InvalidPart( + p.part_num, + ext_part.e_tag.clone().unwrap_or_default(), + p.e_tag.clone().unwrap_or_default(), + ))); + } + + object_size += ext_part.size; + object_actual_size += ext_part.actual_size; fi.parts.push(ObjectPartInfo { - etag: ext_part.etag.clone(), + e_tag: ext_part.e_tag.clone(), number: p.part_num, size: ext_part.size, mod_time: ext_part.mod_time, @@ -4464,17 +4488,34 @@ impl StorageAPI for SetDisks { }); } - fi.size = total_size; + fi.size = object_size; fi.mod_time = opts.mod_time; if fi.mod_time.is_none() { fi.mod_time = Some(OffsetDateTime::now_utc()); } + // etag + + if let Some(metadata) = fi.metadata.as_mut() { + if let Some(etag) = opts.user_defined.get("etag") { + metadata.insert("etag".to_owned(), etag.clone()); + } else { + metadata.insert("etag".to_owned(), get_complete_multipart_md5(&uploaded_parts)); + } + } + + // TODO: object_actual_size + let _ = object_actual_size; + for meta in parts_metadatas.iter_mut() { if meta.is_valid() { meta.size = fi.size; meta.mod_time = fi.mod_time; meta.parts.clone_from(&fi.parts); + meta.metadata = fi.metadata.clone(); + meta.versioned = opts.versioned || opts.version_suspended; + + // TODO: Checksum } } @@ -5123,3 +5164,23 @@ pub async fn stat_all_dirs(disks: &[Option], bucket: &str, prefix: &s } errs } + +const GLOBAL_MIN_PART_SIZE: ByteSize = ByteSize::mib(5); +fn is_min_allowed_part_size(size: usize) -> bool { + size as u64 >= GLOBAL_MIN_PART_SIZE.as_u64() +} + +fn get_complete_multipart_md5(parts: &[CompletePart]) -> String { + let mut buf = Vec::new(); + + for part in parts.iter() { + if let Some(etag) = &part.e_tag { + buf.extend(etag.bytes()); + } + } + + let mut hasher = Md5::new(); + hasher.write(&buf); + + format!("{:x}-{}", hasher.finalize(), parts.len()) +} diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index fc37c3cc..f9459b05 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -137,13 +137,13 @@ impl FileInfo { pub fn add_object_part( &mut self, num: usize, - etag: Option, + e_tag: Option, part_size: usize, mod_time: Option, actual_size: usize, ) { let part = ObjectPartInfo { - etag, + e_tag, number: num, size: part_size, mod_time, @@ -272,7 +272,7 @@ impl FileInfo { #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct ObjectPartInfo { - pub etag: Option, + pub e_tag: Option, pub number: usize, pub size: usize, pub actual_size: usize, // 源数据大小 @@ -606,12 +606,14 @@ pub struct PartInfo { #[derive(Debug, Clone)] pub struct CompletePart { pub part_num: usize, + pub e_tag: Option, } impl From for CompletePart { fn from(value: s3s::dto::CompletedPart) -> Self { Self { part_num: value.part_number.unwrap_or_default() as usize, + e_tag: value.e_tag, } } } diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs index 518e48a8..b1b7a4cb 100644 --- a/ecstore/src/store_err.rs +++ b/ecstore/src/store_err.rs @@ -58,6 +58,9 @@ pub enum StorageError { #[error("Invalid upload id: {0}/{1}-{2}")] InvalidUploadID(String, String, String), + #[error("Specified part could not be found. PartNumber {0}, Expected {1}, got {2}")] + InvalidPart(usize, String, String), + #[error("Invalid version id: {0}/{1}-{2}")] InvalidVersionID(String, String, String), #[error("invalid data movement operation, source and destination pool are the same for : {0}/{1}-{2}")] @@ -103,6 +106,7 @@ impl StorageError { StorageError::InsufficientReadQuorum => 0x16, StorageError::InsufficientWriteQuorum => 0x17, StorageError::DecommissionNotStarted => 0x18, + StorageError::InvalidPart(_, _, _) => 0x19, } } @@ -136,6 +140,7 @@ impl StorageError { 0x16 => Some(StorageError::InsufficientReadQuorum), 0x17 => Some(StorageError::InsufficientWriteQuorum), 0x18 => Some(StorageError::DecommissionNotStarted), + 0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())), _ => None, } } diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 023cbd07..0e488712 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -29,14 +29,14 @@ use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER}; use common::globals::GLOBAL_Local_Node_Name; use ecstore::store_err::StorageError; -use ecstore::utils::{err_to_proto_err, error_to_u32}; +use ecstore::utils::err_to_proto_err; use madmin::health::{ get_cpus, get_mem_info, get_os_info, get_partitions, get_proc_info, get_sys_config, get_sys_errors, get_sys_services, }; use madmin::net::get_net_info; use protos::{ models::{PingBody, PingBodyBuilder}, - proto_gen::node_service::{node_service_server::NodeService as Node, Error as Proto_Error, *}, + proto_gen::node_service::{node_service_server::NodeService as Node, *}, }; use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 4dea0f4f..f9b6746a 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -211,7 +211,6 @@ async fn run(opt: config::Opt) -> Result<()> { .await .map_err(|err| { error!("ECStore::new {:?}", &err); - panic!("{}", err); Error::from_string(err.to_string()) })?; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 917176d4..7dc8e84e 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -391,6 +391,8 @@ impl S3 for FS { .await .map_err(to_s3_error)?; + warn!("head_object info {:?}", &info); + let content_type = { if let Some(content_type) = info.content_type { match ContentType::from_str(&content_type) { diff --git a/rustfs/src/storage/error.rs b/rustfs/src/storage/error.rs index 53aad658..8de64552 100644 --- a/rustfs/src/storage/error.rs +++ b/rustfs/src/storage/error.rs @@ -62,6 +62,15 @@ pub fn to_s3_error(err: Error) -> S3Error { s3_error!(SlowDown, "Storage resources are insufficient for the write operation") } StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"), + StorageError::InvalidPart(bucket, object, version_id) => { + s3_error!( + InvalidPart, + "Specified part could not be found. PartNumber {}, Expected {}, got {}", + bucket, + object, + version_id + ) + } }; }