feat(storage): integrate S3Operation into OperationHelper for unified metrics and audit (#2103)

This commit is contained in:
houseme
2026-03-08 17:57:33 +08:00
committed by GitHub
parent 8e4a1ef917
commit 60aa47bf61
39 changed files with 574 additions and 357 deletions

14
Cargo.lock generated
View File

@@ -7215,6 +7215,7 @@ dependencies = [
"rustfs-protocols",
"rustfs-protos",
"rustfs-rio",
"rustfs-s3-common",
"rustfs-s3select-api",
"rustfs-s3select-query",
"rustfs-scanner",
@@ -7277,6 +7278,7 @@ dependencies = [
"rumqttc",
"rustfs-config",
"rustfs-ecstore",
"rustfs-s3-common",
"rustfs-targets",
"serde",
"serde_json",
@@ -7415,6 +7417,7 @@ dependencies = [
"rustfs-policy",
"rustfs-protos",
"rustfs-rio",
"rustfs-s3-common",
"rustfs-signer",
"rustfs-utils",
"rustfs-workers",
@@ -7652,6 +7655,7 @@ dependencies = [
"rustc-hash",
"rustfs-config",
"rustfs-ecstore",
"rustfs-s3-common",
"rustfs-targets",
"rustfs-utils",
"serde",
@@ -7813,6 +7817,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "rustfs-s3-common"
version = "0.0.5"
dependencies = [
"metrics",
"serde",
"serde_json",
]
[[package]]
name = "rustfs-s3select-api"
version = "0.0.5"
@@ -7908,6 +7921,7 @@ dependencies = [
"reqwest 0.13.2",
"rumqttc",
"rustfs-config",
"rustfs-s3-common",
"rustfs-utils",
"serde",
"serde_json",

View File

@@ -39,6 +39,7 @@ members = [
"crates/protocols", # Protocol implementations (FTPS, SFTP, etc.)
"crates/protos", # Protocol buffer definitions
"crates/rio", # Rust I/O utilities and abstractions
"crates/s3-common", # Common utilities and data structures for S3 compatibility
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/scanner", # Scanner for data integrity checks and health monitoring
@@ -94,6 +95,7 @@ rustfs-obs = { path = "crates/obs", version = "0.0.5" }
rustfs-policy = { path = "crates/policy", version = "0.0.5" }
rustfs-protos = { path = "crates/protos", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-s3-common = { path = "crates/s3-common", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
rustfs-scanner = { path = "crates/scanner", version = "0.0.5" }

View File

@@ -29,6 +29,7 @@ categories = ["web-programming", "development-tools", "asynchronous", "api-bindi
rustfs-targets = { workspace = true }
rustfs-config = { workspace = true, features = ["audit", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-s3-common = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
const-str = { workspace = true }

View File

@@ -14,7 +14,7 @@
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use serde::{Deserialize, Serialize};
use serde_json::Value;

View File

@@ -44,6 +44,7 @@ rustfs-credentials = { workspace = true }
rustfs-common.workspace = true
rustfs-policy.workspace = true
rustfs-protos.workspace = true
rustfs-s3-common = { workspace = true }
async-trait.workspace = true
bytes.workspace = true
byteorder = { workspace = true }

View File

@@ -28,7 +28,6 @@ use crate::client::object_api_utils::new_getobjectreader;
use crate::error::Error;
use crate::error::StorageError;
use crate::error::{error_resp_to_object_err, is_err_object_not_found, is_err_version_not_found, is_network_or_host_down};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id};
@@ -45,6 +44,7 @@ use rustfs_common::data_usage::TierStats;
use rustfs_common::heal_channel::rep_has_active_rules;
use rustfs_common::metrics::{IlmAction, Metrics};
use rustfs_filemeta::{NULL_VERSION_ID, RestoreStatusOps, is_restored_object_on_disk};
use rustfs_s3_common::EventName;
use rustfs_utils::path::encode_dir_object;
use rustfs_utils::string::strings_has_prefix_fold;
use s3s::Body;
@@ -471,7 +471,7 @@ impl TransitionState {
}
pub async fn init(api: Arc<ECStore>) {
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
let max_workers = env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
@@ -569,14 +569,14 @@ impl TransitionState {
pub async fn update_workers_inner(api: Arc<ECStore>, n: i64) {
let mut n = n;
if n == 0 {
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
let max_workers = env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
n = max_workers;
}
// Allow environment override of maximum workers
let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
let absolute_max = env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(32);
@@ -603,7 +603,7 @@ impl TransitionState {
}
pub async fn init_background_expiry(api: Arc<ECStore>) {
let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS")
let mut workers = env::var("RUSTFS_MAX_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16));
@@ -615,7 +615,7 @@ pub async fn init_background_expiry(api: Arc<ECStore>) {
}
if workers == 0 {
workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
workers = env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(8);
@@ -689,13 +689,13 @@ pub async fn expire_transitioned_object(
//let tags = LcAuditEvent::new(src, lcEvent).Tags();
if lc_event.action == IlmAction::DeleteRestoredAction {
opts.transition.expire_restored = true;
match api.delete_object(&oi.bucket, &oi.name, opts).await {
return match api.delete_object(&oi.bucket, &oi.name, opts).await {
Ok(dobj) => {
//audit_log_lifecycle(*oi, ILMExpiry, tags, traceFn);
return Ok(dobj);
Ok(dobj)
}
Err(err) => return Err(std::io::Error::other(err)),
}
Err(err) => Err(std::io::Error::other(err)),
};
}
let ret = delete_object_from_remote_tier(
@@ -732,7 +732,7 @@ pub async fn expire_transitioned_object(
..Default::default()
};
send_event(EventArgs {
event_name: event_name.as_ref().to_string(),
event_name: event_name.to_string(),
bucket_name: obj_info.bucket.clone(),
object: obj_info,
user_agent: "Internal: [ILM-Expiry]".to_string(),
@@ -847,8 +847,8 @@ pub async fn post_restore_opts(version_id: &str, bucket: &str, object: &str) ->
}
}
Ok(ObjectOptions {
versioned: versioned,
version_suspended: version_suspended,
versioned,
version_suspended,
version_id: Some(vid.to_string()),
..Default::default()
})
@@ -1033,12 +1033,12 @@ pub async fn eval_action_from_lifecycle(
let lock_enabled = if let Some(lr) = lr { lr.mode.is_some() } else { false };
match event.action {
lifecycle::IlmAction::DeleteAllVersionsAction | lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => {
IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
if lock_enabled {
return lifecycle::Event::default();
}
}
lifecycle::IlmAction::DeleteVersionAction | lifecycle::IlmAction::DeleteRestoredVersionAction => {
IlmAction::DeleteVersionAction | IlmAction::DeleteRestoredVersionAction => {
if oi.version_id.is_none() {
return lifecycle::Event::default();
}
@@ -1139,12 +1139,12 @@ pub async fn apply_expiry_on_non_transitioned_objects(
event_name = EventName::ObjectRemovedDeleteMarkerCreated;
}
match lc_event.action {
lifecycle::IlmAction::DeleteAllVersionsAction => event_name = EventName::ObjectRemovedDeleteAllVersions,
lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => event_name = EventName::ILMDelMarkerExpirationDelete,
IlmAction::DeleteAllVersionsAction => event_name = EventName::ObjectRemovedDeleteAllVersions,
IlmAction::DelMarkerDeleteAllVersionsAction => event_name = EventName::LifecycleDelMarkerExpirationDelete,
_ => (),
}
send_event(EventArgs {
event_name: event_name.as_ref().to_string(),
event_name: event_name.to_string(),
bucket_name: dobj.bucket.clone(),
object: dobj,
user_agent: "Internal: [ILM-Expiry]".to_string(),
@@ -1152,7 +1152,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
..Default::default()
});
if lc_event.action != lifecycle::IlmAction::NoneAction {
if lc_event.action != IlmAction::NoneAction {
let mut num_versions = 1_u64;
if lc_event.action.delete_all() {
num_versions = oi.num_versions as u64;
@@ -1172,15 +1172,15 @@ pub async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &
pub async fn apply_lifecycle_action(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
let mut success = false;
match event.action {
lifecycle::IlmAction::DeleteVersionAction
| lifecycle::IlmAction::DeleteAction
| lifecycle::IlmAction::DeleteRestoredAction
| lifecycle::IlmAction::DeleteRestoredVersionAction
| lifecycle::IlmAction::DeleteAllVersionsAction
| lifecycle::IlmAction::DelMarkerDeleteAllVersionsAction => {
IlmAction::DeleteVersionAction
| IlmAction::DeleteAction
| IlmAction::DeleteRestoredAction
| IlmAction::DeleteRestoredVersionAction
| IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction => {
success = apply_expiry_rule(event, src, oi).await;
}
lifecycle::IlmAction::TransitionAction | lifecycle::IlmAction::TransitionVersionAction => {
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => {
success = apply_transition_rule(event, src, oi).await;
}
_ => (),

View File

@@ -26,7 +26,6 @@ use crate::client::api_get_options::{AdvancedGetOptions, StatObjectOptions};
use crate::config::com::save_config;
use crate::disk::BUCKET_META_PREFIX;
use crate::error::{Error, Result, is_err_object_not_found, is_err_version_not_found};
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::global::get_global_bucket_monitor;
@@ -41,6 +40,10 @@ use aws_smithy_types::body::SdkBody;
use byteorder::ByteOrder;
use futures::future::join_all;
use futures::stream::StreamExt;
use headers::{
AMZ_OBJECT_LOCK_LEGAL_HOLD, AMZ_OBJECT_LOCK_MODE, AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, AMZ_SERVER_SIDE_ENCRYPTION,
AMZ_STORAGE_CLASS, AMZ_TAG_COUNT, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LANGUAGE, CONTENT_TYPE,
};
use http::HeaderMap;
use http_body::Frame;
use http_body_util::StreamBody;
@@ -51,6 +54,7 @@ use rustfs_filemeta::{
ReplicationType, ReplicationWorkerOperation, ResyncDecision, ResyncTargetDecision, VersionPurgeStatusType,
get_replication_state, parse_replicate_decision, replication_statuses_map, target_reset_header, version_purge_statuses_map,
};
use rustfs_s3_common::EventName;
use rustfs_utils::http::{
AMZ_BUCKET_REPLICATION_STATUS, AMZ_OBJECT_TAGGING, AMZ_TAGGING_DIRECTIVE, CONTENT_ENCODING, HeaderExt as _,
RESERVED_METADATA_PREFIX, RESERVED_METADATA_PREFIX_LOWER, RUSTFS_REPLICATION_ACTUAL_OBJECT_SIZE,
@@ -570,7 +574,7 @@ impl ReplicationResyncer {
return;
}
let worker_idx = sip_hash(&roi.name, RESYNC_WORKER_COUNT, &DEFAULT_SIP_HASH_KEY) as usize;
let worker_idx = sip_hash(&roi.name, RESYNC_WORKER_COUNT, &DEFAULT_SIP_HASH_KEY);
if let Err(err) = worker_txs[worker_idx].send(roi).await {
error!("Failed to send object info to worker: {}", err);
@@ -1087,7 +1091,7 @@ pub async fn check_replicate_delete(
}
/// Check if the user-defined metadata contains SSEC encryption headers
fn is_ssec_encrypted(user_defined: &std::collections::HashMap<String, String>) -> bool {
fn is_ssec_encrypted(user_defined: &HashMap<String, String>) -> bool {
user_defined.contains_key(SSEC_ALGORITHM_HEADER)
|| user_defined.contains_key(SSEC_KEY_HEADER)
|| user_defined.contains_key(SSEC_KEY_MD5_HEADER)
@@ -1224,7 +1228,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Ok(None) => {
warn!("No replication config found for bucket: {}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1243,7 +1247,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Err(err) => {
warn!("replication config for bucket: {} error: {}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1276,7 +1280,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.target_arn, err
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1304,7 +1308,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1329,7 +1333,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1370,7 +1374,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(&bucket, &tgt_entry.arn).await else {
warn!("failed to get target for bucket:{:?}, arn:{:?}", &bucket, &tgt_entry.arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1401,7 +1405,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
Err(e) => {
error!("replicate_delete task failed: {}", e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1454,9 +1458,9 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
}
let event_name = if replication_status == ReplicationStatusType::Completed {
EventName::ObjectReplicationComplete.as_ref().to_string()
EventName::ObjectReplicationComplete.to_string()
} else {
EventName::ObjectReplicationFailed.as_ref().to_string()
EventName::ObjectReplicationFailed.to_string()
};
match storage
@@ -1509,7 +1513,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
Ok(None) => {
warn!("replicate force-delete: no replication config for bucket:{}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1525,7 +1529,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
Err(err) => {
warn!("replicate force-delete: replication config error bucket:{} error:{}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1551,7 +1555,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1574,7 +1578,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1604,7 +1608,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(bucket, &arn).await else {
warn!("replicate force-delete: failed to get target client bucket:{} arn:{}", bucket, arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1625,7 +1629,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
error!("replicate force-delete: target offline bucket:{} arn:{}", bucket, tgt_client.arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationFailed.as_ref().to_string(),
event_name: EventName::ObjectReplicationFailed.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1661,7 +1665,7 @@ async fn replicate_force_delete_to_targets<S: StorageAPI>(dobj: &DeletedObjectRe
bucket, object_name, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationFailed.as_ref().to_string(),
event_name: EventName::ObjectReplicationFailed.to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
@@ -1794,7 +1798,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Ok(None) => {
warn!("No replication config found for bucket: {}", bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1806,7 +1810,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Err(err) => {
error!("Failed to get replication config for bucket {}: {}", bucket, err);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1832,7 +1836,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
let Some(tgt_client) = BucketTargetSys::get().get_remote_target_client(&bucket, &arn).await else {
warn!("failed to get target for bucket:{:?}, arn:{:?}", &bucket, &arn);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1866,7 +1870,7 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
Err(e) => {
error!("replicate_object task failed: {}", e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: roi.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1900,9 +1904,9 @@ pub async fn replicate_object<S: StorageAPI>(roi: ReplicateObjectInfo, storage:
}
let event_name = if replication_status == ReplicationStatusType::Completed {
EventName::ObjectReplicationComplete.as_ref().to_string()
EventName::ObjectReplicationComplete.to_string()
} else {
EventName::ObjectReplicationFailed.as_ref().to_string()
EventName::ObjectReplicationFailed.to_string()
};
send_event(EventArgs {
@@ -1957,7 +1961,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
warn!("target is offline: {}", tgt_client.to_url());
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -1988,7 +1992,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to get object reader for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2010,7 +2014,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
Err(e) => {
warn!("failed to get actual size for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2024,7 +2028,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if tgt_client.bucket.is_empty() {
warn!("target bucket is empty: {}", tgt_client.bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2081,7 +2085,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
bucket, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2154,7 +2158,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if BucketTargetSys::get().is_offline(&tgt_client.to_url()).await {
warn!("target is offline: {}", tgt_client.to_url());
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2184,7 +2188,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if !is_err_object_not_found(&e) || is_err_version_not_found(&e) {
warn!("failed to get object reader for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: self.to_object_info(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2215,7 +2219,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
Err(e) => {
warn!("failed to get actual size for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2231,7 +2235,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if tgt_client.bucket.is_empty() {
warn!("target bucket is empty: {}", tgt_client.bucket);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2262,9 +2266,8 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
if replication_action == ReplicationAction::None {
if self.op_type == ReplicationType::ExistingObject
&& object_info.mod_time
> oi.last_modified.map(|dt| {
time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH)
})
> oi.last_modified
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
&& object_info.version_id.is_none()
{
warn!(
@@ -2274,7 +2277,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
tgt_client.to_url()
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info.clone(),
host: GLOBAL_LocalNodeName.to_string(),
@@ -2314,7 +2317,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to head object for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2330,7 +2333,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
warn!("failed to head object for bucket:{} arn:{} error:{}", bucket, tgt_client.arn, e);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2360,7 +2363,7 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
bucket, tgt_client.arn, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
event_name: EventName::ObjectReplicationNotTracked.to_string(),
bucket_name: bucket.clone(),
object: object_info,
host: GLOBAL_LocalNodeName.to_string(),
@@ -2431,19 +2434,19 @@ impl ReplicateObjectInfoExt for ReplicateObjectInfo {
// Standard headers that needs to be extracted from User metadata.
static STANDARD_HEADERS: &[&str] = &[
headers::CONTENT_TYPE,
headers::CACHE_CONTROL,
headers::CONTENT_ENCODING,
headers::CONTENT_LANGUAGE,
headers::CONTENT_DISPOSITION,
headers::AMZ_STORAGE_CLASS,
headers::AMZ_OBJECT_TAGGING,
headers::AMZ_BUCKET_REPLICATION_STATUS,
headers::AMZ_OBJECT_LOCK_MODE,
headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
headers::AMZ_OBJECT_LOCK_LEGAL_HOLD,
headers::AMZ_TAG_COUNT,
headers::AMZ_SERVER_SIDE_ENCRYPTION,
CONTENT_TYPE,
CACHE_CONTROL,
CONTENT_ENCODING,
CONTENT_LANGUAGE,
CONTENT_DISPOSITION,
AMZ_STORAGE_CLASS,
AMZ_OBJECT_TAGGING,
AMZ_BUCKET_REPLICATION_STATUS,
AMZ_OBJECT_LOCK_MODE,
AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
AMZ_OBJECT_LOCK_LEGAL_HOLD,
AMZ_TAG_COUNT,
AMZ_SERVER_SIDE_ENCRYPTION,
];
fn calc_put_object_header_size(put_opts: &PutObjectOptions) -> usize {
@@ -2577,7 +2580,7 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
meta.insert(REPLICATION_SSEC_CHECKSUM_HEADER.to_string(), encoded);
} else {
// Get checksum metadata for non-SSE-C objects
let (cs_meta, is_mp) = object_info.decrypt_checksums(0, &http::HeaderMap::new())?;
let (cs_meta, is_mp) = object_info.decrypt_checksums(0, &HeaderMap::new())?;
is_multipart = is_mp;
// Set object checksum metadata
@@ -2649,24 +2652,24 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
// Use case-insensitive lookup for headers
let lk_map = object_info.user_defined.clone();
if let Some(lang) = lk_map.lookup(headers::CONTENT_LANGUAGE) {
if let Some(lang) = lk_map.lookup(CONTENT_LANGUAGE) {
put_op.content_language = lang.to_string();
}
if let Some(cd) = lk_map.lookup(headers::CONTENT_DISPOSITION) {
if let Some(cd) = lk_map.lookup(CONTENT_DISPOSITION) {
put_op.content_disposition = cd.to_string();
}
if let Some(v) = lk_map.lookup(headers::CACHE_CONTROL) {
if let Some(v) = lk_map.lookup(CACHE_CONTROL) {
put_op.cache_control = v.to_string();
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_MODE) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_MODE) {
let mode = v.to_string().to_uppercase();
put_op.mode = Some(aws_sdk_s3::types::ObjectLockRetentionMode::from(mode.as_str()));
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE) {
put_op.retain_until_date =
OffsetDateTime::parse(v, &Rfc3339).map_err(|e| Error::other(format!("Failed to parse retain until date: {}", e)))?;
// set retention timestamp in opts
@@ -2680,7 +2683,7 @@ fn put_replication_opts(sc: &str, object_info: &ObjectInfo) -> Result<(PutObject
};
}
if let Some(v) = lk_map.lookup(headers::AMZ_OBJECT_LOCK_LEGAL_HOLD) {
if let Some(v) = lk_map.lookup(AMZ_OBJECT_LOCK_LEGAL_HOLD) {
let hold = v.to_uppercase();
put_op.legalhold = Some(ObjectLockLegalHoldStatus::from(hold.as_str()));
// set legalhold timestamp in opts
@@ -2865,7 +2868,7 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
&& oi1.mod_time
> oi2
.last_modified
.map(|dt| time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH))
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
&& oi1.version_id.is_none()
{
return ReplicationAction::None;
@@ -2884,7 +2887,7 @@ fn get_replication_action(oi1: &ObjectInfo, oi2: &HeadObjectOutput, op_type: Rep
|| oi1.mod_time
!= oi2
.last_modified
.map(|dt| time::OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(time::OffsetDateTime::UNIX_EPOCH))
.map(|dt| OffsetDateTime::from_unix_timestamp(dt.secs()).unwrap_or(OffsetDateTime::UNIX_EPOCH))
{
return ReplicationAction::All;
}

View File

@@ -13,6 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Defines the EventName enum which represents the various S3 event types that can trigger notifications.
//! This enum includes both specific event types (e.g., ObjectCreated:Put) and aggregate types (e.g., ObjectCreated:*). Each variant has methods to expand into its constituent event types and to compute a bitmask for efficient filtering.
//! The EventName enum is used in the event notification system to determine which events should trigger notifications based on the configured rules.
//!
//! NOTE: This module is not yet fully implemented and serves as a placeholder for future development of the event notification system. The EventName enum and its associated methods are defined, but the logic for handling events and sending notifications has not been implemented yet. (If the module is ever truly deprecated, mark it with the `#[deprecated]` attribute rather than a doc-comment tag.)
#[derive(Default, Clone)]
pub enum EventName {
ObjectAccessedGet,

View File

@@ -15,7 +15,7 @@
#![allow(unused_variables)]
use crate::bucket::metadata::BucketMetadata;
use crate::event::name::EventName;
// use crate::event::name::EventName;
use crate::event::targetlist::TargetList;
use crate::store::ECStore;
use crate::store_api::ObjectInfo;

View File

@@ -48,7 +48,7 @@ use crate::{
UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk,
},
error::{StorageError, to_object_err},
event::name::EventName,
// event::name::EventName,
event_notification::{EventArgs, send_event},
global::{GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id, is_dist_erasure},
store_api::{
@@ -79,6 +79,7 @@ use rustfs_lock::local_lock::LocalLock;
use rustfs_lock::{FastLockGuard, NamespaceLock, NamespaceLockGuard, NamespaceLockWrapper, ObjectKey};
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader};
use rustfs_s3_common::EventName;
use rustfs_utils::http::RUSTFS_BUCKET_REPLICATION_SSEC_CHECKSUM;
use rustfs_utils::http::headers::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX, RESERVED_METADATA_PREFIX_LOWER};
@@ -1682,17 +1683,17 @@ impl ObjectOperations for SetDisks {
if let Err(err) = rv {
return Err(StorageError::Io(err));
}
let rv = rv.unwrap();
let rv = rv?;
fi.transition_status = TRANSITION_COMPLETE.to_string();
fi.transitioned_objname = dest_obj;
fi.transition_tier = opts.transition.tier.clone();
fi.transition_version_id = if rv.is_empty() { None } else { Some(Uuid::parse_str(&rv)?) };
let mut event_name = EventName::ObjectTransitionComplete.as_ref();
let mut event_name = EventName::ObjectTransitionComplete.as_str();
let disks = self.get_disks(0, 0).await?;
if let Err(err) = self.delete_object_version(bucket, object, &fi, false).await {
event_name = EventName::ObjectTransitionFailed.as_ref();
event_name = EventName::ObjectTransitionFailed.as_str();
}
for disk in disks.iter() {

View File

@@ -28,6 +28,7 @@ documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-s3-common = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true }
arc-swap = { workspace = true }

View File

@@ -24,7 +24,7 @@ use rustfs_config::{
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -24,7 +24,7 @@ use rustfs_config::{
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -14,7 +14,7 @@
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use serde::{Deserialize, Serialize};
use url::form_urlencoded;

View File

@@ -14,7 +14,8 @@
use crate::{BucketNotificationConfig, Event, EventArgs, LifecycleError, NotificationError, NotificationSystem};
use rustfs_ecstore::config::Config;
use rustfs_targets::{EventName, arn::TargetID};
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::{Arc, OnceLock};
use tracing::error;

View File

@@ -20,7 +20,7 @@ use crate::{
use hashbrown::HashMap;
use rustfs_config::notify::{DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY, ENV_NOTIFY_TARGET_STREAM_CONCURRENCY};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;

View File

@@ -14,7 +14,7 @@
use crate::BucketNotificationConfig;
use crate::rules::{BucketRulesSnapshot, DynRulesContainer, SubscriberIndex};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
/// NotificationSystemSubscriberView - Provides an interface to manage and query
/// the subscription status of buckets in the notification system.

View File

@@ -15,7 +15,7 @@
use crate::{error::NotificationError, event::Event, rules::RulesMap};
use hashbrown::HashMap;
use rustfs_config::notify::{DEFAULT_NOTIFY_SEND_CONCURRENCY, ENV_NOTIFY_SEND_CONCURRENCY};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::Target;
use rustfs_targets::arn::TargetID;
use rustfs_targets::target::EntityTarget;

View File

@@ -16,7 +16,7 @@ use super::rules_map::RulesMap;
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::rules::NotificationConfiguration;
use crate::rules::subscriber_snapshot::{BucketRulesSnapshot, DynRulesContainer, RuleEvents, RulesContainer};
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::io::Read;

View File

@@ -14,7 +14,7 @@
use crate::rules::{PatternRules, TargetIdSet};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};

View File

@@ -14,7 +14,7 @@
use crate::rules::{BucketRulesSnapshot, BucketSnapshotRef, DynRulesContainer};
use arc_swap::ArcSwap;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use starshard::ShardedHashMap;
use std::fmt;
use std::sync::Arc;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use std::sync::Arc;
/// Let the rules structure provide "what events it is subscribed to".

View File

@@ -14,7 +14,7 @@
use crate::rules::pattern;
use hashbrown::HashSet;
use rustfs_targets::EventName;
use rustfs_s3_common::EventName;
use rustfs_targets::arn::{ARN, ArnError, TargetIDError};
use serde::{Deserialize, Serialize};
use std::io::Read;

View File

@@ -0,0 +1,33 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-s3-common"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
homepage.workspace = true
description = "Common S3 definitions and metrics for RustFS."
keywords = ["s3", "common", "rustfs"]
categories = ["data-structures"]
[lints]
workspace = true
[dependencies]
metrics = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -0,0 +1,58 @@
# RustFS S3 Common
`rustfs-s3-common` provides shared types, utilities, and definitions for S3-compatible operations within the RustFS
ecosystem. It serves as a foundational crate for handling S3 event notifications, metrics, and operation definitions.
## Features
- **Event Definitions**: Comprehensive `EventName` enum covering standard AWS S3 event notifications (e.g.,
`s3:ObjectCreated:Put`, `s3:ObjectRemoved:Delete`) and RustFS-specific extensions.
- **S3 Operations**: `S3Operation` enum defining supported S3 API actions, used for metrics tracking and audit logging.
- **Metrics Integration**: Utilities for recording S3 operation metrics (`record_s3_op`) using the `metrics` crate.
- **Type Mapping**: Robust bidirectional mapping between `EventName` and `S3Operation` to bridge the gap between API
calls and event notifications.
## Usage
Add this crate to your `Cargo.toml`:
```toml
[dependencies]
rustfs-s3-common = { path = "../s3-common" }
```
### Event Names and Operations
```rust
use rustfs_s3_common::{EventName, S3Operation};
// Parse an event string
let event = EventName::parse("s3:ObjectCreated:Put").unwrap();
assert_eq!(event, EventName::ObjectCreatedPut);
// Map event to S3 operation
let op = event.to_s3_operation();
assert_eq!(op, Some(S3Operation::PutObject));
// Get string representation
assert_eq!(S3Operation::PutObject.as_str(), "s3:PutObject");
```
### Metrics
Initialize and record metrics for S3 operations:
```rust
use rustfs_s3_common::{init_s3_metrics, record_s3_op};
use rustfs_s3_common::S3Operation;
// Initialize metrics (call once)
init_s3_metrics();
// Record an operation
record_s3_op(S3Operation::GetObject, "my-bucket");
```
## License
This project is licensed under the [Apache-2.0 License](../../LICENSE).

View File

@@ -1,16 +1,16 @@
// Copyright 2024 RustFS Team
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
@@ -74,6 +74,11 @@ pub enum EventName {
ObjectScannerAll, // New, from Go
#[default]
Everything, // New, from Go
// Internal events for metrics (not exposed to S3 notifications)
ObjectRemovedAbortMultipartUpload,
ObjectCreatedCreateMultipartUpload,
ObjectRemovedDeleteObjects,
}
// Single event type sequential array for Everything.expand()
@@ -205,6 +210,9 @@ impl EventName {
// Go's String() returns "" for ObjectScannerAll and Everything
EventName::ObjectScannerAll => "s3:Scanner:*", // Follow the pattern in Go Expand
EventName::Everything => "", // Go String() returns "" to unprocessed
EventName::ObjectRemovedAbortMultipartUpload => "s3:ObjectRemoved:AbortMultipartUpload",
EventName::ObjectCreatedCreateMultipartUpload => "s3:ObjectCreated:CreateMultipartUpload",
EventName::ObjectRemovedDeleteObjects => "s3:ObjectRemoved:DeleteObjects",
}
}
@@ -274,6 +282,35 @@ impl EventName {
mask
}
}
/// Returns the corresponding S3Operation if the event triggers a notification event.
///
/// Aggregate and internal-only variants (e.g. `Everything`) have no single
/// matching API operation and yield `None`.
pub fn to_s3_operation(&self) -> Option<S3Operation> {
    let op = match self {
        EventName::BucketCreated => S3Operation::CreateBucket,
        EventName::BucketRemoved => S3Operation::DeleteBucket,
        EventName::ObjectAccessedGet => S3Operation::GetObject,
        EventName::ObjectAccessedGetRetention => S3Operation::GetObjectRetention,
        EventName::ObjectAccessedGetLegalHold => S3Operation::GetObjectLegalHold,
        EventName::ObjectAccessedHead => S3Operation::HeadObject,
        EventName::ObjectAccessedAttributes => S3Operation::GetObjectAttributes,
        EventName::ObjectCreatedCompleteMultipartUpload => S3Operation::CompleteMultipartUpload,
        EventName::ObjectCreatedCopy => S3Operation::CopyObject,
        // POST and PUT creation events are both recorded as PutObject.
        EventName::ObjectCreatedPost | EventName::ObjectCreatedPut => S3Operation::PutObject,
        EventName::ObjectCreatedPutRetention => S3Operation::PutObjectRetention,
        EventName::ObjectCreatedPutLegalHold => S3Operation::PutObjectLegalHold,
        EventName::ObjectCreatedPutTagging => S3Operation::PutObjectTagging,
        EventName::ObjectCreatedDeleteTagging => S3Operation::DeleteObjectTagging,
        // Every single-object removal flavour maps to DeleteObject.
        EventName::ObjectRemovedDelete
        | EventName::ObjectRemovedDeleteMarkerCreated
        | EventName::ObjectRemovedDeleteAllVersions => S3Operation::DeleteObject,
        EventName::ObjectRestorePost => S3Operation::RestoreObject,
        EventName::ObjectRemovedAbortMultipartUpload => S3Operation::AbortMultipartUpload,
        EventName::ObjectCreatedCreateMultipartUpload => S3Operation::CreateMultipartUpload,
        EventName::ObjectRemovedDeleteObjects => S3Operation::DeleteObjects,
        _ => return None,
    };
    Some(op)
}
}
impl fmt::Display for EventName {
@@ -309,6 +346,176 @@ impl<'de> serde::de::Deserialize<'de> for EventName {
}
}
/// The set of S3 API operations recognised by RustFS.
///
/// Used as a typed alternative to raw "s3:<Action>" strings for metrics
/// labels and audit logging; see `as_str` for the canonical string form and
/// `to_event_name` for the mapping to notification events.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum S3Operation {
// Multipart upload lifecycle.
AbortMultipartUpload,
CompleteMultipartUpload,
CopyObject,
// Bucket-level operations.
CreateBucket,
CreateMultipartUpload,
DeleteBucket,
DeleteBucketCors,
DeleteBucketEncryption,
DeleteBucketLifecycle,
DeleteBucketPolicy,
DeleteBucketReplication,
DeleteBucketTagging,
// Object deletion operations.
DeleteObject,
DeleteObjectTagging,
DeleteObjects,
DeletePublicAccessBlock,
// Bucket read operations.
GetBucketAcl,
GetBucketCors,
GetBucketEncryption,
GetBucketLifecycleConfiguration,
GetBucketLocation,
GetBucketLogging,
GetBucketNotificationConfiguration,
GetBucketPolicy,
GetBucketPolicyStatus,
GetBucketReplication,
GetBucketTagging,
GetBucketVersioning,
// Object read operations.
GetObject,
GetObjectAcl,
GetObjectAttributes,
GetObjectLegalHold,
GetObjectLockConfiguration,
GetObjectRetention,
GetObjectTagging,
GetObjectTorrent,
GetPublicAccessBlock,
HeadBucket,
HeadObject,
// Listing operations.
ListBuckets,
ListMultipartUploads,
ListObjectVersions,
ListObjects,
ListObjectsV2,
ListParts,
// Bucket write operations.
PutBucketAcl,
PutBucketCors,
PutBucketEncryption,
PutBucketLifecycleConfiguration,
PutBucketLogging,
PutBucketNotificationConfiguration,
PutBucketPolicy,
PutBucketReplication,
PutBucketTagging,
PutBucketVersioning,
// Object write operations.
PutObject,
PutObjectAcl,
PutObjectLegalHold,
PutObjectLockConfiguration,
PutObjectRetention,
PutObjectTagging,
PutPublicAccessBlock,
// Miscellaneous object operations.
RestoreObject,
SelectObjectContent,
UploadPart,
UploadPartCopy,
}
impl S3Operation {
/// Returns the canonical `"s3:<Action>"` string for this operation,
/// e.g. `S3Operation::PutObject.as_str() == "s3:PutObject"`.
///
/// The result is `'static`, making it suitable for use directly as a
/// metrics label or audit-log field without allocation.
pub fn as_str(self) -> &'static str {
match self {
Self::AbortMultipartUpload => "s3:AbortMultipartUpload",
Self::CompleteMultipartUpload => "s3:CompleteMultipartUpload",
Self::CopyObject => "s3:CopyObject",
Self::CreateBucket => "s3:CreateBucket",
Self::CreateMultipartUpload => "s3:CreateMultipartUpload",
Self::DeleteBucket => "s3:DeleteBucket",
Self::DeleteBucketCors => "s3:DeleteBucketCors",
Self::DeleteBucketEncryption => "s3:DeleteBucketEncryption",
Self::DeleteBucketLifecycle => "s3:DeleteBucketLifecycle",
Self::DeleteBucketPolicy => "s3:DeleteBucketPolicy",
Self::DeleteBucketReplication => "s3:DeleteBucketReplication",
Self::DeleteBucketTagging => "s3:DeleteBucketTagging",
Self::DeleteObject => "s3:DeleteObject",
Self::DeleteObjectTagging => "s3:DeleteObjectTagging",
Self::DeleteObjects => "s3:DeleteObjects",
Self::DeletePublicAccessBlock => "s3:DeletePublicAccessBlock",
Self::GetBucketAcl => "s3:GetBucketAcl",
Self::GetBucketCors => "s3:GetBucketCors",
Self::GetBucketEncryption => "s3:GetBucketEncryption",
Self::GetBucketLifecycleConfiguration => "s3:GetBucketLifecycleConfiguration",
Self::GetBucketLocation => "s3:GetBucketLocation",
Self::GetBucketLogging => "s3:GetBucketLogging",
Self::GetBucketNotificationConfiguration => "s3:GetBucketNotificationConfiguration",
Self::GetBucketPolicy => "s3:GetBucketPolicy",
Self::GetBucketPolicyStatus => "s3:GetBucketPolicyStatus",
Self::GetBucketReplication => "s3:GetBucketReplication",
Self::GetBucketTagging => "s3:GetBucketTagging",
Self::GetBucketVersioning => "s3:GetBucketVersioning",
Self::GetObject => "s3:GetObject",
Self::GetObjectAcl => "s3:GetObjectAcl",
Self::GetObjectAttributes => "s3:GetObjectAttributes",
Self::GetObjectLegalHold => "s3:GetObjectLegalHold",
Self::GetObjectLockConfiguration => "s3:GetObjectLockConfiguration",
Self::GetObjectRetention => "s3:GetObjectRetention",
Self::GetObjectTagging => "s3:GetObjectTagging",
Self::GetObjectTorrent => "s3:GetObjectTorrent",
Self::GetPublicAccessBlock => "s3:GetPublicAccessBlock",
Self::HeadBucket => "s3:HeadBucket",
Self::HeadObject => "s3:HeadObject",
Self::ListBuckets => "s3:ListBuckets",
Self::ListMultipartUploads => "s3:ListMultipartUploads",
Self::ListObjectVersions => "s3:ListObjectVersions",
Self::ListObjects => "s3:ListObjects",
Self::ListObjectsV2 => "s3:ListObjectsV2",
Self::ListParts => "s3:ListParts",
Self::PutBucketAcl => "s3:PutBucketAcl",
Self::PutBucketCors => "s3:PutBucketCors",
Self::PutBucketEncryption => "s3:PutBucketEncryption",
Self::PutBucketLifecycleConfiguration => "s3:PutBucketLifecycleConfiguration",
Self::PutBucketLogging => "s3:PutBucketLogging",
Self::PutBucketNotificationConfiguration => "s3:PutBucketNotificationConfiguration",
Self::PutBucketPolicy => "s3:PutBucketPolicy",
Self::PutBucketReplication => "s3:PutBucketReplication",
Self::PutBucketTagging => "s3:PutBucketTagging",
Self::PutBucketVersioning => "s3:PutBucketVersioning",
Self::PutObject => "s3:PutObject",
Self::PutObjectAcl => "s3:PutObjectAcl",
Self::PutObjectLegalHold => "s3:PutObjectLegalHold",
Self::PutObjectLockConfiguration => "s3:PutObjectLockConfiguration",
Self::PutObjectRetention => "s3:PutObjectRetention",
Self::PutObjectTagging => "s3:PutObjectTagging",
Self::PutPublicAccessBlock => "s3:PutPublicAccessBlock",
Self::RestoreObject => "s3:RestoreObject",
Self::SelectObjectContent => "s3:SelectObjectContent",
Self::UploadPart => "s3:UploadPart",
Self::UploadPartCopy => "s3:UploadPartCopy",
}
}
/// Returns the corresponding EventName if the operation triggers a notification event.
///
/// Read-only bucket/listing operations (e.g. `ListBuckets`) produce no
/// notification and return `None`. Note the mapping is not symmetric with
/// `EventName::to_s3_operation`: e.g. `SelectObjectContent` maps to
/// `ObjectAccessedGet`, which maps back to `GetObject`.
pub fn to_event_name(self) -> Option<EventName> {
match self {
Self::CompleteMultipartUpload => Some(EventName::ObjectCreatedCompleteMultipartUpload),
Self::CopyObject => Some(EventName::ObjectCreatedCopy),
Self::CreateBucket => Some(EventName::BucketCreated),
Self::DeleteBucket => Some(EventName::BucketRemoved),
Self::DeleteObject => Some(EventName::ObjectRemovedDelete),
Self::DeleteObjects => Some(EventName::ObjectRemovedDeleteObjects),
Self::DeleteObjectTagging => Some(EventName::ObjectCreatedDeleteTagging),
Self::GetObject => Some(EventName::ObjectAccessedGet),
Self::GetObjectAttributes => Some(EventName::ObjectAccessedAttributes),
Self::GetObjectLegalHold => Some(EventName::ObjectAccessedGetLegalHold),
Self::GetObjectRetention => Some(EventName::ObjectAccessedGetRetention),
Self::HeadObject => Some(EventName::ObjectAccessedHead),
Self::PutObject => Some(EventName::ObjectCreatedPut),
Self::PutObjectLegalHold => Some(EventName::ObjectCreatedPutLegalHold),
Self::PutObjectRetention => Some(EventName::ObjectCreatedPutRetention),
Self::PutObjectTagging => Some(EventName::ObjectCreatedPutTagging),
Self::RestoreObject => Some(EventName::ObjectRestorePost),
Self::SelectObjectContent => Some(EventName::ObjectAccessedGet),
Self::AbortMultipartUpload => Some(EventName::ObjectRemovedAbortMultipartUpload),
Self::CreateMultipartUpload => Some(EventName::ObjectCreatedCreateMultipartUpload),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -363,4 +570,31 @@ mod tests {
let deserialized = serde_json::from_str::<EventName>(serialized_str);
assert!(deserialized.is_err(), "Deserialization should fail for empty string");
}
#[test]
// Spot-checks the S3Operation -> EventName mapping: operations with a
// notification event (PutObject, GetObject, RestoreObject, multipart abort)
// resolve to the expected variant, and a read-only operation (ListBuckets)
// resolves to None.
fn test_s3_operation_to_event_name() {
assert_eq!(S3Operation::PutObject.to_event_name(), Some(EventName::ObjectCreatedPut));
assert_eq!(S3Operation::GetObject.to_event_name(), Some(EventName::ObjectAccessedGet));
assert_eq!(S3Operation::ListBuckets.to_event_name(), None);
assert_eq!(S3Operation::RestoreObject.to_event_name(), Some(EventName::ObjectRestorePost));
// SelectObjectContent is deliberately reported as a plain object read.
assert_eq!(S3Operation::SelectObjectContent.to_event_name(), Some(EventName::ObjectAccessedGet));
assert_eq!(
S3Operation::AbortMultipartUpload.to_event_name(),
Some(EventName::ObjectRemovedAbortMultipartUpload)
);
}
#[test]
// Spot-checks the reverse EventName -> S3Operation mapping, including the
// many-to-one case (ObjectCreatedPost also maps to PutObject) and an
// aggregate variant (Everything) that maps to no operation.
fn test_event_name_to_s3_operation() {
assert_eq!(EventName::ObjectCreatedPut.to_s3_operation(), Some(S3Operation::PutObject));
assert_eq!(EventName::ObjectAccessedGet.to_s3_operation(), Some(S3Operation::GetObject));
assert_eq!(EventName::BucketCreated.to_s3_operation(), Some(S3Operation::CreateBucket));
assert_eq!(EventName::Everything.to_s3_operation(), None);
assert_eq!(EventName::ObjectRestorePost.to_s3_operation(), Some(S3Operation::RestoreObject));
assert_eq!(EventName::ObjectCreatedPost.to_s3_operation(), Some(S3Operation::PutObject));
assert_eq!(
EventName::ObjectRemovedAbortMultipartUpload.to_s3_operation(),
Some(S3Operation::AbortMultipartUpload)
);
}
}

View File

@@ -0,0 +1,19 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Internal modules; their public items are re-exported below.
mod event_name;
mod s3_metrics;
// Public API of the crate: event/operation types and the metrics helpers.
pub use event_name::{EventName, ParseEventNameError, S3Operation};
pub use s3_metrics::{init_s3_metrics, record_s3_op};

View File

@@ -0,0 +1,43 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::S3Operation;
use metrics::{counter, describe_counter};
use std::sync::OnceLock;
const S3_OPS_METRIC: &str = "rustfs_s3_operations_total";
/// Records an S3 operation in the metrics system.
///
/// Call this whenever an S3 API operation is handled so that usage of the
/// different operations can be tracked per bucket.
///
/// # Arguments
/// * `op` - the S3 operation being recorded.
/// * `bucket` - the bucket the operation targeted; attached as a label so
///   metrics can be analysed at bucket granularity.
///
/// Example usage:
/// ```ignore
/// record_s3_op(S3Operation::GetObject, "my-bucket");
/// ```
pub fn record_s3_op(op: S3Operation, bucket: &str) {
    // The bucket label must be owned; the op label is a &'static str.
    let bucket_label = bucket.to_owned();
    let ops_counter = counter!(S3_OPS_METRIC, "op" => op.as_str(), "bucket" => bucket_label);
    ops_counter.increment(1);
}
/// One-time registration of indicator meta information
/// This function ensures that metric descriptors are registered only once.
pub fn init_s3_metrics() {
static METRICS_DESC_INIT: OnceLock<()> = OnceLock::new();
METRICS_DESC_INIT.get_or_init(|| {
describe_counter!(S3_OPS_METRIC, "Total number of S3 API operations handled");
});
}

View File

@@ -14,6 +14,7 @@ documentation = "https://docs.rs/rustfs-target/latest/rustfs_target/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants", "audit"] }
rustfs-utils = { workspace = true, features = ["sys", "notify"] }
rustfs-s3-common = { workspace = true }
async-trait = { workspace = true }
reqwest = { workspace = true }
rumqttc = { workspace = true }

View File

@@ -15,13 +15,12 @@
pub mod arn;
mod check;
pub mod error;
mod event_name;
pub mod store;
pub mod target;
pub use check::check_mqtt_broker_available;
pub use error::{StoreError, TargetError};
pub use event_name::EventName;
pub use rustfs_s3_common::EventName;
use serde::{Deserialize, Serialize};
pub use target::Target;

View File

@@ -14,8 +14,9 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{EventName, StoreError, TargetError};
use crate::{StoreError, TargetError};
use async_trait::async_trait;
use rustfs_s3_common::EventName;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Formatter;

View File

@@ -62,6 +62,7 @@ rustfs-policy = { workspace = true }
rustfs-protocols = { workspace = true }
rustfs-protos = { workspace = true }
rustfs-rio.workspace = true
rustfs-s3-common = { workspace = true }
rustfs-s3select-api = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-targets = { workspace = true }

View File

@@ -46,6 +46,7 @@ use rustfs_policy::policy::{
action::{Action, S3Action},
{BucketPolicy, BucketPolicyArgs, Effect, Validator},
};
use rustfs_s3_common::S3Operation;
use rustfs_targets::{
EventName,
arn::{ARN, TargetIDError},
@@ -145,7 +146,7 @@ impl DefaultBucketUsecase {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket");
let helper = OperationHelper::new(&req, EventName::BucketCreated, S3Operation::CreateBucket);
let requester_id = match req_info_ref(&req) {
Ok(r) => r.cred.as_ref().map(|c| c.access_key.clone()),
Err(_) => {
@@ -239,7 +240,7 @@ impl DefaultBucketUsecase {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket");
let helper = OperationHelper::new(&req, EventName::BucketRemoved, S3Operation::DeleteBucket);
let input = req.input.clone();
let Some(store) = new_object_layer_fn() else {

View File

@@ -43,6 +43,7 @@ use rustfs_ecstore::store_api::{CompletePart, HTTPRangeSpec, MultipartUploadResu
use rustfs_ecstore::store_api::{MultipartOperations, ObjectOperations};
use rustfs_filemeta::{ReplicationStatusType, ReplicationType};
use rustfs_rio::{CompressReader, HashReader, Reader, WarpReader};
use rustfs_s3_common::S3Operation;
use rustfs_targets::EventName;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::{
@@ -145,8 +146,11 @@ impl DefaultMultipartUsecase {
let _ = context.object_store();
}
let mut helper =
OperationHelper::new(&req, EventName::ObjectCreatedCompleteMultipartUpload, "s3:CompleteMultipartUpload");
let mut helper = OperationHelper::new(
&req,
EventName::ObjectCreatedCompleteMultipartUpload,
S3Operation::CompleteMultipartUpload,
);
let input = req.input;
let CompleteMultipartUploadInput {
multipart_upload,
@@ -425,7 +429,7 @@ impl DefaultMultipartUsecase {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:CreateMultipartUpload");
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, S3Operation::CreateMultipartUpload);
let CreateMultipartUploadInput {
bucket,
key,

View File

@@ -74,6 +74,7 @@ use rustfs_filemeta::{
use rustfs_notify::EventArgsBuilder;
use rustfs_policy::policy::action::{Action, S3Action};
use rustfs_rio::{CompressReader, EtagReader, HashReader, Reader, WarpReader};
use rustfs_s3_common::S3Operation;
use rustfs_s3select_api::{
object_store::bytes_stream,
query::{Context, Query},
@@ -266,7 +267,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, S3Operation::PutObject);
if req
.headers
.get("X-Amz-Meta-Snowball-Auto-Extract")
@@ -614,7 +615,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, "s3:PutObjectLegalHold");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, S3Operation::PutObjectLegalHold);
let PutObjectLegalHoldInput {
bucket,
key,
@@ -743,7 +744,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, "s3:PutObjectRetention");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, S3Operation::PutObjectRetention);
let PutObjectRetentionInput {
bucket,
key,
@@ -835,7 +836,7 @@ impl DefaultObjectUsecase {
}
let start_time = std::time::Instant::now();
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, "s3:PutObjectTagging");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, S3Operation::PutObjectTagging);
let PutObjectTaggingInput {
bucket,
key: object,
@@ -944,7 +945,7 @@ impl DefaultObjectUsecase {
debug!("GetObject request started with {} concurrent requests", concurrent_requests);
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, S3Operation::GetObject);
// mc get 3
let GetObjectInput {
@@ -1524,7 +1525,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, "s3:GetObjectAttributes");
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, S3Operation::GetObjectAttributes);
let GetObjectAttributesInput {
bucket,
key,
@@ -1756,7 +1757,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetLegalHold, "s3:GetObjectLegalHold");
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetLegalHold, S3Operation::GetObjectLegalHold);
let GetObjectLegalHoldInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -1847,7 +1848,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, "s3:GetObjectRetention");
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, S3Operation::GetObjectRetention);
let GetObjectRetentionInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -1941,7 +1942,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedCopy, "s3:CopyObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedCopy, S3Operation::CopyObject);
let CopyObjectInput {
copy_source,
bucket,
@@ -2237,7 +2238,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event();
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, S3Operation::DeleteObjects).suppress_event();
let (bucket, delete) = {
let bucket = req.input.bucket.clone();
let delete = req.input.delete.clone();
@@ -2538,7 +2539,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, S3Operation::DeleteObject);
let DeleteObjectInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -2722,7 +2723,7 @@ impl DefaultObjectUsecase {
}
let start_time = std::time::Instant::now();
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, "s3:DeleteObjectTagging");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, S3Operation::DeleteObjectTagging);
let DeleteObjectTaggingInput {
bucket,
key: object,
@@ -2776,7 +2777,7 @@ impl DefaultObjectUsecase {
let _ = context.object_store();
}
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedHead, "s3:HeadObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedHead, S3Operation::HeadObject);
// mc get 2
let HeadObjectInput {
bucket,
@@ -3352,7 +3353,7 @@ impl DefaultObjectUsecase {
#[instrument(level = "debug", skip(self, req))]
pub async fn execute_put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject").suppress_event();
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, S3Operation::PutObject).suppress_event();
let input = req.input;
let PutObjectInput {

View File

@@ -15,13 +15,13 @@
use crate::app::bucket_usecase::DefaultBucketUsecase;
use crate::app::multipart_usecase::DefaultMultipartUsecase;
use crate::app::object_usecase::DefaultObjectUsecase;
use crate::storage::s3_metrics::{S3Operation, record_s3_op};
use rustfs_ecstore::{
bucket::tagging::decode_tags_to_map,
error::{is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found},
new_object_layer_fn,
store_api::{BucketOperations, BucketOptions, ObjectOperations, ObjectOptions},
};
use rustfs_s3_common::{S3Operation, record_s3_op};
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, dto::*, s3_error};
use std::{fmt::Debug, sync::LazyLock};
use tokio::io::{AsyncRead, AsyncSeek};
@@ -98,7 +98,7 @@ impl AsyncSeek for InMemoryAsyncReader {
impl FS {
pub fn new() -> Self {
crate::storage::s3_metrics::init_s3_metrics();
rustfs_s3_common::init_s3_metrics();
Self {}
}
@@ -190,7 +190,6 @@ impl S3 for FS {
&self,
req: S3Request<AbortMultipartUploadInput>,
) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
record_s3_op(S3Operation::AbortMultipartUpload, &req.input.bucket);
let usecase = DefaultMultipartUsecase::from_global();
usecase.execute_abort_multipart_upload(req).await
}
@@ -200,7 +199,6 @@ impl S3 for FS {
&self,
req: S3Request<CompleteMultipartUploadInput>,
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
record_s3_op(S3Operation::CompleteMultipartUpload, &req.input.bucket);
let usecase = DefaultMultipartUsecase::from_global();
usecase.execute_complete_multipart_upload(req).await
}
@@ -208,7 +206,6 @@ impl S3 for FS {
/// Copy an object from one location to another
#[instrument(level = "debug", skip(self, req))]
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
record_s3_op(S3Operation::CopyObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_copy_object(req).await
}
@@ -219,7 +216,6 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
record_s3_op(S3Operation::CreateBucket, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_create_bucket(req).await
}
@@ -229,7 +225,6 @@ impl S3 for FS {
&self,
req: S3Request<CreateMultipartUploadInput>,
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
record_s3_op(S3Operation::CreateMultipartUpload, &req.input.bucket);
let usecase = DefaultMultipartUsecase::from_global();
usecase.execute_create_multipart_upload(req).await
}
@@ -237,14 +232,12 @@ impl S3 for FS {
/// Delete a bucket
#[instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
record_s3_op(S3Operation::DeleteBucket, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket(req).await
}
#[instrument(level = "debug", skip(self))]
async fn delete_bucket_cors(&self, req: S3Request<DeleteBucketCorsInput>) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
record_s3_op(S3Operation::DeleteBucketCors, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_cors(req).await
}
@@ -253,7 +246,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteBucketEncryptionInput>,
) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
record_s3_op(S3Operation::DeleteBucketEncryption, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_encryption(req).await
}
@@ -263,7 +255,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteBucketLifecycleInput>,
) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
record_s3_op(S3Operation::DeleteBucketLifecycle, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_lifecycle(req).await
}
@@ -272,7 +263,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteBucketPolicyInput>,
) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
record_s3_op(S3Operation::DeleteBucketPolicy, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_policy(req).await
}
@@ -281,7 +271,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteBucketReplicationInput>,
) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
record_s3_op(S3Operation::DeleteBucketReplication, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_replication(req).await
}
@@ -291,7 +280,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteBucketTaggingInput>,
) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
record_s3_op(S3Operation::DeleteBucketTagging, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_bucket_tagging(req).await
}
@@ -301,7 +289,6 @@ impl S3 for FS {
&self,
req: S3Request<DeletePublicAccessBlockInput>,
) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
record_s3_op(S3Operation::DeletePublicAccessBlock, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_delete_public_access_block(req).await
}
@@ -309,7 +296,6 @@ impl S3 for FS {
/// Delete an object
#[instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
record_s3_op(S3Operation::DeleteObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_delete_object(req).await
}
@@ -319,7 +305,6 @@ impl S3 for FS {
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
record_s3_op(S3Operation::DeleteObjectTagging, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_delete_object_tagging(req).await
}
@@ -327,7 +312,6 @@ impl S3 for FS {
/// Delete multiple objects
#[instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
record_s3_op(S3Operation::DeleteObjects, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_delete_objects(req).await
}
@@ -439,7 +423,6 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
record_s3_op(S3Operation::GetObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_get_object(req).await
}
@@ -454,7 +437,6 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
record_s3_op(S3Operation::GetObjectAttributes, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_get_object_attributes(req).await
}
@@ -463,7 +445,6 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectLegalHoldInput>,
) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
record_s3_op(S3Operation::GetObjectLegalHold, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_get_object_legal_hold(req).await
}
@@ -482,7 +463,6 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectRetentionInput>,
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
record_s3_op(S3Operation::GetObjectRetention, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_get_object_retention(req).await
}
@@ -505,14 +485,12 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
record_s3_op(S3Operation::HeadBucket, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_head_bucket(req).await
}
#[instrument(level = "debug", skip(self, req))]
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
record_s3_op(S3Operation::HeadObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_head_object(req).await
}
@@ -565,14 +543,12 @@ impl S3 for FS {
}
async fn put_bucket_acl(&self, req: S3Request<PutBucketAclInput>) -> S3Result<S3Response<PutBucketAclOutput>> {
record_s3_op(S3Operation::PutBucketAcl, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_acl(req).await
}
#[instrument(level = "debug", skip(self))]
async fn put_bucket_cors(&self, req: S3Request<PutBucketCorsInput>) -> S3Result<S3Response<PutBucketCorsOutput>> {
record_s3_op(S3Operation::PutBucketCors, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_cors(req).await
}
@@ -605,7 +581,6 @@ impl S3 for FS {
&self,
req: S3Request<PutBucketEncryptionInput>,
) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
record_s3_op(S3Operation::PutBucketEncryption, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_encryption(req).await
}
@@ -615,7 +590,6 @@ impl S3 for FS {
&self,
req: S3Request<PutBucketLifecycleConfigurationInput>,
) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
record_s3_op(S3Operation::PutBucketLifecycleConfiguration, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_lifecycle_configuration(req).await
}
@@ -624,13 +598,11 @@ impl S3 for FS {
&self,
req: S3Request<PutBucketNotificationConfigurationInput>,
) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
record_s3_op(S3Operation::PutBucketNotificationConfiguration, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_notification_configuration(req).await
}
async fn put_bucket_policy(&self, req: S3Request<PutBucketPolicyInput>) -> S3Result<S3Response<PutBucketPolicyOutput>> {
record_s3_op(S3Operation::PutBucketPolicy, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_policy(req).await
}
@@ -639,7 +611,6 @@ impl S3 for FS {
&self,
req: S3Request<PutBucketReplicationInput>,
) -> S3Result<S3Response<PutBucketReplicationOutput>> {
record_s3_op(S3Operation::PutBucketReplication, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_replication(req).await
}
@@ -649,14 +620,12 @@ impl S3 for FS {
&self,
req: S3Request<PutPublicAccessBlockInput>,
) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
record_s3_op(S3Operation::PutPublicAccessBlock, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_public_access_block(req).await
}
#[instrument(level = "debug", skip(self))]
async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
record_s3_op(S3Operation::PutBucketTagging, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_tagging(req).await
}
@@ -666,14 +635,12 @@ impl S3 for FS {
&self,
req: S3Request<PutBucketVersioningInput>,
) -> S3Result<S3Response<PutBucketVersioningOutput>> {
record_s3_op(S3Operation::PutBucketVersioning, &req.input.bucket);
let usecase = DefaultBucketUsecase::from_global();
usecase.execute_put_bucket_versioning(req).await
}
#[instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
record_s3_op(S3Operation::PutObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_put_object(self, req).await
}
@@ -688,7 +655,6 @@ impl S3 for FS {
&self,
req: S3Request<PutObjectLegalHoldInput>,
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
record_s3_op(S3Operation::PutObjectLegalHold, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_put_object_legal_hold(req).await
}
@@ -698,7 +664,6 @@ impl S3 for FS {
&self,
req: S3Request<PutObjectLockConfigurationInput>,
) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
record_s3_op(S3Operation::PutObjectLockConfiguration, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_put_object_lock_configuration(req).await
}
@@ -707,20 +672,17 @@ impl S3 for FS {
&self,
req: S3Request<PutObjectRetentionInput>,
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
record_s3_op(S3Operation::PutObjectRetention, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_put_object_retention(req).await
}
#[instrument(level = "debug", skip(self, req))]
async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
record_s3_op(S3Operation::PutObjectTagging, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_put_object_tagging(req).await
}
async fn restore_object(&self, req: S3Request<RestoreObjectInput>) -> S3Result<S3Response<RestoreObjectOutput>> {
record_s3_op(S3Operation::RestoreObject, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_restore_object(req).await
}
@@ -729,7 +691,6 @@ impl S3 for FS {
&self,
req: S3Request<SelectObjectContentInput>,
) -> S3Result<S3Response<SelectObjectContentOutput>> {
record_s3_op(S3Operation::SelectObjectContent, &req.input.bucket);
let usecase = DefaultObjectUsecase::from_global();
usecase.execute_select_object_content(req).await
}

View File

@@ -19,7 +19,8 @@ use rustfs_audit::{
};
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_targets::EventName;
use rustfs_s3_common::record_s3_op;
use rustfs_s3_common::{EventName, S3Operation};
use rustfs_utils::{
extract_params_header, extract_req_params, extract_resp_elements, get_request_host, get_request_port, get_request_user_agent,
};
@@ -54,7 +55,7 @@ pub struct OperationHelper {
impl OperationHelper {
/// Create a new OperationHelper for S3 requests.
pub fn new(req: &S3Request<impl Send + Sync>, event: EventName, trigger: &'static str) -> Self {
pub fn new(req: &S3Request<impl Send + Sync>, event: EventName, op: S3Operation) -> Self {
// Parse path -> bucket/object
let path = req.uri.path().trim_start_matches('/');
let mut segs = path.splitn(2, '/');
@@ -70,6 +71,11 @@ impl OperationHelper {
.unwrap_or("")
.to_string();
let trigger = op.as_str();
let bucket_label = if bucket.is_empty() { "*" } else { &bucket };
record_s3_op(op, bucket_label);
// Initialize audit builder
let mut api_builder = ApiDetailsBuilder::new().name(trigger);
if !bucket.is_empty() {

View File

@@ -21,7 +21,6 @@ pub mod options;
pub(crate) mod readers;
pub mod rpc;
pub(crate) mod s3_api;
pub(crate) mod s3_metrics;
mod sse;
pub mod tonic_service;

View File

@@ -1,174 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use metrics::{counter, describe_counter};
use std::sync::OnceLock;
const S3_OPS_METRIC: &str = "rustfs_s3_operations_total";
/// The set of S3 API operations tracked for metrics and audit.
///
/// Each variant corresponds to one S3 API action; `as_str` yields the
/// canonical action name used as the `op` label on the
/// `rustfs_s3_operations_total` counter.
///
/// `Debug`, `PartialEq`, `Eq`, and `Hash` are derived in addition to
/// `Clone`/`Copy` so the type can be logged, compared, and used as a map
/// key (e.g. for per-operation aggregation) without wrapper types.
// Variants are kept in alphabetical order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum S3Operation {
    AbortMultipartUpload,
    CompleteMultipartUpload,
    CopyObject,
    CreateBucket,
    CreateMultipartUpload,
    DeleteBucket,
    DeleteBucketCors,
    DeleteBucketEncryption,
    DeleteBucketLifecycle,
    DeleteBucketPolicy,
    DeleteBucketReplication,
    DeleteBucketTagging,
    DeleteObject,
    DeleteObjectTagging,
    DeleteObjects,
    DeletePublicAccessBlock,
    GetBucketAcl,
    GetBucketCors,
    GetBucketEncryption,
    GetBucketLifecycleConfiguration,
    GetBucketLocation,
    GetBucketLogging,
    GetBucketNotificationConfiguration,
    GetBucketPolicy,
    GetBucketPolicyStatus,
    GetBucketReplication,
    GetBucketTagging,
    GetBucketVersioning,
    GetObject,
    GetObjectAcl,
    GetObjectAttributes,
    GetObjectLegalHold,
    GetObjectLockConfiguration,
    GetObjectRetention,
    GetObjectTagging,
    GetObjectTorrent,
    GetPublicAccessBlock,
    HeadBucket,
    HeadObject,
    ListBuckets,
    ListMultipartUploads,
    ListObjectVersions,
    ListObjects,
    ListObjectsV2,
    ListParts,
    PutBucketAcl,
    PutBucketCors,
    PutBucketEncryption,
    PutBucketLifecycleConfiguration,
    PutBucketLogging,
    PutBucketNotificationConfiguration,
    PutBucketPolicy,
    PutBucketReplication,
    PutBucketTagging,
    PutBucketVersioning,
    PutObject,
    PutObjectAcl,
    PutObjectLegalHold,
    PutObjectLockConfiguration,
    PutObjectRetention,
    PutObjectTagging,
    PutPublicAccessBlock,
    RestoreObject,
    SelectObjectContent,
    UploadPart,
    UploadPartCopy,
}
impl S3Operation {
    /// Return the canonical S3 API action name for this operation.
    ///
    /// The returned string is used as the `op` label on the
    /// `rustfs_s3_operations_total` counter (see `record_s3_op`) and, per
    /// the diff above, as the audit trigger name in `OperationHelper::new`,
    /// so every arm must spell the action name exactly as the variant does.
    /// The match is exhaustive: adding a variant without an arm is a
    /// compile error, which is the intended guard.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::AbortMultipartUpload => "AbortMultipartUpload",
            Self::CompleteMultipartUpload => "CompleteMultipartUpload",
            Self::CopyObject => "CopyObject",
            Self::CreateBucket => "CreateBucket",
            Self::CreateMultipartUpload => "CreateMultipartUpload",
            Self::DeleteBucket => "DeleteBucket",
            Self::DeleteBucketCors => "DeleteBucketCors",
            Self::DeleteBucketEncryption => "DeleteBucketEncryption",
            Self::DeleteBucketLifecycle => "DeleteBucketLifecycle",
            Self::DeleteBucketPolicy => "DeleteBucketPolicy",
            Self::DeleteBucketReplication => "DeleteBucketReplication",
            Self::DeleteBucketTagging => "DeleteBucketTagging",
            Self::DeleteObject => "DeleteObject",
            Self::DeleteObjectTagging => "DeleteObjectTagging",
            Self::DeleteObjects => "DeleteObjects",
            Self::DeletePublicAccessBlock => "DeletePublicAccessBlock",
            Self::GetBucketAcl => "GetBucketAcl",
            Self::GetBucketCors => "GetBucketCors",
            Self::GetBucketEncryption => "GetBucketEncryption",
            Self::GetBucketLifecycleConfiguration => "GetBucketLifecycleConfiguration",
            Self::GetBucketLocation => "GetBucketLocation",
            Self::GetBucketLogging => "GetBucketLogging",
            Self::GetBucketNotificationConfiguration => "GetBucketNotificationConfiguration",
            Self::GetBucketPolicy => "GetBucketPolicy",
            Self::GetBucketPolicyStatus => "GetBucketPolicyStatus",
            Self::GetBucketReplication => "GetBucketReplication",
            Self::GetBucketTagging => "GetBucketTagging",
            Self::GetBucketVersioning => "GetBucketVersioning",
            Self::GetObject => "GetObject",
            Self::GetObjectAcl => "GetObjectAcl",
            Self::GetObjectAttributes => "GetObjectAttributes",
            Self::GetObjectLegalHold => "GetObjectLegalHold",
            Self::GetObjectLockConfiguration => "GetObjectLockConfiguration",
            Self::GetObjectRetention => "GetObjectRetention",
            Self::GetObjectTagging => "GetObjectTagging",
            Self::GetObjectTorrent => "GetObjectTorrent",
            Self::GetPublicAccessBlock => "GetPublicAccessBlock",
            Self::HeadBucket => "HeadBucket",
            Self::HeadObject => "HeadObject",
            Self::ListBuckets => "ListBuckets",
            Self::ListMultipartUploads => "ListMultipartUploads",
            Self::ListObjectVersions => "ListObjectVersions",
            Self::ListObjects => "ListObjects",
            Self::ListObjectsV2 => "ListObjectsV2",
            Self::ListParts => "ListParts",
            Self::PutBucketAcl => "PutBucketAcl",
            Self::PutBucketCors => "PutBucketCors",
            Self::PutBucketEncryption => "PutBucketEncryption",
            Self::PutBucketLifecycleConfiguration => "PutBucketLifecycleConfiguration",
            Self::PutBucketLogging => "PutBucketLogging",
            Self::PutBucketNotificationConfiguration => "PutBucketNotificationConfiguration",
            Self::PutBucketPolicy => "PutBucketPolicy",
            Self::PutBucketReplication => "PutBucketReplication",
            Self::PutBucketTagging => "PutBucketTagging",
            Self::PutBucketVersioning => "PutBucketVersioning",
            Self::PutObject => "PutObject",
            Self::PutObjectAcl => "PutObjectAcl",
            Self::PutObjectLegalHold => "PutObjectLegalHold",
            Self::PutObjectLockConfiguration => "PutObjectLockConfiguration",
            Self::PutObjectRetention => "PutObjectRetention",
            Self::PutObjectTagging => "PutObjectTagging",
            Self::PutPublicAccessBlock => "PutPublicAccessBlock",
            Self::RestoreObject => "RestoreObject",
            Self::SelectObjectContent => "SelectObjectContent",
            Self::UploadPart => "UploadPart",
            Self::UploadPartCopy => "UploadPartCopy",
        }
    }
}
/// Record one S3 API call on the operations counter.
///
/// Increments `rustfs_s3_operations_total` by one, labeling the sample with
/// the operation's canonical action name (`op`) and the target `bucket`.
/// The bucket label is cloned because the metrics backend requires an
/// owned label value.
pub fn record_s3_op(op: S3Operation, bucket: &str) {
    let bucket_label = bucket.to_owned();
    let ops_counter = counter!(S3_OPS_METRIC, "op" => op.as_str(), "bucket" => bucket_label);
    ops_counter.increment(1);
}
/// One-time registration of indicator meta information
/// This function ensures that metric descriptors are registered only once.
pub fn init_s3_metrics() {
static METRICS_DESC_INIT: OnceLock<()> = OnceLock::new();
METRICS_DESC_INIT.get_or_init(|| {
describe_counter!(S3_OPS_METRIC, "Total number of S3 API operations handled");
});
}