diff --git a/crates/notify/src/event.rs b/crates/notify/src/event.rs
index 8f2a036f..147db1d1 100644
--- a/crates/notify/src/event.rs
+++ b/crates/notify/src/event.rs
@@ -451,6 +451,11 @@ impl Event {
         let key_name = form_urlencoded::byte_serialize(args.object.name.as_bytes()).collect::<String>();
         let principal_id = args.req_params.get("principalId").unwrap_or(&String::new()).to_string();
 
+        let version_id = match args.object.version_id {
+            Some(id) => Some(id.to_string()),
+            None => Some(args.version_id.clone()),
+        };
+
         let mut s3_metadata = Metadata {
             schema_version: "1.0".to_string(),
             configuration_id: "Config".to_string(), // or from args
@@ -463,7 +468,7 @@ impl Event {
             },
             object: Object {
                 key: key_name,
-                version_id: Some(args.object.version_id.unwrap().to_string()),
+                version_id,
                 sequencer: unique_id,
                 ..Default::default()
             },
@@ -531,6 +536,7 @@ pub struct EventArgs {
     pub object: ecstore::store_api::ObjectInfo,
     pub req_params: HashMap<String, String>,
    pub resp_elements: HashMap<String, String>,
+    pub version_id: String,
    pub host: String,
    pub user_agent: String,
 }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 916a2e1b..17ba383d 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -16,8 +16,8 @@ use bytes::Bytes;
 use chrono::DateTime;
 use chrono::Utc;
 use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
-use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
 use datafusion::arrow::json::writer::JsonArray;
+use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
 
 // use ecstore::store_api::RESERVED_METADATA_PREFIX;
 use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
@@ -35,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags;
 use ecstore::bucket::tagging::encode_tags;
 use ecstore::bucket::utils::serialize;
 use ecstore::bucket::versioning_sys::BucketVersioningSys;
-use ecstore::cmd::bucket_replication::ReplicationStatusType;
-use ecstore::cmd::bucket_replication::ReplicationType;
 use ecstore::cmd::bucket_replication::get_must_replicate_options;
 use ecstore::cmd::bucket_replication::must_replicate;
 use ecstore::cmd::bucket_replication::schedule_replication;
-use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
+use ecstore::cmd::bucket_replication::ReplicationStatusType;
+use ecstore::cmd::bucket_replication::ReplicationType;
 use ecstore::compress::is_compressible;
+use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
 use ecstore::error::StorageError;
 use ecstore::new_object_layer_fn;
 use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -60,11 +60,11 @@ use futures::StreamExt;
 use http::HeaderMap;
 use lazy_static::lazy_static;
 use policy::auth;
+use policy::policy::action::Action;
+use policy::policy::action::S3Action;
 use policy::policy::BucketPolicy;
 use policy::policy::BucketPolicyArgs;
 use policy::policy::Validator;
-use policy::policy::action::Action;
-use policy::policy::action::S3Action;
 use query::instance::make_rustfsms;
 use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
 use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -73,23 +73,23 @@ use rustfs_rio::EtagReader;
 use rustfs_rio::HashReader;
 use rustfs_rio::Reader;
 use rustfs_rio::WarpReader;
-use rustfs_utils::CompressionAlgorithm;
 use rustfs_utils::path::path_join_buf;
+use rustfs_utils::CompressionAlgorithm;
 use rustfs_zip::CompressionFormat;
-use s3s::S3;
+use s3s::dto::*;
+use s3s::s3_error;
 use s3s::S3Error;
 use s3s::S3ErrorCode;
 use s3s::S3Result;
-use s3s::dto::*;
-use s3s::s3_error;
+use s3s::S3;
 use s3s::{S3Request, S3Response};
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
-use time::OffsetDateTime;
 use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_tar::Archive;
@@ -134,8 +134,14 @@ impl FS {
     }
 
     async fn put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
-        let PutObjectInput { body, bucket, key, .. } = req.input;
-
+        let PutObjectInput {
+            body,
+            bucket,
+            key,
+            version_id,
+            ..
+        } = req.input;
+        let event_version_id = version_id;
         let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
 
         let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));
@@ -169,7 +175,10 @@ impl FS {
            .get("X-Amz-Meta-Rustfs-Snowball-Prefix")
            .map(|v| v.to_str().unwrap_or_default())
            .unwrap_or_default();
-
+        let version_id = match event_version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
         while let Some(entry) = entries.next().await {
             let f = match entry {
                 Ok(f) => f,
@@ -231,16 +240,17 @@ impl FS {
             };
 
             let event_args = rustfs_notify::event::EventArgs {
-                event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型
+                event_name: EventName::ObjectCreatedPut,
                 bucket_name: bucket.clone(),
-                object: _obj_info.clone(), // clone() 或传递所需字段
-                req_params: rustfs_utils::extract_req_params_header(&req.headers), // 假设有一个辅助函数来提取请求参数
-                resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), // 假设有一个辅助函数来提取响应元素
-                host: rustfs_utils::get_request_host(&req.headers), // 假设的辅助函数
-                user_agent: rustfs_utils::get_request_user_agent(&req.headers), // 假设的辅助函数
+                object: _obj_info.clone(),
+                req_params: rustfs_utils::extract_req_params_header(&req.headers),
+                resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+                version_id: version_id.clone(),
+                host: rustfs_utils::get_request_host(&req.headers),
+                user_agent: rustfs_utils::get_request_user_agent(&req.headers),
             };
 
-            // 异步调用,不会阻塞当前请求的响应
+            // Asynchronous call will not block the response of the current request
             tokio::spawn(async move {
                 rustfs_notify::global::notifier_instance().notify(event_args).await;
             });
@@ -301,6 +311,23 @@ impl S3 for FS {
            .map_err(ApiError::from)?;
 
         let output = CreateBucketOutput::default();
+
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::BucketCreated,
+            bucket_name: bucket.clone(),
+            object: ecstore::store_api::ObjectInfo { ..Default::default() },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id: String::new(),
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -312,7 +339,7 @@ impl S3 for FS {
             bucket,
             key,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         let (src_bucket, src_key, version_id) = match copy_source {
             CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
             CopySource::Bucket {
@@ -414,7 +441,7 @@ impl S3 for FS {
            .map_err(ApiError::from)?;
 
         // warn!("copy_object oi {:?}", &oi);
-
+        let object_info = oi.clone();
         let copy_object_result = CopyObjectResult {
             e_tag: oi.etag,
             last_modified: oi.mod_time.map(Timestamp::from),
@@ -425,6 +452,28 @@ impl S3 for FS {
             copy_object_result: Some(copy_object_result),
             ..Default::default()
         };
+
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedCopy,
+            bucket_name: bucket.clone(),
+            object: object_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -448,6 +497,22 @@ impl S3 for FS {
            .await
            .map_err(ApiError::from)?;
 
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::BucketRemoved,
+            bucket_name: input.bucket,
+            object: ecstore::store_api::ObjectInfo { ..Default::default() },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
+            version_id: String::new(),
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(DeleteBucketOutput {}))
     }
 
@@ -456,7 +521,7 @@ impl S3 for FS {
     async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
         let DeleteObjectInput {
             bucket, key, version_id, ..
-        } = req.input;
+        } = req.input.clone();
 
         let metadata = extract_metadata(&req.headers);
 
@@ -466,7 +531,7 @@ impl S3 for FS {
         let version_id = opts.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default();
 
         let dobj = ObjectToDelete {
-            object_name: key,
+            object_name: key.clone(),
             version_id,
         };
 
@@ -496,12 +561,33 @@ impl S3 for FS {
                 (None, None)
             }
         };
-
+        let del_version_id = version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
         let output = DeleteObjectOutput {
             delete_marker,
             version_id,
             ..Default::default()
         };
+
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectRemovedDelete,
+            bucket_name: bucket.clone(),
+            object: ecstore::store_api::ObjectInfo {
+                name: key,
+                bucket,
+                ..Default::default()
+            },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
+            version_id: del_version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -556,6 +642,38 @@ impl S3 for FS {
             // errors,
             ..Default::default()
         };
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            for dobj in dobjs {
+                let version_id = match dobj.version_id {
+                    None => String::new(),
+                    Some(v) => v.to_string(),
+                };
+                let mut event_name = EventName::ObjectRemovedDelete;
+                if dobj.delete_marker {
+                    event_name = EventName::ObjectRemovedDeleteMarkerCreated;
+                }
+
+                let event_args = rustfs_notify::event::EventArgs {
+                    event_name,
+                    bucket_name: bucket.clone(),
+                    object: ecstore::store_api::ObjectInfo {
+                        name: dobj.object_name,
+                        bucket: bucket.clone(),
+                        ..Default::default()
+                    },
+                    req_params: rustfs_utils::extract_req_params_header(&req.headers),
+                    resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectsOutput {
+                        ..Default::default()
+                    })),
+                    version_id,
+                    host: rustfs_utils::get_request_host(&req.headers),
+                    user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+                };
+                rustfs_notify::global::notifier_instance().notify(event_args).await;
+            }
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -594,7 +712,7 @@ impl S3 for FS {
             part_number,
             range,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         // TODO: getObjectInArchiveFileHandler  object = xxx.zip/xxx/xxx.xxx
 
@@ -641,7 +759,7 @@ impl S3 for FS {
            .map_err(ApiError::from)?;
 
         let info = reader.object_info;
-
+        let event_info = info.clone();
         let content_type = {
             if let Some(content_type) = info.content_type {
                 match ContentType::from_str(&content_type) {
@@ -671,6 +789,26 @@ impl S3 for FS {
             ..Default::default()
         };
 
+        let version_id = match req.input.version_id {
+            None => String::new(),
+            Some(v) => v.to_string(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectAccessedGet,
+            bucket_name: bucket.clone(),
+            object: event_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(GetObjectOutput { ..Default::default() })),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -701,7 +839,7 @@ impl S3 for FS {
             part_number,
             range,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         let part_number = part_number.map(|v| v as usize);
 
@@ -739,7 +877,7 @@ impl S3 for FS {
         let info = store.get_object_info(&bucket, &key, &opts).await.map_err(ApiError::from)?;
 
         // warn!("head_object info {:?}", &info);
-
+        let event_info = info.clone();
         let content_type = {
             if let Some(content_type) = &info.content_type {
                 match ContentType::from_str(content_type) {
@@ -772,6 +910,27 @@ impl S3 for FS {
             // metadata: object_metadata,
             ..Default::default()
         };
+
+        let version_id = match req.input.version_id {
+            None => String::new(),
+            Some(v) => v.to_string(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectAccessedGet,
+            bucket_name: bucket,
+            object: event_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -1015,7 +1174,7 @@ impl S3 for FS {
                 return Err(s3_error!(InvalidStorageClass));
             }
         }
-
+        let event_version_id = input.version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
         let PutObjectInput {
             body,
             bucket,
@@ -1108,7 +1267,7 @@ impl S3 for FS {
            .put_object(&bucket, &key, &mut reader, &opts)
            .await
            .map_err(ApiError::from)?;
-
+        let event_info = obj_info.clone();
         let e_tag = obj_info.etag.clone();
 
         let repoptions =
@@ -1125,6 +1284,23 @@ impl S3 for FS {
             e_tag,
             ..Default::default()
         };
+
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedPut,
+            bucket_name: bucket,
+            object: event_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id: event_version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -1139,7 +1315,7 @@ impl S3 for FS {
             tagging,
             version_id,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         // mc cp step 3
 
@@ -1170,7 +1346,8 @@ impl S3 for FS {
            .new_multipart_upload(&bucket, &key, &opts)
            .await
            .map_err(ApiError::from)?;
-
+        let object_name = key.clone();
+        let bucket_name = bucket.clone();
         let output = CreateMultipartUploadOutput {
             bucket: Some(bucket),
             key: Some(key),
@@ -1178,6 +1355,30 @@ impl S3 for FS {
             ..Default::default()
         };
 
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedCompleteMultipartUpload,
+            bucket_name: bucket_name.clone(),
+            object: ecstore::store_api::ObjectInfo {
+                name: object_name,
+                bucket: bucket_name,
+                ..Default::default()
+            },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(output))
     }
 
@@ -1257,6 +1458,7 @@ impl S3 for FS {
             e_tag: info.etag,
             ..Default::default()
         };
+
         Ok(S3Response::new(output))
     }
 
@@ -1429,7 +1631,7 @@ impl S3 for FS {
             key: object,
             tagging,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -1445,6 +1647,30 @@ impl S3 for FS {
            .await
            .map_err(ApiError::from)?;
 
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedPutTagging,
+            bucket_name: bucket.clone(),
+            object: ecstore::store_api::ObjectInfo {
+                name: object.clone(),
+                bucket,
+                ..Default::default()
+            },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(PutObjectTaggingOutput { version_id: None })),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
     }
 
@@ -1475,7 +1701,7 @@ impl S3 for FS {
         &self,
         req: S3Request<DeleteObjectTaggingInput>,
     ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
-        let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input;
+        let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -1488,6 +1714,30 @@ impl S3 for FS {
            .await
            .map_err(ApiError::from)?;
 
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => Uuid::new_v4().to_string(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedDeleteTagging,
+            bucket_name: bucket.clone(),
+            object: ecstore::store_api::ObjectInfo {
+                name: object.clone(),
+                bucket,
+                ..Default::default()
+            },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectTaggingOutput { version_id: None })),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
         Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
     }
 
@@ -2196,7 +2446,7 @@ impl S3 for FS {
         &self,
         req: S3Request<GetObjectAttributesInput>,
     ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
-        let GetObjectAttributesInput { bucket, key, .. } = req.input;
+        let GetObjectAttributesInput { bucket, key, .. } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -2209,11 +2459,36 @@ impl S3 for FS {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{e}")));
         }
 
-        Ok(S3Response::new(GetObjectAttributesOutput {
+        let output = GetObjectAttributesOutput {
             delete_marker: None,
             object_parts: None,
             ..Default::default()
-        }))
+        };
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectAccessedAttributes,
+            bucket_name: bucket.clone(),
+            object: ecstore::store_api::ObjectInfo {
+                name: key.clone(),
+                bucket,
+                ..Default::default()
+            },
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
+        Ok(S3Response::new(output))
     }
 
     async fn put_object_acl(&self, req: S3Request<PutObjectAclInput>) -> S3Result<S3Response<PutObjectAclOutput>> {
@@ -2328,7 +2603,7 @@ impl S3 for FS {
     ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
         let GetObjectLegalHoldInput {
             bucket, key, version_id, ..
-        } = req.input;
+        } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -2362,11 +2637,33 @@ impl S3 for FS {
             ObjectLockLegalHoldStatus::OFF.to_string()
         };
 
-        Ok(S3Response::new(GetObjectLegalHoldOutput {
+        let output = GetObjectLegalHoldOutput {
             legal_hold: Some(ObjectLockLegalHold {
                 status: Some(ObjectLockLegalHoldStatus::from(status)),
             }),
-        }))
+        };
+
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => Uuid::new_v4().to_string(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectAccessedGetLegalHold,
+            bucket_name: bucket.clone(),
+            object: object_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
+        Ok(S3Response::new(output))
     }
 
     async fn put_object_legal_hold(
         &self,
         req: S3Request<PutObjectLegalHoldInput>,
     ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
@@ -2379,7 +2676,7 @@ impl S3 for FS {
             legal_hold,
             version_id,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -2417,14 +2714,35 @@ impl S3 for FS {
             ..Default::default()
         };
 
-        store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
+        let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
             error!("put_object_metadata failed, {}", e.to_string());
             s3_error!(InternalError, "{}", e.to_string())
         })?;
 
-        Ok(S3Response::new(PutObjectLegalHoldOutput {
+        let output = PutObjectLegalHoldOutput {
             request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
-        }))
+        };
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedPutLegalHold,
+            bucket_name: bucket.clone(),
+            object: info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
+        Ok(S3Response::new(output))
     }
 
     async fn get_object_retention(
         &self,
         req: S3Request<GetObjectRetentionInput>,
     ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
         let GetObjectRetentionInput {
             bucket, key, version_id, ..
-        } = req.input;
+        } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -2462,9 +2780,30 @@ impl S3 for FS {
            .and_then(|v| OffsetDateTime::parse(v.as_str(), &Rfc3339).ok())
            .map(Timestamp::from);
 
-        Ok(S3Response::new(GetObjectRetentionOutput {
+        let output = GetObjectRetentionOutput {
             retention: Some(ObjectLockRetention { mode, retain_until_date }),
-        }))
+        };
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => String::new(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectAccessedGetRetention,
+            bucket_name: bucket.clone(),
+            object: object_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
+        Ok(S3Response::new(output))
     }
 
     async fn put_object_retention(
         &self,
         req: S3Request<PutObjectRetentionInput>,
     ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
@@ -2477,7 +2816,7 @@ impl S3 for FS {
             retention,
             version_id,
             ..
-        } = req.input;
+        } = req.input.clone();
 
         let Some(store) = new_object_layer_fn() else {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
         };
@@ -2510,13 +2849,35 @@ impl S3 for FS {
            .map_err(ApiError::from)?;
 
         opts.eval_metadata = Some(eval_metadata);
 
-        store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
+        let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
             error!("put_object_metadata failed, {}", e.to_string());
             s3_error!(InternalError, "{}", e.to_string())
         })?;
 
-        Ok(S3Response::new(PutObjectRetentionOutput {
+        let output = PutObjectRetentionOutput {
             request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
-        }))
+        };
+
+        let version_id = match req.input.version_id {
+            Some(v) => v.to_string(),
+            None => Uuid::new_v4().to_string(),
+        };
+        let event_args = rustfs_notify::event::EventArgs {
+            event_name: EventName::ObjectCreatedPutRetention,
+            bucket_name: bucket.clone(),
+            object: object_info,
+            req_params: rustfs_utils::extract_req_params_header(&req.headers),
+            resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
+            version_id,
+            host: rustfs_utils::get_request_host(&req.headers),
+            user_agent: rustfs_utils::get_request_user_agent(&req.headers),
+        };
+
+        // Asynchronous call will not block the response of the current request
+        tokio::spawn(async move {
+            rustfs_notify::global::notifier_instance().notify(event_args).await;
+        });
+
+        Ok(S3Response::new(output))
     }
 }
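
Note (not part of the patch): every handler above repeats the same EventArgs-plus-tokio::spawn sequence. A minimal sketch of that pattern follows; it assumes only the items visible in this diff (EventArgs and its fields, EventName, the rustfs_utils request helpers, and notifier_instance()), while the function name notify_object_event and the empty resp_elements map are illustrative.

// Sketch only: mirrors the fire-and-forget notification added throughout ecfs.rs.
// `notify_object_event` is a hypothetical helper name, not part of this patch.
fn notify_object_event(
    event_name: EventName, // as imported in ecfs.rs
    bucket_name: String,
    object: ecstore::store_api::ObjectInfo,
    version_id: Option<String>, // the request's version id, if any
    headers: &http::HeaderMap,
) {
    let event_args = rustfs_notify::event::EventArgs {
        event_name,
        bucket_name,
        object,
        req_params: rustfs_utils::extract_req_params_header(headers),
        // The real handlers pass rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())).
        resp_elements: std::collections::HashMap::new(),
        version_id: version_id.unwrap_or_default(), // "" when the request carries no version id
        host: rustfs_utils::get_request_host(headers),
        user_agent: rustfs_utils::get_request_user_agent(headers),
    };

    // Fire-and-forget: the S3 response is not blocked on event delivery.
    tokio::spawn(async move {
        rustfs_notify::global::notifier_instance().notify(event_args).await;
    });
}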