Fix object tagging functionality issues #1415 (#1485)

This commit is contained in:
houseme
2026-01-13 01:11:50 +08:00
committed by GitHub
parent b5140f0098
commit dc76e4472e
2 changed files with 4169 additions and 4069 deletions

8068
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -39,7 +39,7 @@ use datafusion::arrow::{
};
use futures::StreamExt;
use http::{HeaderMap, StatusCode};
use metrics::counter;
use metrics::{counter, histogram};
use rustfs_ecstore::bucket::quota::checker::QuotaChecker;
use rustfs_ecstore::{
bucket::{
@@ -782,6 +782,25 @@ impl FS {
let _ = helper.complete(&result);
result
}
/// Auxiliary functions: parse version ID
///
/// # Arguments
/// * `version_id` - An optional string representing the version ID to be parsed.
///
/// # Returns
/// * `S3Result<Option<Uuid>>` - A result containing an optional UUID if parsing is successful, or an S3 error if parsing fails.
fn parse_version_id(&self, version_id: Option<String>) -> S3Result<Option<Uuid>> {
if let Some(vid) = version_id {
let uuid = Uuid::parse_str(&vid).map_err(|e| {
error!("Invalid version ID: {}", e);
s3_error!(InvalidArgument, "Invalid version ID")
})?;
Ok(Some(uuid))
} else {
Ok(None)
}
}
}
/// Helper function to get store and validate bucket exists
@@ -4670,6 +4689,7 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
let start_time = std::time::Instant::now();
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, "s3:PutObjectTagging");
let PutObjectTaggingInput {
bucket,
@@ -4683,6 +4703,8 @@ impl S3 for FS {
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html
// Reference: https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_PutObjectTagging.html
// https://github.com/minio/mint/blob/master/run/core/aws-sdk-go-v2/main.go#L1647
error!("Tag set exceeds maximum of 10 tags: {}", tagging.tag_set.len());
return Err(s3_error!(InvalidTag, "Cannot have more than 10 tags per object"));
}
let Some(store) = new_object_layer_fn() else {
@@ -4691,71 +4713,118 @@ impl S3 for FS {
let mut tag_keys = std::collections::HashSet::with_capacity(tagging.tag_set.len());
for tag in &tagging.tag_set {
let key = tag
.key
.as_ref()
.filter(|k| !k.is_empty())
.ok_or_else(|| s3_error!(InvalidTag, "Tag key cannot be empty"))?;
let key = tag.key.as_ref().filter(|k| !k.is_empty()).ok_or_else(|| {
error!("Empty tag key");
s3_error!(InvalidTag, "Tag key cannot be empty")
})?;
if key.len() > 128 {
error!("Tag key too long: {} bytes", key.len());
return Err(s3_error!(InvalidTag, "Tag key is too long, maximum allowed length is 128 characters"));
}
let value = tag
.value
.as_ref()
.ok_or_else(|| s3_error!(InvalidTag, "Tag value cannot be null"))?;
let value = tag.value.as_ref().ok_or_else(|| {
error!("Null tag value");
s3_error!(InvalidTag, "Tag value cannot be null")
})?;
if value.is_empty() {
error!("Empty tag value");
return Err(s3_error!(InvalidTag, "Tag value cannot be empty"));
}
if value.len() > 256 {
error!("Tag value too long: {} bytes", value.len());
return Err(s3_error!(InvalidTag, "Tag value is too long, maximum allowed length is 256 characters"));
}
if !tag_keys.insert(key) {
error!("Duplicate tag key: {}", key);
return Err(s3_error!(InvalidTag, "Cannot provide multiple Tags with the same key"));
}
}
let tags = encode_tags(tagging.tag_set);
debug!("Encoded tags: {}", tags);
// TODO: getOpts
// TODO: Replicate
// TODO: getOpts, Replicate
// Support versioned objects
let version_id = req.input.version_id.clone();
let opts = ObjectOptions {
version_id: self.parse_version_id(version_id)?.map(Into::into),
..Default::default()
};
store
.put_object_tags(&bucket, &object, &tags, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
store.put_object_tags(&bucket, &object, &tags, &opts).await.map_err(|e| {
error!("Failed to put object tags: {}", e);
counter!("rustfs.put_object_tagging.failure").increment(1);
ApiError::from(e)
})?;
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.version_id(version_id);
// Invalidate cache for the tagged object
let manager = get_concurrency_manager();
let version_id = req.input.version_id.clone();
let cache_key = ConcurrencyManager::make_cache_key(&bucket, &object, version_id.clone().as_deref());
tokio::spawn(async move {
manager
.invalidate_cache_versioned(&bucket, &object, version_id.as_deref())
.await;
debug!("Cache invalidated for tagged object: {}", cache_key);
});
let result = Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }));
// Add metrics
counter!("rustfs.put_object_tagging.success").increment(1);
let version_id_resp = req.input.version_id.clone().unwrap_or_default();
helper = helper.version_id(version_id_resp);
let result = Ok(S3Response::new(PutObjectTaggingOutput {
version_id: req.input.version_id.clone(),
}));
let _ = helper.complete(&result);
let duration = start_time.elapsed();
histogram!("rustfs.object_tagging.operation.duration.seconds", "operation" => "put").record(duration.as_secs_f64());
result
}
#[instrument(level = "debug", skip(self))]
async fn get_object_tagging(&self, req: S3Request<GetObjectTaggingInput>) -> S3Result<S3Response<GetObjectTaggingOutput>> {
let start_time = std::time::Instant::now();
let GetObjectTaggingInput { bucket, key: object, .. } = req.input;
info!("Starting get_object_tagging for bucket: {}, object: {}", bucket, object);
let Some(store) = new_object_layer_fn() else {
error!("Store not initialized");
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// TODO: version
let tags = store
.get_object_tags(&bucket, &object, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
// Support versioned objects
let version_id = req.input.version_id.clone();
let opts = ObjectOptions {
version_id: self.parse_version_id(version_id)?.map(Into::into),
..Default::default()
};
let tags = store.get_object_tags(&bucket, &object, &opts).await.map_err(|e| {
if is_err_object_not_found(&e) {
error!("Object not found: {}", e);
return s3_error!(NoSuchKey);
}
error!("Failed to get object tags: {}", e);
ApiError::from(e).into()
})?;
let tag_set = decode_tags(tags.as_str());
debug!("Decoded tag set: {:?}", tag_set);
// Add metrics
counter!("rustfs.get_object_tagging.success").increment(1);
let duration = start_time.elapsed();
histogram!("rustfs.object_tagging.operation.duration.seconds", "operation" => "put").record(duration.as_secs_f64());
Ok(S3Response::new(GetObjectTaggingOutput {
tag_set,
version_id: None,
version_id: req.input.version_id.clone(),
}))
}
@@ -4764,25 +4833,56 @@ impl S3 for FS {
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let start_time = std::time::Instant::now();
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, "s3:DeleteObjectTagging");
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone();
let DeleteObjectTaggingInput {
bucket,
key: object,
version_id,
..
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
error!("Store not initialized");
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// TODO: Replicate
// TODO: version
store
.delete_object_tags(&bucket, &object, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
// Support versioned objects
let version_id_for_parse = version_id.clone();
let opts = ObjectOptions {
version_id: self.parse_version_id(version_id_for_parse)?.map(Into::into),
..Default::default()
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.version_id(version_id);
// TODO: Replicate (keep the original TODO, if further replication logic is needed)
store.delete_object_tags(&bucket, &object, &opts).await.map_err(|e| {
error!("Failed to delete object tags: {}", e);
ApiError::from(e)
})?;
let result = Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }));
// Invalidate cache for the deleted tagged object
let manager = get_concurrency_manager();
let version_id_clone = version_id.clone();
tokio::spawn(async move {
manager
.invalidate_cache_versioned(&bucket, &object, version_id_clone.as_deref())
.await;
debug!(
"Cache invalidated for deleted tagged object: bucket={}, object={}, version_id={:?}",
bucket, object, version_id_clone
);
});
// Add metrics
counter!("rustfs.delete_object_tagging.success").increment(1);
let version_id_resp = version_id.clone().unwrap_or_default();
helper = helper.version_id(version_id_resp);
let result = Ok(S3Response::new(DeleteObjectTaggingOutput { version_id }));
let _ = helper.complete(&result);
let duration = start_time.elapsed();
histogram!("rustfs.object_tagging.operation.duration.seconds", "operation" => "delete").record(duration.as_secs_f64());
result
}