refactor(app): migrate create/delete/head bucket flows (#1913)

This commit is contained in:
安正超
2026-02-23 11:05:56 +08:00
committed by GitHub
parent d9c97c5c52
commit d1768aa1c3
2 changed files with 194 additions and 116 deletions

View File

@@ -17,10 +17,20 @@
use crate::app::context::{AppContext, get_global_app_context};
use crate::error::ApiError;
use crate::storage::access::authorize_request;
use crate::storage::ecfs::{default_owner, serialize_acl, stored_acl_from_canned_bucket, stored_acl_from_grant_headers};
use crate::storage::helper::OperationHelper;
use crate::storage::*;
use metrics::counter;
use rustfs_ecstore::bucket::{metadata::BUCKET_ACL_CONFIG, metadata_sys};
use rustfs_ecstore::client::object_api_utils::to_s3s_etag;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI};
use rustfs_policy::policy::action::{Action, S3Action};
use rustfs_targets::EventName;
use rustfs_utils::http::RUSTFS_FORCE_DELETE;
use rustfs_utils::string::parse_bool;
use s3s::dto::*;
use s3s::{S3Error, S3ErrorCode, S3Request, S3Response, S3Result, s3_error};
use std::sync::Arc;
@@ -117,6 +127,143 @@ impl DefaultBucketUsecase {
self.context.clone()
}
#[instrument(
level = "debug",
skip(self, req),
fields(start_time=?time::OffsetDateTime::now_utc())
)]
pub async fn execute_create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket");
let CreateBucketInput {
bucket,
acl,
grant_full_control,
grant_read,
grant_read_acp,
grant_write,
grant_write_acp,
object_lock_enabled_for_bucket,
..
} = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
counter!("rustfs_create_bucket_total").increment(1);
store
.make_bucket(
&bucket,
&MakeBucketOptions {
force_create: false, // TODO: force support
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
},
)
.await
.map_err(ApiError::from)?;
let owner = default_owner();
let mut stored_acl = stored_acl_from_grant_headers(
&owner,
grant_read.map(|v| v.to_string()),
grant_write.map(|v| v.to_string()),
grant_read_acp.map(|v| v.to_string()),
grant_write_acp.map(|v| v.to_string()),
grant_full_control.map(|v| v.to_string()),
)?;
if stored_acl.is_none()
&& let Some(canned) = acl
{
stored_acl = Some(stored_acl_from_canned_bucket(canned.as_str(), &owner));
}
let stored_acl = stored_acl.unwrap_or_else(|| stored_acl_from_canned_bucket(BucketCannedACL::PRIVATE, &owner));
let data = serialize_acl(&stored_acl)?;
metadata_sys::update(&bucket, BUCKET_ACL_CONFIG, data)
.await
.map_err(ApiError::from)?;
let output = CreateBucketOutput::default();
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_delete_bucket(&self, mut req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket");
let input = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// get value from header, support mc style
let force_str = req
.headers
.get(RUSTFS_FORCE_DELETE)
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or(
req.headers
.get("x-minio-force-delete")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default(),
);
let force = parse_bool(force_str).unwrap_or_default();
if force {
authorize_request(&mut req, Action::S3Action(S3Action::ForceDeleteBucketAction)).await?;
}
store
.delete_bucket(
&input.bucket,
&DeleteBucketOptions {
force,
..Default::default()
},
)
.await
.map_err(ApiError::from)?;
let result = Ok(S3Response::new(DeleteBucketOutput {}));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
if let Some(context) = &self.context {
let _ = context.object_store();
}
let input = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
store
.get_bucket_info(&input.bucket, &BucketOptions::default())
.await
.map_err(ApiError::from)?;
Ok(S3Response::new(HeadBucketOutput::default()))
}
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
if let Some(context) = &self.context {
@@ -326,6 +473,45 @@ mod tests {
}
}
#[tokio::test]
async fn execute_create_bucket_returns_internal_error_when_store_uninitialized() {
let input = CreateBucketInput::builder()
.bucket("test-bucket".to_string())
.build()
.unwrap();
let req = build_request(input, Method::PUT);
let usecase = DefaultBucketUsecase::without_context();
let err = usecase.execute_create_bucket(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_delete_bucket_returns_internal_error_when_store_uninitialized() {
let input = DeleteBucketInput::builder()
.bucket("test-bucket".to_string())
.build()
.unwrap();
let req = build_request(input, Method::DELETE);
let usecase = DefaultBucketUsecase::without_context();
let err = usecase.execute_delete_bucket(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_head_bucket_returns_internal_error_when_store_uninitialized() {
let input = HeadBucketInput::builder().bucket("test-bucket".to_string()).build().unwrap();
let req = build_request(input, Method::HEAD);
let usecase = DefaultBucketUsecase::without_context();
let err = usecase.execute_head_bucket(req).await.unwrap_err();
assert_eq!(err.code(), &S3ErrorCode::InternalError);
}
#[tokio::test]
async fn execute_list_objects_v2_rejects_negative_max_keys() {
let input = ListObjectsV2Input::builder()

View File

@@ -76,8 +76,6 @@ use rustfs_ecstore::{
store_api::{
BucketOptions,
CompletePart,
DeleteBucketOptions,
MakeBucketOptions,
MultipartUploadResult,
ObjectIO,
ObjectInfo,
@@ -104,8 +102,6 @@ use rustfs_targets::{
EventName,
arn::{ARN, TargetIDError},
};
use rustfs_utils::http::RUSTFS_FORCE_DELETE;
use rustfs_utils::string::parse_bool;
use rustfs_utils::{
CompressionAlgorithm, extract_params_header, extract_resp_elements, get_request_host, get_request_port,
get_request_user_agent,
@@ -1333,64 +1329,8 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket");
let CreateBucketInput {
bucket,
acl,
grant_full_control,
grant_read,
grant_read_acp,
grant_write,
grant_write_acp,
object_lock_enabled_for_bucket,
..
} = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
counter!("rustfs_create_bucket_total").increment(1);
store
.make_bucket(
&bucket,
&MakeBucketOptions {
force_create: false, // TODO: force support
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
},
)
.await
.map_err(ApiError::from)?;
let owner = default_owner();
let mut stored_acl = stored_acl_from_grant_headers(
&owner,
grant_read.map(|v| v.to_string()),
grant_write.map(|v| v.to_string()),
grant_read_acp.map(|v| v.to_string()),
grant_write_acp.map(|v| v.to_string()),
grant_full_control.map(|v| v.to_string()),
)?;
if stored_acl.is_none()
&& let Some(canned) = acl
{
stored_acl = Some(stored_acl_from_canned_bucket(canned.as_str(), &owner));
}
let stored_acl = stored_acl.unwrap_or_else(|| stored_acl_from_canned_bucket(BucketCannedACL::PRIVATE, &owner));
let data = serialize_acl(&stored_acl)?;
metadata_sys::update(&bucket, BUCKET_ACL_CONFIG, data)
.await
.map_err(ApiError::from)?;
let output = CreateBucketOutput::default();
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_create_bucket(req).await
}
#[instrument(level = "debug", skip(self, req))]
@@ -1549,46 +1489,9 @@ impl S3 for FS {
/// Delete a bucket
#[instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, mut req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket");
let input = req.input.clone();
// TODO: DeleteBucketInput doesn't have force parameter?
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// get value from header, support mc style
let force_str = req
.headers
.get(RUSTFS_FORCE_DELETE)
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or(
req.headers
.get("x-minio-force-delete")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default(),
);
let force = parse_bool(force_str).unwrap_or_default();
if force {
authorize_request(&mut req, Action::S3Action(S3Action::ForceDeleteBucketAction)).await?;
}
store
.delete_bucket(
&input.bucket,
&DeleteBucketOptions {
force,
..Default::default()
},
)
.await
.map_err(ApiError::from)?;
let result = Ok(S3Response::new(DeleteBucketOutput {}));
let _ = helper.complete(&result);
result
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket(req).await
}
#[instrument(level = "debug", skip(self))]
@@ -2842,19 +2745,8 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
let input = req.input;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
store
.get_bucket_info(&input.bucket, &BucketOptions::default())
.await
.map_err(ApiError::from)?;
// mc cp step 2 GetBucketInfo
Ok(S3Response::new(HeadBucketOutput::default()))
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_head_bucket(req).await
}
#[instrument(level = "debug", skip(self, req))]