Merge pull request #185 from rustfs/fix/etag

fix: #174 etag for multipart upload
weisd
2024-12-28 22:17:40 +08:00
committed by GitHub
7 changed files with 92 additions and 14 deletions
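
The user-visible effect of this fix is that CompleteMultipartUpload now stores an ETag for the finished object in the usual multipart form: an MD5 hex string derived from the part ETags, suffixed with the part count (unless the request metadata already supplied one). A minimal, hypothetical illustration of that shape — not code from this PR:

```rust
// Hypothetical illustration only, not part of this commit: a completed
// multipart object's ETag looks like "<32-char md5 hex>-<part count>".
fn looks_like_multipart_etag(etag: &str) -> bool {
    match etag.rsplit_once('-') {
        Some((hex, count)) => hex.len() == 32 && count.parse::<usize>().is_ok(),
        None => false,
    }
}

fn main() {
    assert!(looks_like_multipart_etag("3858f62230ac3c915f300c664312c11f-2"));
    assert!(!looks_like_multipart_etag("3858f62230ac3c915f300c664312c11f"));
}
```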

View File

@@ -59,6 +59,7 @@ use crate::{
heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
store_api::ListObjectVersionsInfo,
};
use bytesize::ByteSize;
use chrono::Utc;
use futures::future::join_all;
use glob::Pattern;
@@ -69,6 +70,7 @@ use lock::{
LockApi,
};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use md5::{Digest as Md5Digest, Md5};
use rand::{
thread_rng,
{seq::SliceRandom, Rng},
@@ -2352,7 +2354,7 @@ impl SetDisks {
parts_metadata[index].data_dir = Some(dst_data_dir);
parts_metadata[index].add_object_part(
part.number,
part.etag.clone(),
part.e_tag.clone(),
part.size,
part.mod_time,
part.actual_size,
@@ -4050,7 +4052,7 @@ impl StorageAPI for SetDisks {
}
let part_info = ObjectPartInfo {
etag: Some(etag.clone()),
e_tag: Some(etag.clone()),
number: part_id,
size: w_size,
mod_time: Some(OffsetDateTime::now_utc()),
@@ -4431,7 +4433,7 @@ impl StorageAPI for SetDisks {
return Err(Error::new(ErasureError::InvalidPart(part_id)));
}
fi.add_object_part(part.number, part.etag.clone(), part.size, part.mod_time, part.actual_size);
fi.add_object_part(part.number, part.e_tag.clone(), part.size, part.mod_time, part.actual_size);
}
let (shuffle_disks, mut parts_metadatas) = Self::shuffle_disks_and_parts_metadata_by_index(&disks, &files_metas, &fi);
@@ -4440,23 +4442,45 @@ impl StorageAPI for SetDisks {
fi.parts = Vec::with_capacity(uploaded_parts.len());
let mut total_size: usize = 0;
let mut object_size: usize = 0;
let mut object_actual_size: usize = 0;
for (i, p) in uploaded_parts.iter().enumerate() {
let has_part = curr_fi.parts.iter().find(|v| v.number == p.part_num);
if has_part.is_none() {
// error!("complete_multipart_upload has_part.is_none() {:?}", has_part);
return Err(Error::new(ErasureError::InvalidPart(p.part_num)));
return Err(Error::new(StorageError::InvalidPart(
p.part_num,
"".to_owned(),
p.e_tag.clone().unwrap_or_default(),
)));
}
let ext_part = &curr_fi.parts[i];
if p.e_tag != ext_part.e_tag {
return Err(Error::new(StorageError::InvalidPart(
p.part_num,
ext_part.e_tag.clone().unwrap_or_default(),
p.e_tag.clone().unwrap_or_default(),
)));
}
// TODO: crypto
total_size += ext_part.size;
if (i < uploaded_parts.len() - 1) && !is_min_allowed_part_size(ext_part.size) {
return Err(Error::new(StorageError::InvalidPart(
p.part_num,
ext_part.e_tag.clone().unwrap_or_default(),
p.e_tag.clone().unwrap_or_default(),
)));
}
object_size += ext_part.size;
object_actual_size += ext_part.actual_size;
fi.parts.push(ObjectPartInfo {
etag: ext_part.etag.clone(),
e_tag: ext_part.e_tag.clone(),
number: p.part_num,
size: ext_part.size,
mod_time: ext_part.mod_time,
@@ -4464,17 +4488,34 @@ impl StorageAPI for SetDisks {
});
}
fi.size = total_size;
fi.size = object_size;
fi.mod_time = opts.mod_time;
if fi.mod_time.is_none() {
fi.mod_time = Some(OffsetDateTime::now_utc());
}
// etag
if let Some(metadata) = fi.metadata.as_mut() {
if let Some(etag) = opts.user_defined.get("etag") {
metadata.insert("etag".to_owned(), etag.clone());
} else {
metadata.insert("etag".to_owned(), get_complete_multipart_md5(&uploaded_parts));
}
}
// TODO: object_actual_size
let _ = object_actual_size;
for meta in parts_metadatas.iter_mut() {
if meta.is_valid() {
meta.size = fi.size;
meta.mod_time = fi.mod_time;
meta.parts.clone_from(&fi.parts);
meta.metadata = fi.metadata.clone();
meta.versioned = opts.versioned || opts.version_suspended;
// TODO: Checksum
}
}
@@ -5123,3 +5164,23 @@ pub async fn stat_all_dirs(disks: &[Option<DiskStore>], bucket: &str, prefix: &s
}
errs
}
const GLOBAL_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
fn is_min_allowed_part_size(size: usize) -> bool {
size as u64 >= GLOBAL_MIN_PART_SIZE.as_u64()
}
fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
let mut buf = Vec::new();
for part in parts.iter() {
if let Some(etag) = &part.e_tag {
buf.extend(etag.bytes());
}
}
let mut hasher = Md5::new();
hasher.write(&buf);
format!("{:x}-{}", hasher.finalize(), parts.len())
}
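
A short usage sketch for the helper above (illustrative only; it assumes the surrounding module and uses placeholder hex values for the part ETags). The helper concatenates the part ETag strings, MD5-hashes the result, and appends "-<part count>":

```rust
// Sketch, not part of the diff: a two-part completion yields an ETag whose
// prefix is the MD5 hex of the concatenated part ETags and whose suffix is "-2".
let parts = vec![
    CompletePart { part_num: 1, e_tag: Some("9bb58f26192e4ba00f01e2e7b136bbd8".to_owned()) },
    CompletePart { part_num: 2, e_tag: Some("5d41402abc4b2a76b9719d911017c592".to_owned()) },
];
let etag = get_complete_multipart_md5(&parts);
assert!(etag.ends_with("-2"));
```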

View File

@@ -137,13 +137,13 @@ impl FileInfo {
pub fn add_object_part(
&mut self,
num: usize,
etag: Option<String>,
e_tag: Option<String>,
part_size: usize,
mod_time: Option<OffsetDateTime>,
actual_size: usize,
) {
let part = ObjectPartInfo {
etag,
e_tag,
number: num,
size: part_size,
mod_time,
@@ -272,7 +272,7 @@ impl FileInfo {
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ObjectPartInfo {
pub etag: Option<String>,
pub e_tag: Option<String>,
pub number: usize,
pub size: usize,
pub actual_size: usize, // size of the source data
@@ -606,12 +606,14 @@ pub struct PartInfo {
#[derive(Debug, Clone)]
pub struct CompletePart {
pub part_num: usize,
pub e_tag: Option<String>,
}
impl From<s3s::dto::CompletedPart> for CompletePart {
fn from(value: s3s::dto::CompletedPart) -> Self {
Self {
part_num: value.part_number.unwrap_or_default() as usize,
e_tag: value.e_tag,
}
}
}
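
A hedged sketch of how the conversion above is likely used at the S3 handler boundary; the `completed_parts` variable is an assumption, not shown in this diff:

```rust
// Sketch: given the CompletedPart list from an s3s CompleteMultipartUpload
// request (variable name assumed), convert into the store-level type in one pass.
let uploaded_parts: Vec<CompletePart> = completed_parts
    .into_iter()
    .map(CompletePart::from)
    .collect();
```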

View File

@@ -58,6 +58,9 @@ pub enum StorageError {
#[error("Invalid upload id: {0}/{1}-{2}")]
InvalidUploadID(String, String, String),
#[error("Specified part could not be found. PartNumber {0}, Expected {1}, got {2}")]
InvalidPart(usize, String, String),
#[error("Invalid version id: {0}/{1}-{2}")]
InvalidVersionID(String, String, String),
#[error("invalid data movement operation, source and destination pool are the same for : {0}/{1}-{2}")]
@@ -103,6 +106,7 @@ impl StorageError {
StorageError::InsufficientReadQuorum => 0x16,
StorageError::InsufficientWriteQuorum => 0x17,
StorageError::DecommissionNotStarted => 0x18,
StorageError::InvalidPart(_, _, _) => 0x19,
}
}
@@ -136,6 +140,7 @@ impl StorageError {
0x16 => Some(StorageError::InsufficientReadQuorum),
0x17 => Some(StorageError::InsufficientWriteQuorum),
0x18 => Some(StorageError::DecommissionNotStarted),
0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())),
_ => None,
}
}
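
The numeric code added above (0x19) is what travels across the node-service boundary. A hedged round-trip sketch, with accessor names assumed since the diff only shows the match arms:

```rust
// Sketch only; the method names are assumptions, not confirmed by this diff.
// Decoding from the wire code restores the variant but not its fields, so the
// part number and ETags come back as Default values (0, "", "").
let err = StorageError::InvalidPart(3, "expected-etag".to_owned(), "got-etag".to_owned());
let code = err.to_u32();                     // 0x19 per the mapping above
let decoded = StorageError::from_u32(code);  // Some(InvalidPart(0, "", ""))
```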

View File

@@ -29,14 +29,14 @@ use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER};
use common::globals::GLOBAL_Local_Node_Name;
use ecstore::store_err::StorageError;
use ecstore::utils::{err_to_proto_err, error_to_u32};
use ecstore::utils::err_to_proto_err;
use madmin::health::{
get_cpus, get_mem_info, get_os_info, get_partitions, get_proc_info, get_sys_config, get_sys_errors, get_sys_services,
};
use madmin::net::get_net_info;
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{node_service_server::NodeService as Node, Error as Proto_Error, *},
proto_gen::node_service::{node_service_server::NodeService as Node, *},
};
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};

View File

@@ -211,7 +211,6 @@ async fn run(opt: config::Opt) -> Result<()> {
.await
.map_err(|err| {
error!("ECStore::new {:?}", &err);
panic!("{}", err);
Error::from_string(err.to_string())
})?;

View File

@@ -391,6 +391,8 @@ impl S3 for FS {
.await
.map_err(to_s3_error)?;
warn!("head_object info {:?}", &info);
let content_type = {
if let Some(content_type) = info.content_type {
match ContentType::from_str(&content_type) {

View File

@@ -62,6 +62,15 @@ pub fn to_s3_error(err: Error) -> S3Error {
s3_error!(SlowDown, "Storage resources are insufficient for the write operation")
}
StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"),
StorageError::InvalidPart(bucket, object, version_id) => {
s3_error!(
InvalidPart,
"Specified part could not be found. PartNumber {}, Expected {}, got {}",
bucket,
object,
version_id
)
}
};
}