mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
change objectinfo metadata use HashMap
add copyobject compresss support fix listobject size use actual_size
This commit is contained in:
@@ -480,7 +480,7 @@ impl Event {
|
||||
s3_metadata.object.content_type = args.object.content_type.clone();
|
||||
// Filter out internal reserved metadata
|
||||
let mut user_metadata = HashMap::new();
|
||||
for (k, v) in &args.object.user_defined.unwrap_or_default() {
|
||||
for (k, v) in args.object.user_defined.iter() {
|
||||
if !k.to_lowercase().starts_with("x-amz-meta-internal-") {
|
||||
user_metadata.insert(k.clone(), v.clone());
|
||||
}
|
||||
|
||||
@@ -474,7 +474,7 @@ mod tests {
|
||||
async fn test_compress_reader_large() {
|
||||
use rand::Rng;
|
||||
// Generate 1MB of random bytes
|
||||
let mut data = vec![0u8; 1024 * 1024];
|
||||
let mut data = vec![0u8; 1024 * 1024 * 32];
|
||||
rand::rng().fill(&mut data[..]);
|
||||
let reader = Cursor::new(data.clone());
|
||||
let mut compress_reader = CompressReader::new(WarpReader::new(reader), CompressionAlgorithm::Gzip);
|
||||
|
||||
@@ -417,21 +417,16 @@ pub async fn get_heal_replicate_object_info(
|
||||
// .map(|(k, v)| (k.clone(), v.clone()))
|
||||
// .collect::<HashMap<String, String>>()
|
||||
// .collect();
|
||||
let to_replace: Vec<(String, String)> = match &user_defined {
|
||||
Some(map) => map
|
||||
.iter()
|
||||
.filter(|(k, _)| k.eq_ignore_ascii_case(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET)))
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect(),
|
||||
None => Vec::new(),
|
||||
};
|
||||
let to_replace: Vec<(String, String)> = user_defined
|
||||
.iter()
|
||||
.filter(|(k, _)| k.eq_ignore_ascii_case(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET)))
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
|
||||
// 第二步:apply 修改
|
||||
for (k, v) in to_replace {
|
||||
if let Some(mp) = user_defined.as_mut() {
|
||||
mp.remove(&k);
|
||||
mp.insert(target_reset_header(&rcfg.role), v);
|
||||
}
|
||||
user_defined.remove(&k);
|
||||
user_defined.insert(target_reset_header(&rcfg.role), v);
|
||||
}
|
||||
}
|
||||
//}
|
||||
@@ -468,13 +463,8 @@ pub async fn get_heal_replicate_object_info(
|
||||
mod_time: oi.mod_time,
|
||||
..Default::default()
|
||||
};
|
||||
let repoptions = get_must_replicate_options(
|
||||
mt2.as_ref().unwrap_or(&HashMap::new()),
|
||||
"",
|
||||
ReplicationStatusType::Unknown,
|
||||
ReplicationType::ObjectReplicationType,
|
||||
&opts,
|
||||
);
|
||||
let repoptions =
|
||||
get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
|
||||
|
||||
let decision = must_replicate(&oi.bucket, &oi.name, &repoptions).await;
|
||||
error!("decision:");
|
||||
@@ -490,11 +480,10 @@ pub async fn get_heal_replicate_object_info(
|
||||
// .and_then(|v| DateTime::parse_from_rfc3339(v).ok())
|
||||
// .map(|dt| dt.with_timezone(&Utc));
|
||||
|
||||
let tm = user_defined.as_ref().and_then(|map| {
|
||||
map.get(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_TIMESTAMP))
|
||||
.and_then(|v| DateTime::parse_from_rfc3339(v).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
});
|
||||
let tm = user_defined
|
||||
.get(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_TIMESTAMP))
|
||||
.and_then(|v| DateTime::parse_from_rfc3339(v).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc));
|
||||
|
||||
let mut rstate = oi.replication_state();
|
||||
rstate.replicate_decision_str = dsc.to_string();
|
||||
@@ -503,8 +492,6 @@ pub async fn get_heal_replicate_object_info(
|
||||
|
||||
let key = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, REPLICATION_TIMESTAMP);
|
||||
let tm: Option<DateTime<Utc>> = user_defined
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(&key)
|
||||
.and_then(|v| DateTime::parse_from_rfc3339(v).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc));
|
||||
@@ -1731,14 +1718,14 @@ impl TraitForObjectInfo for ObjectInfo {
|
||||
rs.purge_targets = version_purge_statuses_map("");
|
||||
|
||||
// Process reset statuses map
|
||||
if self.user_defined.is_some() {
|
||||
for (k, v) in self.user_defined.as_ref().unwrap() {
|
||||
if k.starts_with(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET)) {
|
||||
let arn = k.trim_start_matches(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET));
|
||||
rs.reset_statuses_map.insert(arn.to_string(), v.clone());
|
||||
}
|
||||
|
||||
for (k, v) in self.user_defined.iter() {
|
||||
if k.starts_with(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET)) {
|
||||
let arn = k.trim_start_matches(&(RESERVED_METADATA_PREFIX_LOWER.to_owned() + REPLICATION_RESET));
|
||||
rs.reset_statuses_map.insert(arn.to_string(), v.clone());
|
||||
}
|
||||
}
|
||||
|
||||
rs
|
||||
}
|
||||
}
|
||||
@@ -2021,7 +2008,7 @@ impl ReplicateObjectInfo {
|
||||
size: self.size,
|
||||
actual_size: self.actual_size,
|
||||
is_dir: false,
|
||||
user_defined: None, // 可以按需从别处导入
|
||||
user_defined: HashMap::new(), // 可以按需从别处导入
|
||||
parity_blocks: 0,
|
||||
data_blocks: 0,
|
||||
version_id: Uuid::try_parse(&self.version_id).ok(),
|
||||
|
||||
@@ -573,8 +573,7 @@ impl ScannerItem {
|
||||
let mut oi = oi.clone();
|
||||
oi.replication_status = ReplicationStatusType::from(
|
||||
oi.user_defined
|
||||
.as_ref()
|
||||
.and_then(|map| map.get("x-amz-bucket-replication-status"))
|
||||
.get("x-amz-bucket-replication-status")
|
||||
.unwrap_or(&"PENDING".to_string()),
|
||||
);
|
||||
info!("apply status is: {:?}", oi.replication_status);
|
||||
|
||||
@@ -3816,7 +3816,7 @@ impl ObjectIO for SetDisks {
|
||||
// _ns = Some(ns_lock);
|
||||
// }
|
||||
|
||||
let mut user_defined = opts.user_defined.clone().unwrap_or_default();
|
||||
let mut user_defined = opts.user_defined.clone();
|
||||
|
||||
let sc_parity_drives = {
|
||||
if let Some(sc) = GLOBAL_StorageClass.get() {
|
||||
@@ -4152,19 +4152,17 @@ impl StorageAPI for SetDisks {
|
||||
};
|
||||
|
||||
let inline_data = fi.inline_data();
|
||||
fi.metadata = src_info.user_defined.as_ref().cloned().unwrap_or_default();
|
||||
fi.metadata = src_info.user_defined.clone();
|
||||
|
||||
if let Some(ud) = src_info.user_defined.as_mut() {
|
||||
if let Some(etag) = &src_info.etag {
|
||||
ud.insert("etag".to_owned(), etag.clone());
|
||||
}
|
||||
if let Some(etag) = &src_info.etag {
|
||||
fi.metadata.insert("etag".to_owned(), etag.clone());
|
||||
}
|
||||
|
||||
let mod_time = OffsetDateTime::now_utc();
|
||||
|
||||
for fi in metas.iter_mut() {
|
||||
if fi.is_valid() {
|
||||
fi.metadata = src_info.user_defined.as_ref().cloned().unwrap_or_default();
|
||||
fi.metadata = src_info.user_defined.clone();
|
||||
fi.mod_time = Some(mod_time);
|
||||
fi.version_id = version_id;
|
||||
fi.versioned = src_opts.versioned || src_opts.version_suspended;
|
||||
@@ -4439,10 +4437,8 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
let obj_info = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
|
||||
|
||||
if let Some(ud) = obj_info.user_defined.as_ref() {
|
||||
for (k, v) in ud {
|
||||
fi.metadata.insert(k.clone(), v.clone());
|
||||
}
|
||||
for (k, v) in obj_info.user_defined {
|
||||
fi.metadata.insert(k, v);
|
||||
}
|
||||
|
||||
if let Some(mt) = &opts.eval_metadata {
|
||||
@@ -4851,7 +4847,7 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
let disks = disks.clone();
|
||||
|
||||
let mut user_defined = opts.user_defined.clone().unwrap_or_default();
|
||||
let mut user_defined = opts.user_defined.clone();
|
||||
|
||||
if let Some(ref etag) = opts.preserve_etag {
|
||||
user_defined.insert("etag".to_owned(), etag.clone());
|
||||
@@ -5111,7 +5107,7 @@ impl StorageAPI for SetDisks {
|
||||
|
||||
// etag
|
||||
let etag = {
|
||||
if let Some(Some(etag)) = opts.user_defined.as_ref().map(|v| v.get("etag")) {
|
||||
if let Some(etag) = opts.user_defined.get("etag") {
|
||||
etag.clone()
|
||||
} else {
|
||||
get_complete_multipart_md5(&uploaded_parts)
|
||||
|
||||
@@ -283,7 +283,7 @@ pub struct ObjectOptions {
|
||||
|
||||
pub data_movement: bool,
|
||||
pub src_pool_idx: usize,
|
||||
pub user_defined: Option<HashMap<String, String>>,
|
||||
pub user_defined: HashMap<String, String>,
|
||||
pub preserve_etag: Option<String>,
|
||||
pub metadata_chg: bool,
|
||||
|
||||
@@ -357,7 +357,7 @@ pub struct ObjectInfo {
|
||||
// Actual size is the real size of the object uploaded by client.
|
||||
pub actual_size: i64,
|
||||
pub is_dir: bool,
|
||||
pub user_defined: Option<HashMap<String, String>>,
|
||||
pub user_defined: HashMap<String, String>,
|
||||
pub parity_blocks: usize,
|
||||
pub data_blocks: usize,
|
||||
pub version_id: Option<Uuid>,
|
||||
@@ -418,18 +418,15 @@ impl Clone for ObjectInfo {
|
||||
|
||||
impl ObjectInfo {
|
||||
pub fn is_compressed(&self) -> bool {
|
||||
if let Some(meta) = &self.user_defined {
|
||||
meta.contains_key(&format!("{}compression", RESERVED_METADATA_PREFIX_LOWER))
|
||||
} else {
|
||||
false
|
||||
}
|
||||
self.user_defined
|
||||
.contains_key(&format!("{}compression", RESERVED_METADATA_PREFIX_LOWER))
|
||||
}
|
||||
|
||||
pub fn is_compressed_ok(&self) -> Result<(CompressionAlgorithm, bool)> {
|
||||
let scheme = self
|
||||
.user_defined
|
||||
.as_ref()
|
||||
.and_then(|meta| meta.get(&format!("{}compression", RESERVED_METADATA_PREFIX_LOWER)).cloned());
|
||||
.get(&format!("{}compression", RESERVED_METADATA_PREFIX_LOWER))
|
||||
.cloned();
|
||||
|
||||
if let Some(scheme) = scheme {
|
||||
let algorithm = CompressionAlgorithm::from_str(&scheme)?;
|
||||
@@ -449,16 +446,16 @@ impl ObjectInfo {
|
||||
}
|
||||
|
||||
if self.is_compressed() {
|
||||
if let Some(meta) = &self.user_defined {
|
||||
if let Some(size_str) = meta.get(&format!("{}actual-size", RESERVED_METADATA_PREFIX_LOWER)) {
|
||||
if !size_str.is_empty() {
|
||||
// Todo: deal with error
|
||||
let size = size_str.parse::<i64>().map_err(|e| std::io::Error::other(e.to_string()))?;
|
||||
return Ok(size);
|
||||
}
|
||||
if let Some(size_str) = self
|
||||
.user_defined
|
||||
.get(&format!("{}actual-size", RESERVED_METADATA_PREFIX_LOWER))
|
||||
{
|
||||
if !size_str.is_empty() {
|
||||
// Todo: deal with error
|
||||
let size = size_str.parse::<i64>().map_err(|e| std::io::Error::other(e.to_string()))?;
|
||||
return Ok(size);
|
||||
}
|
||||
}
|
||||
|
||||
let mut actual_size = 0;
|
||||
self.parts.iter().for_each(|part| {
|
||||
actual_size += part.actual_size;
|
||||
@@ -505,7 +502,7 @@ impl ObjectInfo {
|
||||
let metadata = {
|
||||
let mut v = fi.metadata.clone();
|
||||
clean_metadata(&mut v);
|
||||
Some(v)
|
||||
v
|
||||
};
|
||||
|
||||
// Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo
|
||||
|
||||
@@ -67,6 +67,7 @@ use query::instance::make_rustfsms;
|
||||
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
|
||||
use rustfs_rio::CompressReader;
|
||||
use rustfs_rio::EtagReader;
|
||||
use rustfs_rio::HashReader;
|
||||
use rustfs_rio::Reader;
|
||||
use rustfs_rio::WarpReader;
|
||||
@@ -329,7 +330,7 @@ impl S3 for FS {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let dst_opts = copy_dst_opts(&bucket, &key, version_id, &req.headers, None)
|
||||
let dst_opts = copy_dst_opts(&bucket, &key, version_id, &req.headers, HashMap::new())
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
@@ -356,13 +357,50 @@ impl S3 for FS {
|
||||
src_info.metadata_only = true;
|
||||
}
|
||||
|
||||
let reader = Box::new(WarpReader::new(gr.stream));
|
||||
let hrd = HashReader::new(reader, gr.object_info.size, gr.object_info.size, None, false).map_err(ApiError::from)?;
|
||||
let mut reader: Box<dyn Reader> = Box::new(WarpReader::new(gr.stream));
|
||||
|
||||
let actual_size = src_info.get_actual_size().map_err(ApiError::from)?;
|
||||
|
||||
let mut length = actual_size;
|
||||
|
||||
let mut compress_metadata = HashMap::new();
|
||||
|
||||
if is_compressible(&req.headers, &key) && actual_size > MIN_COMPRESSIBLE_SIZE as i64 {
|
||||
compress_metadata.insert(
|
||||
format!("{}compression", RESERVED_METADATA_PREFIX_LOWER),
|
||||
CompressionAlgorithm::default().to_string(),
|
||||
);
|
||||
compress_metadata.insert(format!("{}actual-size", RESERVED_METADATA_PREFIX_LOWER,), actual_size.to_string());
|
||||
|
||||
let hrd = EtagReader::new(reader, None);
|
||||
|
||||
// let hrd = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?;
|
||||
|
||||
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
|
||||
length = -1;
|
||||
} else {
|
||||
src_info
|
||||
.user_defined
|
||||
.remove(&format!("{}compression", RESERVED_METADATA_PREFIX_LOWER));
|
||||
src_info
|
||||
.user_defined
|
||||
.remove(&format!("{}actual-size", RESERVED_METADATA_PREFIX_LOWER));
|
||||
src_info
|
||||
.user_defined
|
||||
.remove(&format!("{}compression-size", RESERVED_METADATA_PREFIX_LOWER));
|
||||
}
|
||||
|
||||
let hrd = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?;
|
||||
|
||||
src_info.put_object_reader = Some(PutObjReader::new(hrd));
|
||||
|
||||
// check quota
|
||||
// TODO: src metadada
|
||||
|
||||
for (k, v) in compress_metadata {
|
||||
src_info.user_defined.insert(k, v);
|
||||
}
|
||||
|
||||
// TODO: src tags
|
||||
|
||||
let oi = store
|
||||
@@ -417,7 +455,7 @@ impl S3 for FS {
|
||||
|
||||
let metadata = extract_metadata(&req.headers);
|
||||
|
||||
let opts: ObjectOptions = del_opts(&bucket, &key, version_id, &req.headers, Some(metadata))
|
||||
let opts: ObjectOptions = del_opts(&bucket, &key, version_id, &req.headers, metadata)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
@@ -487,7 +525,7 @@ impl S3 for FS {
|
||||
|
||||
let metadata = extract_metadata(&req.headers);
|
||||
|
||||
let opts: ObjectOptions = del_opts(&bucket, "", None, &req.headers, Some(metadata))
|
||||
let opts: ObjectOptions = del_opts(&bucket, "", None, &req.headers, metadata)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
@@ -724,7 +762,7 @@ impl S3 for FS {
|
||||
content_type,
|
||||
last_modified,
|
||||
e_tag: info.etag,
|
||||
metadata,
|
||||
metadata: Some(metadata),
|
||||
version_id: info.version_id.map(|v| v.to_string()),
|
||||
// metadata: object_metadata,
|
||||
..Default::default()
|
||||
@@ -844,7 +882,7 @@ impl S3 for FS {
|
||||
let mut obj = Object {
|
||||
key: Some(v.name.to_owned()),
|
||||
last_modified: v.mod_time.map(Timestamp::from),
|
||||
size: Some(v.size),
|
||||
size: Some(v.get_actual_size().unwrap_or_default()),
|
||||
e_tag: v.etag.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
@@ -1043,7 +1081,7 @@ impl S3 for FS {
|
||||
let mt = metadata.clone();
|
||||
let mt2 = metadata.clone();
|
||||
|
||||
let mut opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(mt))
|
||||
let mut opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, mt)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
@@ -1053,14 +1091,12 @@ impl S3 for FS {
|
||||
let dsc = must_replicate(&bucket, &key, &repoptions).await;
|
||||
// warn!("dsc {}", &dsc.replicate_any().clone());
|
||||
if dsc.replicate_any() {
|
||||
if let Some(metadata) = opts.user_defined.as_mut() {
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-timestamp");
|
||||
let now: DateTime<Utc> = Utc::now();
|
||||
let formatted_time = now.to_rfc3339();
|
||||
metadata.insert(k, formatted_time);
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-status");
|
||||
metadata.insert(k, dsc.pending_status());
|
||||
}
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-timestamp");
|
||||
let now: DateTime<Utc> = Utc::now();
|
||||
let formatted_time = now.to_rfc3339();
|
||||
opts.user_defined.insert(k, formatted_time);
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-status");
|
||||
opts.user_defined.insert(k, dsc.pending_status());
|
||||
}
|
||||
|
||||
let obj_info = store
|
||||
@@ -1121,7 +1157,7 @@ impl S3 for FS {
|
||||
);
|
||||
}
|
||||
|
||||
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(metadata))
|
||||
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, metadata)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
@@ -2255,11 +2291,10 @@ impl S3 for FS {
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let legal_hold = if let Some(ud) = object_info.user_defined {
|
||||
ud.get("x-amz-object-lock-legal-hold").map(|v| v.as_str().to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let legal_hold = object_info
|
||||
.user_defined
|
||||
.get("x-amz-object-lock-legal-hold")
|
||||
.map(|v| v.as_str().to_string());
|
||||
|
||||
let status = if let Some(v) = legal_hold {
|
||||
v
|
||||
@@ -2356,20 +2391,16 @@ impl S3 for FS {
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let mode = if let Some(ref ud) = object_info.user_defined {
|
||||
ud.get("x-amz-object-lock-mode")
|
||||
.map(|v| ObjectLockRetentionMode::from(v.as_str().to_string()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mode = object_info
|
||||
.user_defined
|
||||
.get("x-amz-object-lock-mode")
|
||||
.map(|v| ObjectLockRetentionMode::from(v.as_str().to_string()));
|
||||
|
||||
let retain_until_date = if let Some(ref ud) = object_info.user_defined {
|
||||
ud.get("x-amz-object-lock-retain-until-date")
|
||||
.and_then(|v| OffsetDateTime::parse(v.as_str(), &Rfc3339).ok())
|
||||
.map(Timestamp::from)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let retain_until_date = object_info
|
||||
.user_defined
|
||||
.get("x-amz-object-lock-retain-until-date")
|
||||
.and_then(|v| OffsetDateTime::parse(v.as_str(), &Rfc3339).ok())
|
||||
.map(Timestamp::from);
|
||||
|
||||
Ok(S3Response::new(GetObjectRetentionOutput {
|
||||
retention: Some(ObjectLockRetention { mode, retain_until_date }),
|
||||
|
||||
@@ -14,7 +14,7 @@ pub async fn del_opts(
|
||||
object: &str,
|
||||
vid: Option<String>,
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
metadata: Option<HashMap<String, String>>,
|
||||
metadata: HashMap<String, String>,
|
||||
) -> Result<ObjectOptions> {
|
||||
let versioned = BucketVersioningSys::prefix_enabled(bucket, object).await;
|
||||
let version_suspended = BucketVersioningSys::suspended(bucket).await;
|
||||
@@ -33,7 +33,7 @@ pub async fn del_opts(
|
||||
}
|
||||
}
|
||||
|
||||
let mut opts = put_opts_from_headers(headers, metadata)
|
||||
let mut opts = put_opts_from_headers(headers, metadata.clone())
|
||||
.map_err(|err| StorageError::InvalidArgument(bucket.to_owned(), object.to_owned(), err.to_string()))?;
|
||||
|
||||
opts.version_id = {
|
||||
@@ -72,7 +72,7 @@ pub async fn get_opts(
|
||||
}
|
||||
}
|
||||
|
||||
let mut opts = get_default_opts(headers, None, false)
|
||||
let mut opts = get_default_opts(headers, HashMap::new(), false)
|
||||
.map_err(|err| StorageError::InvalidArgument(bucket.to_owned(), object.to_owned(), err.to_string()))?;
|
||||
|
||||
opts.version_id = {
|
||||
@@ -97,7 +97,7 @@ pub async fn put_opts(
|
||||
object: &str,
|
||||
vid: Option<String>,
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
metadata: Option<HashMap<String, String>>,
|
||||
metadata: HashMap<String, String>,
|
||||
) -> Result<ObjectOptions> {
|
||||
let versioned = BucketVersioningSys::prefix_enabled(bucket, object).await;
|
||||
let version_suspended = BucketVersioningSys::prefix_suspended(bucket, object).await;
|
||||
@@ -136,30 +136,27 @@ pub async fn copy_dst_opts(
|
||||
object: &str,
|
||||
vid: Option<String>,
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
metadata: Option<HashMap<String, String>>,
|
||||
metadata: HashMap<String, String>,
|
||||
) -> Result<ObjectOptions> {
|
||||
put_opts(bucket, object, vid, headers, metadata).await
|
||||
}
|
||||
|
||||
pub fn copy_src_opts(_bucket: &str, _object: &str, headers: &HeaderMap<HeaderValue>) -> Result<ObjectOptions> {
|
||||
get_default_opts(headers, None, false)
|
||||
get_default_opts(headers, HashMap::new(), false)
|
||||
}
|
||||
|
||||
pub fn put_opts_from_headers(
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
metadata: Option<HashMap<String, String>>,
|
||||
) -> Result<ObjectOptions> {
|
||||
pub fn put_opts_from_headers(headers: &HeaderMap<HeaderValue>, metadata: HashMap<String, String>) -> Result<ObjectOptions> {
|
||||
get_default_opts(headers, metadata, false)
|
||||
}
|
||||
|
||||
/// Creates default options for getting an object from a bucket.
|
||||
pub fn get_default_opts(
|
||||
_headers: &HeaderMap<HeaderValue>,
|
||||
metadata: Option<HashMap<String, String>>,
|
||||
metadata: HashMap<String, String>,
|
||||
_copy_source: bool,
|
||||
) -> Result<ObjectOptions> {
|
||||
Ok(ObjectOptions {
|
||||
user_defined: metadata.clone(),
|
||||
user_defined: metadata,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
@@ -244,13 +241,13 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_del_opts_basic() {
|
||||
let headers = create_test_headers();
|
||||
let metadata = Some(create_test_metadata());
|
||||
let metadata = create_test_metadata();
|
||||
|
||||
let result = del_opts("test-bucket", "test-object", None, &headers, metadata).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_some());
|
||||
assert!(!opts.user_defined.is_empty());
|
||||
assert_eq!(opts.version_id, None);
|
||||
}
|
||||
|
||||
@@ -258,7 +255,7 @@ mod tests {
|
||||
async fn test_del_opts_with_directory_object() {
|
||||
let headers = create_test_headers();
|
||||
|
||||
let result = del_opts("test-bucket", "test-dir/", None, &headers, None).await;
|
||||
let result = del_opts("test-bucket", "test-dir/", None, &headers, HashMap::new()).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
@@ -270,7 +267,7 @@ mod tests {
|
||||
let headers = create_test_headers();
|
||||
let valid_uuid = Uuid::new_v4().to_string();
|
||||
|
||||
let result = del_opts("test-bucket", "test-object", Some(valid_uuid.clone()), &headers, None).await;
|
||||
let result = del_opts("test-bucket", "test-object", Some(valid_uuid.clone()), &headers, HashMap::new()).await;
|
||||
|
||||
// This test may fail if versioning is not enabled for the bucket
|
||||
// In a real test environment, you would mock BucketVersioningSys
|
||||
@@ -289,7 +286,7 @@ mod tests {
|
||||
let headers = create_test_headers();
|
||||
let invalid_uuid = "invalid-uuid".to_string();
|
||||
|
||||
let result = del_opts("test-bucket", "test-object", Some(invalid_uuid), &headers, None).await;
|
||||
let result = del_opts("test-bucket", "test-object", Some(invalid_uuid), &headers, HashMap::new()).await;
|
||||
|
||||
assert!(result.is_err());
|
||||
if let Err(err) = result {
|
||||
@@ -361,13 +358,13 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_put_opts_basic() {
|
||||
let headers = create_test_headers();
|
||||
let metadata = Some(create_test_metadata());
|
||||
let metadata = create_test_metadata();
|
||||
|
||||
let result = put_opts("test-bucket", "test-object", None, &headers, metadata).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_some());
|
||||
assert!(!opts.user_defined.is_empty());
|
||||
assert_eq!(opts.version_id, None);
|
||||
}
|
||||
|
||||
@@ -375,7 +372,7 @@ mod tests {
|
||||
async fn test_put_opts_with_directory_object() {
|
||||
let headers = create_test_headers();
|
||||
|
||||
let result = put_opts("test-bucket", "test-dir/", None, &headers, None).await;
|
||||
let result = put_opts("test-bucket", "test-dir/", None, &headers, HashMap::new()).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
@@ -387,7 +384,7 @@ mod tests {
|
||||
let headers = create_test_headers();
|
||||
let invalid_uuid = "invalid-uuid".to_string();
|
||||
|
||||
let result = put_opts("test-bucket", "test-object", Some(invalid_uuid), &headers, None).await;
|
||||
let result = put_opts("test-bucket", "test-object", Some(invalid_uuid), &headers, HashMap::new()).await;
|
||||
|
||||
assert!(result.is_err());
|
||||
if let Err(err) = result {
|
||||
@@ -405,13 +402,13 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_copy_dst_opts() {
|
||||
let headers = create_test_headers();
|
||||
let metadata = Some(create_test_metadata());
|
||||
let metadata = create_test_metadata();
|
||||
|
||||
let result = copy_dst_opts("test-bucket", "test-object", None, &headers, metadata).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_some());
|
||||
assert!(!opts.user_defined.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -422,20 +419,20 @@ mod tests {
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_none());
|
||||
assert!(opts.user_defined.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_put_opts_from_headers() {
|
||||
let headers = create_test_headers();
|
||||
let metadata = Some(create_test_metadata());
|
||||
let metadata = create_test_metadata();
|
||||
|
||||
let result = put_opts_from_headers(&headers, metadata);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_some());
|
||||
let user_defined = opts.user_defined.unwrap();
|
||||
assert!(!opts.user_defined.is_empty());
|
||||
let user_defined = opts.user_defined;
|
||||
assert_eq!(user_defined.get("key1"), Some(&"value1".to_string()));
|
||||
assert_eq!(user_defined.get("key2"), Some(&"value2".to_string()));
|
||||
}
|
||||
@@ -443,14 +440,14 @@ mod tests {
|
||||
#[test]
|
||||
fn test_get_default_opts_with_metadata() {
|
||||
let headers = create_test_headers();
|
||||
let metadata = Some(create_test_metadata());
|
||||
let metadata = create_test_metadata();
|
||||
|
||||
let result = get_default_opts(&headers, metadata, false);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_some());
|
||||
let user_defined = opts.user_defined.unwrap();
|
||||
assert!(!opts.user_defined.is_empty());
|
||||
let user_defined = opts.user_defined;
|
||||
assert_eq!(user_defined.get("key1"), Some(&"value1".to_string()));
|
||||
assert_eq!(user_defined.get("key2"), Some(&"value2".to_string()));
|
||||
}
|
||||
@@ -459,11 +456,11 @@ mod tests {
|
||||
fn test_get_default_opts_without_metadata() {
|
||||
let headers = create_test_headers();
|
||||
|
||||
let result = get_default_opts(&headers, None, false);
|
||||
let result = get_default_opts(&headers, HashMap::new(), false);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let opts = result.unwrap();
|
||||
assert!(opts.user_defined.is_none());
|
||||
assert!(opts.user_defined.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -19,7 +19,7 @@ mkdir -p ./target/volume/test{0..4}
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
export RUST_BACKTRACE=1
|
||||
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,iam=debug"
|
||||
export RUST_LOG="rustfs=info,ecstore=info,s3s=debug,iam=info"
|
||||
fi
|
||||
|
||||
# export RUSTFS_ERASURE_SET_DRIVE_COUNT=5
|
||||
|
||||
Reference in New Issue
Block a user