add notify

This commit is contained in:
houseme
2025-06-30 22:36:17 +08:00
parent c5358ba62c
commit 57c8c76e58
2 changed files with 421 additions and 54 deletions

View File

@@ -451,6 +451,11 @@ impl Event {
let key_name = form_urlencoded::byte_serialize(args.object.name.as_bytes()).collect::<String>();
let principal_id = args.req_params.get("principalId").unwrap_or(&String::new()).to_string();
let version_id = match args.object.version_id {
Some(id) => Some(id.to_string()),
None => Some(args.version_id.clone()),
};
let mut s3_metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "Config".to_string(), // or from args
@@ -463,7 +468,7 @@ impl Event {
},
object: Object {
key: key_name,
version_id: Some(args.object.version_id.unwrap().to_string()),
version_id,
sequencer: unique_id,
..Default::default()
},
@@ -531,6 +536,7 @@ pub struct EventArgs {
pub object: ecstore::store_api::ObjectInfo,
pub req_params: HashMap<String, String>,
pub resp_elements: HashMap<String, String>,
pub version_id: String,
pub host: String,
pub user_agent: String,
}

View File

@@ -16,8 +16,8 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
@@ -35,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::utils::serialize;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::compress::is_compressible;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::error::StorageError;
use ecstore::new_object_layer_fn;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -60,11 +60,11 @@ use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -73,23 +73,23 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_utils::CompressionAlgorithm;
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
@@ -134,8 +134,14 @@ impl FS {
}
async fn put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let PutObjectInput { body, bucket, key, .. } = req.input;
let PutObjectInput {
body,
bucket,
key,
version_id,
..
} = req.input;
let event_version_id = version_id;
let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::other(e.to_string()))));
@@ -169,7 +175,10 @@ impl FS {
.get("X-Amz-Meta-Rustfs-Snowball-Prefix")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default();
let version_id = match event_version_id {
Some(v) => v.to_string(),
None => String::new(),
};
while let Some(entry) = entries.next().await {
let f = match entry {
Ok(f) => f,
@@ -231,16 +240,17 @@ impl FS {
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket.clone(),
object: _obj_info.clone(), // clone() 或传递所需字段
req_params: rustfs_utils::extract_req_params_header(&req.headers), // 假设有一个辅助函数来提取请求参数
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), // 假设有一个辅助函数来提取响应元素
host: rustfs_utils::get_request_host(&req.headers), // 假设的辅助函数
user_agent: rustfs_utils::get_request_user_agent(&req.headers), // 假设的辅助函数
object: _obj_info.clone(),
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: version_id.clone(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// 异步调用,不会阻塞当前请求的响应
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
@@ -301,6 +311,23 @@ impl S3 for FS {
.map_err(ApiError::from)?;
let output = CreateBucketOutput::default();
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketCreated,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -312,7 +339,7 @@ impl S3 for FS {
bucket,
key,
..
} = req.input;
} = req.input.clone();
let (src_bucket, src_key, version_id) = match copy_source {
CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
CopySource::Bucket {
@@ -414,7 +441,7 @@ impl S3 for FS {
.map_err(ApiError::from)?;
// warn!("copy_object oi {:?}", &oi);
let object_info = oi.clone();
let copy_object_result = CopyObjectResult {
e_tag: oi.etag,
last_modified: oi.mod_time.map(Timestamp::from),
@@ -425,6 +452,28 @@ impl S3 for FS {
copy_object_result: Some(copy_object_result),
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCopy,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -448,6 +497,22 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketRemoved,
bucket_name: input.bucket,
object: ecstore::store_api::ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteBucketOutput {}))
}
@@ -456,7 +521,7 @@ impl S3 for FS {
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let DeleteObjectInput {
bucket, key, version_id, ..
} = req.input;
} = req.input.clone();
let metadata = extract_metadata(&req.headers);
@@ -466,7 +531,7 @@ impl S3 for FS {
let version_id = opts.version_id.as_ref().map(|v| Uuid::parse_str(v).ok()).unwrap_or_default();
let dobj = ObjectToDelete {
object_name: key,
object_name: key.clone(),
version_id,
};
@@ -496,12 +561,33 @@ impl S3 for FS {
(None, None)
}
};
let del_version_id = version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
let output = DeleteObjectOutput {
delete_marker,
version_id,
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectRemovedDelete,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo {
name: key,
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: del_version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -556,6 +642,38 @@ impl S3 for FS {
// errors,
..Default::default()
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
for dobj in dobjs {
let version_id = match dobj.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let mut event_name = EventName::ObjectRemovedDelete;
if dobj.delete_marker {
event_name = EventName::ObjectRemovedDeleteMarkerCreated;
}
let event_args = rustfs_notify::event::EventArgs {
event_name,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo {
name: dobj.object_name,
bucket: bucket.clone(),
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectsOutput {
..Default::default()
})),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
rustfs_notify::global::notifier_instance().notify(event_args).await;
}
});
Ok(S3Response::new(output))
}
@@ -594,7 +712,7 @@ impl S3 for FS {
part_number,
range,
..
} = req.input;
} = req.input.clone();
// TODO: getObjectInArchiveFileHandler object = xxx.zip/xxx/xxx.xxx
@@ -641,7 +759,7 @@ impl S3 for FS {
.map_err(ApiError::from)?;
let info = reader.object_info;
let event_info = info.clone();
let content_type = {
if let Some(content_type) = info.content_type {
match ContentType::from_str(&content_type) {
@@ -671,6 +789,26 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket.clone(),
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(GetObjectOutput { ..Default::default() })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -701,7 +839,7 @@ impl S3 for FS {
part_number,
range,
..
} = req.input;
} = req.input.clone();
let part_number = part_number.map(|v| v as usize);
@@ -739,7 +877,7 @@ impl S3 for FS {
let info = store.get_object_info(&bucket, &key, &opts).await.map_err(ApiError::from)?;
// warn!("head_object info {:?}", &info);
let event_info = info.clone();
let content_type = {
if let Some(content_type) = &info.content_type {
match ContentType::from_str(content_type) {
@@ -772,6 +910,27 @@ impl S3 for FS {
// metadata: object_metadata,
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket,
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -1015,7 +1174,7 @@ impl S3 for FS {
return Err(s3_error!(InvalidStorageClass));
}
}
let event_version_id = input.version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
let PutObjectInput {
body,
bucket,
@@ -1108,7 +1267,7 @@ impl S3 for FS {
.put_object(&bucket, &key, &mut reader, &opts)
.await
.map_err(ApiError::from)?;
let event_info = obj_info.clone();
let e_tag = obj_info.etag.clone();
let repoptions =
@@ -1125,6 +1284,23 @@ impl S3 for FS {
e_tag,
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket,
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: event_version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -1139,7 +1315,7 @@ impl S3 for FS {
tagging,
version_id,
..
} = req.input;
} = req.input.clone();
// mc cp step 3
@@ -1170,7 +1346,8 @@ impl S3 for FS {
.new_multipart_upload(&bucket, &key, &opts)
.await
.map_err(ApiError::from)?;
let object_name = key.clone();
let bucket_name = bucket.clone();
let output = CreateMultipartUploadOutput {
bucket: Some(bucket),
key: Some(key),
@@ -1178,6 +1355,30 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCompleteMultipartUpload,
bucket_name: bucket_name.clone(),
object: ecstore::store_api::ObjectInfo {
name: object_name,
bucket: bucket_name,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
@@ -1257,6 +1458,7 @@ impl S3 for FS {
e_tag: info.etag,
..Default::default()
};
Ok(S3Response::new(output))
}
@@ -1429,7 +1631,7 @@ impl S3 for FS {
key: object,
tagging,
..
} = req.input;
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -1445,6 +1647,30 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutTagging,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(PutObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
}
@@ -1475,7 +1701,7 @@ impl S3 for FS {
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input;
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -1488,6 +1714,30 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedDeleteTagging,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
}
@@ -2196,7 +2446,7 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
let GetObjectAttributesInput { bucket, key, .. } = req.input;
let GetObjectAttributesInput { bucket, key, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -2209,11 +2459,36 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{e}")));
}
Ok(S3Response::new(GetObjectAttributesOutput {
let output = GetObjectAttributesOutput {
delete_marker: None,
object_parts: None,
..Default::default()
}))
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedAttributes,
bucket_name: bucket.clone(),
object: ecstore::store_api::ObjectInfo {
name: key.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
async fn put_object_acl(&self, req: S3Request<PutObjectAclInput>) -> S3Result<S3Response<PutObjectAclOutput>> {
@@ -2328,7 +2603,7 @@ impl S3 for FS {
) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
let GetObjectLegalHoldInput {
bucket, key, version_id, ..
} = req.input;
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -2362,11 +2637,33 @@ impl S3 for FS {
ObjectLockLegalHoldStatus::OFF.to_string()
};
Ok(S3Response::new(GetObjectLegalHoldOutput {
let output = GetObjectLegalHoldOutput {
legal_hold: Some(ObjectLockLegalHold {
status: Some(ObjectLockLegalHoldStatus::from(status)),
}),
}))
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetLegalHold,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
async fn put_object_legal_hold(
@@ -2379,7 +2676,7 @@ impl S3 for FS {
legal_hold,
version_id,
..
} = req.input;
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -2417,14 +2714,35 @@ impl S3 for FS {
..Default::default()
};
store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
error!("put_object_metadata failed, {}", e.to_string());
s3_error!(InternalError, "{}", e.to_string())
})?;
Ok(S3Response::new(PutObjectLegalHoldOutput {
let output = PutObjectLegalHoldOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
}))
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutLegalHold,
bucket_name: bucket.clone(),
object: info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
async fn get_object_retention(
@@ -2433,7 +2751,7 @@ impl S3 for FS {
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
let GetObjectRetentionInput {
bucket, key, version_id, ..
} = req.input;
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -2462,9 +2780,30 @@ impl S3 for FS {
.and_then(|v| OffsetDateTime::parse(v.as_str(), &Rfc3339).ok())
.map(Timestamp::from);
Ok(S3Response::new(GetObjectRetentionOutput {
let output = GetObjectRetentionOutput {
retention: Some(ObjectLockRetention { mode, retain_until_date }),
}))
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
async fn put_object_retention(
@@ -2477,7 +2816,7 @@ impl S3 for FS {
retention,
version_id,
..
} = req.input;
} = req.input.clone();
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -2510,13 +2849,35 @@ impl S3 for FS {
.map_err(ApiError::from)?;
opts.eval_metadata = Some(eval_metadata);
store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
error!("put_object_metadata failed, {}", e.to_string());
s3_error!(InternalError, "{}", e.to_string())
})?;
Ok(S3Response::new(PutObjectRetentionOutput {
let output = PutObjectRetentionOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
}))
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
}