refactor(app): migrate delete-objects and listing orchestration (#1933)

This commit is contained in:
安正超
2026-02-24 20:09:01 +08:00
committed by GitHub
parent c10084867a
commit c692777ead
3 changed files with 548 additions and 483 deletions

View File

@@ -1590,6 +1590,152 @@ impl DefaultBucketUsecase {
// let output = ListObjectsV2Output { ..Default::default() };
Ok(S3Response::new(output))
}
pub async fn execute_list_object_versions(
&self,
req: S3Request<ListObjectVersionsInput>,
) -> S3Result<S3Response<ListObjectVersionsOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let ListObjectVersionsInput {
bucket,
delimiter,
key_marker,
version_id_marker,
max_keys,
prefix,
..
} = req.input;
let prefix = prefix.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
let key_marker = key_marker.filter(|v| !v.is_empty());
let version_id_marker = version_id_marker.filter(|v| !v.is_empty());
let delimiter = delimiter.filter(|v| !v.is_empty());
let store = get_validated_store(&bucket).await?;
let object_infos = store
.list_object_versions(&bucket, &prefix, key_marker, version_id_marker, delimiter.clone(), max_keys)
.await
.map_err(ApiError::from)?;
let objects: Vec<ObjectVersion> = object_infos
.objects
.iter()
.filter(|v| !v.name.is_empty() && !v.delete_marker)
.map(|v| ObjectVersion {
key: Some(v.name.to_owned()),
last_modified: v.mod_time.map(Timestamp::from),
size: Some(v.size),
version_id: Some(v.version_id.map(|v| v.to_string()).unwrap_or_else(|| "null".to_string())),
is_latest: Some(v.is_latest),
e_tag: v.etag.clone().map(|etag| to_s3s_etag(&etag)),
storage_class: v.storage_class.clone().map(ObjectVersionStorageClass::from),
..Default::default()
})
.collect();
let common_prefixes = object_infos
.prefixes
.into_iter()
.map(|v| CommonPrefix { prefix: Some(v) })
.collect();
let delete_markers = object_infos
.objects
.iter()
.filter(|o| o.delete_marker)
.map(|o| DeleteMarkerEntry {
key: Some(o.name.clone()),
version_id: Some(o.version_id.map(|v| v.to_string()).unwrap_or_else(|| "null".to_string())),
is_latest: Some(o.is_latest),
last_modified: o.mod_time.map(Timestamp::from),
..Default::default()
})
.collect::<Vec<_>>();
let next_key_marker = object_infos.next_marker.filter(|v| !v.is_empty());
let next_version_id_marker = object_infos.next_version_idmarker.filter(|v| !v.is_empty());
let output = ListObjectVersionsOutput {
is_truncated: Some(object_infos.is_truncated),
max_keys: Some(max_keys),
delimiter,
name: Some(bucket),
prefix: Some(prefix),
common_prefixes: Some(common_prefixes),
versions: Some(objects),
delete_markers: Some(delete_markers),
next_key_marker,
next_version_id_marker,
..Default::default()
};
Ok(S3Response::new(output))
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let request_marker = req.input.marker.clone();
let v2_resp = self.execute_list_objects_v2(req.map_input(Into::into)).await?;
Ok(v2_resp.map_output(|v2| {
let next_marker = if v2.is_truncated.unwrap_or(false) {
let last_key = v2
.contents
.as_ref()
.and_then(|contents| contents.last())
.and_then(|obj| obj.key.as_ref())
.cloned();
let last_prefix = v2
.common_prefixes
.as_ref()
.and_then(|prefixes| prefixes.last())
.and_then(|prefix| prefix.prefix.as_ref())
.cloned();
match (last_key, last_prefix) {
(Some(k), Some(p)) => {
if k > p {
Some(k)
} else {
Some(p)
}
}
(Some(k), None) => Some(k),
(None, Some(p)) => Some(p),
(None, None) => None,
}
} else {
None
};
let marker = Some(request_marker.unwrap_or_default());
ListObjectsOutput {
contents: v2.contents,
delimiter: v2.delimiter,
encoding_type: v2.encoding_type,
name: v2.name,
prefix: v2.prefix,
max_keys: v2.max_keys,
common_prefixes: v2.common_prefixes,
is_truncated: v2.is_truncated,
marker,
next_marker,
..Default::default()
}
}))
}
}
#[async_trait::async_trait]
@@ -1850,6 +1996,31 @@ mod tests {
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_list_object_versions_returns_internal_error_when_store_uninitialized() {
let input = ListObjectVersionsInput::builder()
.bucket("test-bucket".to_string())
.build()
.unwrap();
let req = build_request(input, Method::GET);
let usecase = DefaultBucketUsecase::without_context();
let err = usecase.execute_list_object_versions(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_list_objects_returns_internal_error_when_store_uninitialized() {
let input = ListObjectsInput::builder().bucket("test-bucket".to_string()).build().unwrap();
let req = build_request(input, Method::GET);
let usecase = DefaultBucketUsecase::without_context();
let err = usecase.execute_list_objects(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_put_bucket_lifecycle_configuration_rejects_missing_configuration() {
let input = PutBucketLifecycleConfigurationInput::builder()

View File

@@ -50,7 +50,7 @@ use rustfs_ecstore::bucket::{
object_lock::objectlock_sys::{BucketObjectLockSys, check_object_lock_for_deletion, check_retention_for_modification},
quota::QuotaOperation,
replication::{
DeletedObjectReplicationInfo, get_must_replicate_options, must_replicate, schedule_replication,
DeletedObjectReplicationInfo, check_replicate_delete, get_must_replicate_options, must_replicate, schedule_replication,
schedule_replication_delete,
},
tagging::{decode_tags, encode_tags},
@@ -60,14 +60,18 @@ use rustfs_ecstore::bucket::{
};
use rustfs_ecstore::client::object_api_utils::to_s3s_etag;
use rustfs_ecstore::compress::{MIN_COMPRESSIBLE_SIZE, is_compressible};
use rustfs_ecstore::disk::{error::DiskError, error_reduce::is_all_buckets_not_found};
use rustfs_ecstore::error::{StorageError, is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found};
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::is_valid_storage_class;
use rustfs_ecstore::store_api::{BucketOptions, HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader};
use rustfs_ecstore::store_api::{
BucketOptions, HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PutObjReader,
};
use rustfs_filemeta::{
REPLICATE_INCOMING_DELETE, ReplicationStatusType, ReplicationType, RestoreStatusOps, VersionPurgeStatusType,
parse_restore_obj_status,
};
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_policy::policy::action::{Action, S3Action};
use rustfs_rio::{CompressReader, DecryptReader, EncryptReader, EtagReader, HardLimitReader, HashReader, Reader, WarpReader};
use rustfs_s3select_api::{
@@ -76,7 +80,6 @@ use rustfs_s3select_api::{
};
use rustfs_s3select_query::get_global_db;
use rustfs_targets::EventName;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::{
AMZ_BUCKET_REPLICATION_STATUS, AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE, RESERVED_METADATA_PREFIX,
headers::{
@@ -86,6 +89,9 @@ use rustfs_utils::http::{
},
};
use rustfs_utils::path::{is_dir_object, path_join_buf};
use rustfs_utils::{
CompressionAlgorithm, extract_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
};
use s3s::dto::*;
use s3s::header::{X_AMZ_RESTORE, X_AMZ_RESTORE_OUTPUT_PATH};
use s3s::{S3Error, S3ErrorCode, S3Request, S3Response, S3Result, s3_error};
@@ -103,6 +109,21 @@ use uuid::Uuid;
pub type ObjectUsecaseResult<T> = Result<T, ApiError>;
fn normalize_delete_objects_version_id(version_id: Option<String>) -> Result<(Option<String>, Option<Uuid>), String> {
let version_id = version_id.map(|v| v.trim().to_string()).filter(|v| !v.is_empty());
match version_id {
Some(id) => {
if id.eq_ignore_ascii_case("null") {
Ok((Some("null".to_string()), Some(Uuid::nil())))
} else {
let uuid = Uuid::parse_str(&id).map_err(|e| e.to_string())?;
Ok((Some(id), Some(uuid)))
}
}
None => Ok((None, None)),
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutObjectRequest {
pub bucket: String,
@@ -2357,6 +2378,308 @@ impl DefaultObjectUsecase {
result
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_delete_objects(
&self,
mut req: S3Request<DeleteObjectsInput>,
) -> S3Result<S3Response<DeleteObjectsOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event();
let (bucket, delete) = {
let bucket = req.input.bucket.clone();
let delete = req.input.delete.clone();
(bucket, delete)
};
if delete.objects.is_empty() || delete.objects.len() > 1000 {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"No objects to delete or too many objects to delete".to_string(),
));
}
let replicate_deletes = has_replication_rules(
&bucket,
&delete
.objects
.iter()
.map(|v| ObjectToDelete {
object_name: v.key.clone(),
..Default::default()
})
.collect::<Vec<ObjectToDelete>>(),
)
.await;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let version_cfg = BucketVersioningSys::get(&bucket).await.unwrap_or_default();
let bypass_governance = has_bypass_governance_header(&req.headers);
#[derive(Default, Clone)]
struct DeleteResult {
delete_object: Option<rustfs_ecstore::store_api::DeletedObject>,
error: Option<Error>,
}
let mut delete_results = vec![DeleteResult::default(); delete.objects.len()];
let mut object_to_delete = Vec::new();
let mut object_to_delete_index = HashMap::new();
let mut object_sizes = HashMap::new();
for (idx, obj_id) in delete.objects.iter().enumerate() {
let raw_version_id = obj_id.version_id.clone();
let (version_id, version_uuid) = match normalize_delete_objects_version_id(raw_version_id.clone()) {
Ok(parsed) => parsed,
Err(err) => {
delete_results[idx].error = Some(Error {
code: Some("NoSuchVersion".to_string()),
key: Some(obj_id.key.clone()),
message: Some(err),
version_id: raw_version_id,
});
continue;
}
};
{
let req_info = req.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");
req_info.bucket = Some(bucket.clone());
req_info.object = Some(obj_id.key.clone());
req_info.version_id = version_id.clone();
}
let auth_res = authorize_request(&mut req, Action::S3Action(S3Action::DeleteObjectAction)).await;
if let Err(e) = auth_res {
delete_results[idx].error = Some(Error {
code: Some("AccessDenied".to_string()),
key: Some(obj_id.key.clone()),
message: Some(e.to_string()),
version_id: version_id.clone(),
});
continue;
}
let mut object = ObjectToDelete {
object_name: obj_id.key.clone(),
version_id: version_uuid,
..Default::default()
};
let metadata = extract_metadata(&req.headers);
let opts: ObjectOptions = del_opts(
&bucket,
&object.object_name,
object.version_id.map(|f| f.to_string()),
&req.headers,
metadata,
)
.await
.map_err(ApiError::from)?;
let (goi, gerr) = match store.get_object_info(&bucket, &object.object_name, &opts).await {
Ok(res) => (res, None),
Err(e) => (ObjectInfo::default(), Some(e.to_string())),
};
if gerr.is_none()
&& let Some(block_reason) = check_object_lock_for_deletion(&bucket, &goi, bypass_governance).await
{
delete_results[idx].error = Some(Error {
code: Some("AccessDenied".to_string()),
key: Some(obj_id.key.clone()),
message: Some(block_reason.error_message()),
version_id: version_id.clone(),
});
continue;
}
object_sizes.insert(object.object_name.clone(), goi.size);
if is_dir_object(&object.object_name) && object.version_id.is_none() {
object.version_id = Some(Uuid::nil());
}
if replicate_deletes {
let dsc = check_replicate_delete(
&bucket,
&ObjectToDelete {
object_name: object.object_name.clone(),
version_id: object.version_id,
..Default::default()
},
&goi,
&opts,
gerr.clone(),
)
.await;
if dsc.replicate_any() {
if object.version_id.is_some() {
object.version_purge_status = Some(VersionPurgeStatusType::Pending);
object.version_purge_statuses = dsc.pending_status();
} else {
object.delete_marker_replication_status = dsc.pending_status();
}
object.replicate_decision_str = Some(dsc.to_string());
}
}
object_to_delete_index.insert(object.object_name.clone(), idx);
object_to_delete.push(object);
}
let (mut dobjs, errs) = store
.delete_objects(
&bucket,
object_to_delete.clone(),
ObjectOptions {
version_suspended: version_cfg.suspended(),
..Default::default()
},
)
.await;
let manager = get_concurrency_manager();
let bucket_clone = bucket.clone();
let deleted_objects = dobjs.clone();
tokio::spawn(async move {
for dobj in deleted_objects {
manager
.invalidate_cache_versioned(
&bucket_clone,
&dobj.object_name,
dobj.version_id.map(|v| v.to_string()).as_deref(),
)
.await;
}
});
if is_all_buckets_not_found(
&errs
.iter()
.map(|v| v.as_ref().map(|v| v.clone().into()))
.collect::<Vec<Option<DiskError>>>() as &[Option<DiskError>],
) {
let result = Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let _ = helper.complete(&result);
return result;
}
for (i, err) in errs.iter().enumerate() {
let obj = dobjs[i].clone();
let Some(didx) = object_to_delete_index.get(&obj.object_name) else {
continue;
};
if err.is_none()
|| err
.clone()
.is_some_and(|v| is_err_object_not_found(&v) || is_err_version_not_found(&v))
{
if replicate_deletes {
dobjs[i].replication_state = Some(object_to_delete[i].replication_state());
}
delete_results[*didx].delete_object = Some(dobjs[i].clone());
if let Some(&size) = object_sizes.get(&obj.object_name) {
rustfs_ecstore::data_usage::decrement_bucket_usage_memory(&bucket, size as u64).await;
}
continue;
}
if let Some(err) = err.clone() {
delete_results[*didx].error = Some(Error {
code: Some(err.to_string()),
key: Some(object_to_delete[i].object_name.clone()),
message: Some(err.to_string()),
version_id: object_to_delete[i].version_id.map(|v| v.to_string()),
});
}
}
let deleted = delete_results
.iter()
.filter_map(|v| v.delete_object.clone())
.map(|v| DeletedObject {
delete_marker: { if v.delete_marker { Some(true) } else { None } },
delete_marker_version_id: v.delete_marker_version_id.map(|v| v.to_string()),
key: Some(v.object_name.clone()),
version_id: if is_dir_object(v.object_name.as_str()) && v.version_id == Some(Uuid::nil()) {
None
} else {
v.version_id.map(|v| v.to_string())
},
})
.collect();
let errors = delete_results.iter().filter_map(|v| v.error.clone()).collect::<Vec<Error>>();
let output = DeleteObjectsOutput {
deleted: Some(deleted),
errors: Some(errors),
..Default::default()
};
for dobjs in &delete_results {
if let Some(dobj) = &dobjs.delete_object
&& replicate_deletes
&& (dobj.delete_marker_replication_status() == ReplicationStatusType::Pending
|| dobj.version_purge_status() == VersionPurgeStatusType::Pending)
{
let mut dobj = dobj.clone();
if is_dir_object(dobj.object_name.as_str()) && dobj.version_id.is_none() {
dobj.version_id = Some(Uuid::nil());
}
let deleted_object = DeletedObjectReplicationInfo {
delete_object: dobj,
bucket: bucket.clone(),
event_type: REPLICATE_INCOMING_DELETE.to_string(),
..Default::default()
};
schedule_replication_delete(deleted_object).await;
}
}
let req_headers = req.headers.clone();
tokio::spawn(async move {
for res in delete_results {
if let Some(dobj) = res.delete_object {
let event_name = if dobj.delete_marker {
EventName::ObjectRemovedDeleteMarkerCreated
} else {
EventName::ObjectRemovedDelete
};
let event_args = EventArgsBuilder::new(
event_name,
bucket.clone(),
ObjectInfo {
name: dobj.object_name.clone(),
bucket: bucket.clone(),
..Default::default()
},
)
.version_id(dobj.version_id.map(|v| v.to_string()).unwrap_or_default())
.req_params(extract_params_header(&req_headers))
.resp_elements(extract_resp_elements(&S3Response::new(DeleteObjectsOutput::default())))
.host(get_request_host(&req_headers))
.user_agent(get_request_user_agent(&req_headers))
.build();
notifier_global::notify(event_args).await;
}
}
});
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_delete_object(&self, mut req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
if let Some(context) = &self.context {
@@ -3265,6 +3588,46 @@ mod tests {
assert_eq!(err.code(), &S3ErrorCode::InvalidArgument);
}
#[tokio::test]
async fn execute_delete_objects_rejects_empty_object_list() {
let input = DeleteObjectsInput::builder()
.bucket("test-bucket".to_string())
.delete(Delete {
objects: vec![],
quiet: None,
})
.build()
.unwrap();
let req = build_request(input, Method::POST);
let usecase = DefaultObjectUsecase::without_context();
let err = usecase.execute_delete_objects(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InvalidArgument);
}
#[tokio::test]
async fn execute_delete_objects_returns_internal_error_when_store_uninitialized() {
let input = DeleteObjectsInput::builder()
.bucket("test-bucket".to_string())
.delete(Delete {
objects: vec![ObjectIdentifier {
key: "test-key".to_string(),
version_id: None,
..Default::default()
}],
quiet: None,
})
.build()
.unwrap();
let req = build_request(input, Method::POST);
let usecase = DefaultObjectUsecase::without_context();
let err = usecase.execute_delete_objects(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_delete_object_tagging_returns_internal_error_when_store_uninitialized() {
let input = DeleteObjectTaggingInput::builder()

View File

@@ -17,49 +17,31 @@ use crate::app::multipart_usecase::DefaultMultipartUsecase;
use crate::app::object_usecase::DefaultObjectUsecase;
use crate::error::ApiError;
use crate::storage::concurrency::get_concurrency_manager;
use crate::storage::get_buffer_size_opt_in;
use crate::storage::helper::OperationHelper;
use crate::storage::options::get_content_sha256;
use crate::storage::{
access::{ReqInfo, authorize_request, has_bypass_governance_header},
options::{del_opts, extract_metadata},
};
use crate::storage::{get_buffer_size_opt_in, get_validated_store, has_replication_rules};
use futures::StreamExt;
use http::HeaderMap;
use rustfs_ecstore::{
bucket::{
object_lock::objectlock_sys::check_object_lock_for_deletion,
replication::{DeletedObjectReplicationInfo, check_replicate_delete, schedule_replication_delete},
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
},
client::object_api_utils::to_s3s_etag,
compress::{MIN_COMPRESSIBLE_SIZE, is_compressible},
disk::{error::DiskError, error_reduce::is_all_buckets_not_found},
error::{StorageError, is_err_object_not_found, is_err_version_not_found},
error::StorageError,
new_object_layer_fn,
store_api::{
ObjectIO,
ObjectInfo,
ObjectOptions,
ObjectToDelete,
PutObjReader,
StorageAPI,
// RESERVED_METADATA_PREFIX,
},
};
use rustfs_filemeta::REPLICATE_INCOMING_DELETE;
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use rustfs_kms::DataKey;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_policy::policy::action::{Action, S3Action};
use rustfs_notify::notifier_global;
use rustfs_rio::{CompressReader, HashReader, Reader, WarpReader};
use rustfs_targets::EventName;
use rustfs_utils::{
CompressionAlgorithm, extract_params_header, extract_resp_elements, get_request_host, get_request_port,
get_request_user_agent,
http::headers::{AMZ_DECODED_CONTENT_LENGTH, RESERVED_METADATA_PREFIX_LOWER},
path::is_dir_object,
};
use rustfs_zip::CompressionFormat;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, dto::*, s3_error};
@@ -884,6 +866,7 @@ impl FS {
result
}
#[cfg(test)]
pub(crate) fn normalize_delete_objects_version_id(
&self,
version_id: Option<String>,
@@ -1050,324 +1033,9 @@ impl S3 for FS {
/// Delete multiple objects
#[instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, mut req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event();
let (bucket, delete) = {
let bucket = req.input.bucket.clone();
let delete = req.input.delete.clone();
(bucket, delete)
};
if delete.objects.is_empty() || delete.objects.len() > 1000 {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"No objects to delete or too many objects to delete".to_string(),
));
}
let replicate_deletes = has_replication_rules(
&bucket,
&delete
.objects
.iter()
.map(|v| ObjectToDelete {
object_name: v.key.clone(),
..Default::default()
})
.collect::<Vec<ObjectToDelete>>(),
)
.await;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let version_cfg = BucketVersioningSys::get(&bucket).await.unwrap_or_default();
// Check for bypass governance retention header (permission already verified in access.rs)
let bypass_governance = has_bypass_governance_header(&req.headers);
#[derive(Default, Clone)]
struct DeleteResult {
delete_object: Option<rustfs_ecstore::store_api::DeletedObject>,
error: Option<Error>,
}
let mut delete_results = vec![DeleteResult::default(); delete.objects.len()];
let mut object_to_delete = Vec::new();
let mut object_to_delete_index = HashMap::new();
let mut object_sizes = HashMap::new();
for (idx, obj_id) in delete.objects.iter().enumerate() {
let raw_version_id = obj_id.version_id.clone();
let (version_id, version_uuid) = match self.normalize_delete_objects_version_id(raw_version_id.clone()) {
Ok(parsed) => parsed,
Err(err) => {
delete_results[idx].error = Some(Error {
code: Some("NoSuchVersion".to_string()),
key: Some(obj_id.key.clone()),
message: Some(err),
version_id: raw_version_id,
});
continue;
}
};
{
let req_info = req.extensions.get_mut::<ReqInfo>().expect("ReqInfo not found");
req_info.bucket = Some(bucket.clone());
req_info.object = Some(obj_id.key.clone());
req_info.version_id = version_id.clone();
}
let auth_res = authorize_request(&mut req, Action::S3Action(S3Action::DeleteObjectAction)).await;
if let Err(e) = auth_res {
delete_results[idx].error = Some(Error {
code: Some("AccessDenied".to_string()),
key: Some(obj_id.key.clone()),
message: Some(e.to_string()),
version_id: version_id.clone(),
});
continue;
}
let mut object = ObjectToDelete {
object_name: obj_id.key.clone(),
version_id: version_uuid,
..Default::default()
};
let metadata = extract_metadata(&req.headers);
let opts: ObjectOptions = del_opts(
&bucket,
&object.object_name,
object.version_id.map(|f| f.to_string()),
&req.headers,
metadata,
)
.await
.map_err(ApiError::from)?;
// Get object info to collect size for quota tracking
let (goi, gerr) = match store.get_object_info(&bucket, &object.object_name, &opts).await {
Ok(res) => (res, None),
Err(e) => (ObjectInfo::default(), Some(e.to_string())),
};
// Check Object Lock retention before deletion
// NOTE: Unlike single DeleteObject, this reuses the get_object_info result from quota
// tracking above, so no additional storage operation is required for the retention check.
if gerr.is_none()
&& let Some(block_reason) = check_object_lock_for_deletion(&bucket, &goi, bypass_governance).await
{
delete_results[idx].error = Some(Error {
code: Some("AccessDenied".to_string()),
key: Some(obj_id.key.clone()),
message: Some(block_reason.error_message()),
version_id: version_id.clone(),
});
continue;
}
// Store object size for quota tracking
object_sizes.insert(object.object_name.clone(), goi.size);
if is_dir_object(&object.object_name) && object.version_id.is_none() {
object.version_id = Some(Uuid::nil());
}
if replicate_deletes {
let dsc = check_replicate_delete(
&bucket,
&ObjectToDelete {
object_name: object.object_name.clone(),
version_id: object.version_id,
..Default::default()
},
&goi,
&opts,
gerr.clone(),
)
.await;
if dsc.replicate_any() {
if object.version_id.is_some() {
object.version_purge_status = Some(VersionPurgeStatusType::Pending);
object.version_purge_statuses = dsc.pending_status();
} else {
object.delete_marker_replication_status = dsc.pending_status();
}
object.replicate_decision_str = Some(dsc.to_string());
}
}
// TODO: Retention
object_to_delete_index.insert(object.object_name.clone(), idx);
object_to_delete.push(object);
}
let (mut dobjs, errs) = {
store
.delete_objects(
&bucket,
object_to_delete.clone(),
ObjectOptions {
version_suspended: version_cfg.suspended(),
..Default::default()
},
)
.await
};
// Invalidate cache for successfully deleted objects
let manager = get_concurrency_manager();
let bucket_clone = bucket.clone();
let deleted_objects = dobjs.clone();
tokio::spawn(async move {
for dobj in deleted_objects {
manager
.invalidate_cache_versioned(
&bucket_clone,
&dobj.object_name,
dobj.version_id.map(|v| v.to_string()).as_deref(),
)
.await;
}
});
if is_all_buckets_not_found(
&errs
.iter()
.map(|v| v.as_ref().map(|v| v.clone().into()))
.collect::<Vec<Option<DiskError>>>() as &[Option<DiskError>],
) {
let result = Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let _ = helper.complete(&result);
return result;
}
for (i, err) in errs.iter().enumerate() {
let obj = dobjs[i].clone();
// let replication_state = obj.replication_state.clone().unwrap_or_default();
// let obj_to_del = ObjectToDelete {
// object_name: decode_dir_object(dobjs[i].object_name.as_str()),
// version_id: obj.version_id,
// delete_marker_replication_status: replication_state.replication_status_internal.clone(),
// version_purge_status: Some(obj.version_purge_status()),
// version_purge_statuses: replication_state.version_purge_status_internal.clone(),
// replicate_decision_str: Some(replication_state.replicate_decision_str.clone()),
// };
let Some(didx) = object_to_delete_index.get(&obj.object_name) else {
continue;
};
if err.is_none()
|| err
.clone()
.is_some_and(|v| is_err_object_not_found(&v) || is_err_version_not_found(&v))
{
if replicate_deletes {
dobjs[i].replication_state = Some(object_to_delete[i].replication_state());
}
delete_results[*didx].delete_object = Some(dobjs[i].clone());
// Update quota tracking for successfully deleted objects
if let Some(&size) = object_sizes.get(&obj.object_name) {
rustfs_ecstore::data_usage::decrement_bucket_usage_memory(&bucket, size as u64).await;
}
continue;
}
if let Some(err) = err.clone() {
delete_results[*didx].error = Some(Error {
code: Some(err.to_string()),
key: Some(object_to_delete[i].object_name.clone()),
message: Some(err.to_string()),
version_id: object_to_delete[i].version_id.map(|v| v.to_string()),
});
}
}
let deleted = delete_results
.iter()
.filter_map(|v| v.delete_object.clone())
.map(|v| DeletedObject {
delete_marker: { if v.delete_marker { Some(true) } else { None } },
delete_marker_version_id: v.delete_marker_version_id.map(|v| v.to_string()),
key: Some(v.object_name.clone()),
version_id: if is_dir_object(v.object_name.as_str()) && v.version_id == Some(Uuid::nil()) {
None
} else {
v.version_id.map(|v| v.to_string())
},
})
.collect();
let errors = delete_results.iter().filter_map(|v| v.error.clone()).collect::<Vec<Error>>();
let output = DeleteObjectsOutput {
deleted: Some(deleted),
errors: Some(errors),
..Default::default()
};
for dobjs in delete_results.iter() {
if let Some(dobj) = &dobjs.delete_object
&& replicate_deletes
&& (dobj.delete_marker_replication_status() == ReplicationStatusType::Pending
|| dobj.version_purge_status() == VersionPurgeStatusType::Pending)
{
let mut dobj = dobj.clone();
if is_dir_object(dobj.object_name.as_str()) && dobj.version_id.is_none() {
dobj.version_id = Some(Uuid::nil());
}
let deleted_object = DeletedObjectReplicationInfo {
delete_object: dobj,
bucket: bucket.clone(),
event_type: REPLICATE_INCOMING_DELETE.to_string(),
..Default::default()
};
schedule_replication_delete(deleted_object).await;
}
}
let req_headers = req.headers.clone();
tokio::spawn(async move {
for res in delete_results {
if let Some(dobj) = res.delete_object {
let event_name = if dobj.delete_marker {
EventName::ObjectRemovedDeleteMarkerCreated
} else {
EventName::ObjectRemovedDelete
};
let event_args = EventArgsBuilder::new(
event_name,
bucket.clone(),
ObjectInfo {
name: dobj.object_name.clone(),
bucket: bucket.clone(),
..Default::default()
},
)
.version_id(dobj.version_id.map(|v| v.to_string()).unwrap_or_default())
.req_params(extract_params_header(&req_headers))
.resp_elements(extract_resp_elements(&S3Response::new(DeleteObjectsOutput::default())))
.host(get_request_host(&req_headers))
.user_agent(get_request_user_agent(&req_headers))
.build();
notifier_global::notify(event_args).await;
}
}
});
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_delete_objects(req).await
}
async fn get_bucket_acl(&self, req: S3Request<GetBucketAclInput>) -> S3Result<S3Response<GetBucketAclOutput>> {
@@ -1551,151 +1219,14 @@ impl S3 for FS {
&self,
req: S3Request<ListObjectVersionsInput>,
) -> S3Result<S3Response<ListObjectVersionsOutput>> {
let ListObjectVersionsInput {
bucket,
delimiter,
key_marker,
version_id_marker,
max_keys,
prefix,
..
} = req.input;
let prefix = prefix.unwrap_or_default();
let max_keys = max_keys.unwrap_or(1000);
let key_marker = key_marker.filter(|v| !v.is_empty());
let version_id_marker = version_id_marker.filter(|v| !v.is_empty());
let delimiter = delimiter.filter(|v| !v.is_empty());
let store = get_validated_store(&bucket).await?;
let object_infos = store
.list_object_versions(&bucket, &prefix, key_marker, version_id_marker, delimiter.clone(), max_keys)
.await
.map_err(ApiError::from)?;
let objects: Vec<ObjectVersion> = object_infos
.objects
.iter()
.filter(|v| !v.name.is_empty() && !v.delete_marker)
.map(|v| {
ObjectVersion {
key: Some(v.name.to_owned()),
last_modified: v.mod_time.map(Timestamp::from),
size: Some(v.size),
version_id: Some(v.version_id.map(|v| v.to_string()).unwrap_or_else(|| "null".to_string())),
is_latest: Some(v.is_latest),
e_tag: v.etag.clone().map(|etag| to_s3s_etag(&etag)),
storage_class: v.storage_class.clone().map(ObjectVersionStorageClass::from),
..Default::default() // TODO: another fields
}
})
.collect();
let common_prefixes = object_infos
.prefixes
.into_iter()
.map(|v| CommonPrefix { prefix: Some(v) })
.collect();
let delete_markers = object_infos
.objects
.iter()
.filter(|o| o.delete_marker)
.map(|o| DeleteMarkerEntry {
key: Some(o.name.clone()),
version_id: Some(o.version_id.map(|v| v.to_string()).unwrap_or_else(|| "null".to_string())),
is_latest: Some(o.is_latest),
last_modified: o.mod_time.map(Timestamp::from),
..Default::default()
})
.collect::<Vec<_>>();
// Only set next_key_marker and next_version_id_marker if they have values, per AWS S3 API spec
// boto3 expects them to be strings or omitted, not None or empty strings
let next_key_marker = object_infos.next_marker.filter(|v| !v.is_empty());
let next_version_id_marker = object_infos.next_version_idmarker.filter(|v| !v.is_empty());
let output = ListObjectVersionsOutput {
is_truncated: Some(object_infos.is_truncated),
// max_keys should be the requested maximum number of keys, not the actual count returned
// Per AWS S3 API spec, this field represents the maximum number of keys that can be returned in the response
max_keys: Some(max_keys),
delimiter,
name: Some(bucket),
prefix: Some(prefix),
common_prefixes: Some(common_prefixes),
versions: Some(objects),
delete_markers: Some(delete_markers),
next_key_marker,
next_version_id_marker,
..Default::default()
};
Ok(S3Response::new(output))
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_list_object_versions(req).await
}
#[instrument(level = "debug", skip(self, req))]
async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
// Capture the original marker from the request before conversion
// S3 API requires the marker field to be echoed back in the response
let request_marker = req.input.marker.clone();
let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
Ok(v2_resp.map_output(|v2| {
// For ListObjects (v1) API, NextMarker should be the last item returned when truncated
// When both Contents and CommonPrefixes are present, NextMarker should be the
// lexicographically last item (either last key or last prefix)
let next_marker = if v2.is_truncated.unwrap_or(false) {
let last_key = v2
.contents
.as_ref()
.and_then(|contents| contents.last())
.and_then(|obj| obj.key.as_ref())
.cloned();
let last_prefix = v2
.common_prefixes
.as_ref()
.and_then(|prefixes| prefixes.last())
.and_then(|prefix| prefix.prefix.as_ref())
.cloned();
// NextMarker should be the lexicographically last item
// This matches S3 standard behavior
match (last_key, last_prefix) {
(Some(k), Some(p)) => {
// Return the lexicographically greater one
if k > p { Some(k) } else { Some(p) }
}
(Some(k), None) => Some(k),
(None, Some(p)) => Some(p),
(None, None) => None,
}
} else {
None
};
// S3 API requires marker field in response, echoing back the request marker
// If no marker was provided in request, return empty string per S3 standard
let marker = Some(request_marker.unwrap_or_default());
ListObjectsOutput {
contents: v2.contents,
delimiter: v2.delimiter,
encoding_type: v2.encoding_type,
name: v2.name,
prefix: v2.prefix,
max_keys: v2.max_keys,
common_prefixes: v2.common_prefixes,
is_truncated: v2.is_truncated,
marker,
next_marker,
..Default::default()
}
}))
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_list_objects(req).await
}
#[instrument(level = "debug", skip(self, req))]