fix: format of bucket event notifications (#1138)

This commit is contained in:
mythrnr
2025-12-16 21:44:57 +09:00
committed by GitHub
parent 0bca1fbd56
commit 94d5b1c1e4
4 changed files with 90 additions and 4 deletions

View File

@@ -20,6 +20,7 @@ use url::form_urlencoded;
/// Represents the identity of the user who triggered the event
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Identity {
/// The principal ID of the user
pub principal_id: String,
@@ -27,6 +28,7 @@ pub struct Identity {
/// Represents the bucket that the object is in
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Bucket {
/// The name of the bucket
pub name: String,
@@ -38,6 +40,7 @@ pub struct Bucket {
/// Represents the object that the event occurred on
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct Object {
/// The key (name) of the object
pub key: String,
@@ -62,6 +65,7 @@ pub struct Object {
/// Metadata about the event
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
/// The schema version of the event
#[serde(rename = "s3SchemaVersion")]
@@ -76,13 +80,13 @@ pub struct Metadata {
/// Information about the source of the event
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Source {
/// The host where the event originated
pub host: String,
/// The port on the host
pub port: String,
/// The user agent that caused the event
#[serde(rename = "userAgent")]
pub user_agent: String,
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::fmt;
/// Error returned when parsing event name string fails.
@@ -29,7 +28,7 @@ impl std::error::Error for ParseEventNameError {}
/// Represents the type of event that occurs on the object.
/// Based on AWS S3 event type and includes RustFS extension.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum EventName {
// Single event type (values are 1-32 for compatible mask logic)
ObjectAccessedGet = 1,
@@ -289,3 +288,79 @@ impl From<&str> for EventName {
EventName::parse(event_str).unwrap_or_else(|e| panic!("{}", e))
}
}
impl serde::ser::Serialize for EventName {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
serializer.serialize_str(self.as_str())
}
}
impl<'de> serde::de::Deserialize<'de> for EventName {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let s = Self::parse(&s).map_err(serde::de::Error::custom)?;
Ok(s)
}
}
#[cfg(test)]
mod tests {
use super::*;
// test serialization
#[test]
fn test_event_name_serialization_and_deserialization() {
struct TestCase {
event: EventName,
serialized_str: &'static str,
}
let test_cases = vec![
TestCase {
event: EventName::BucketCreated,
serialized_str: "\"s3:BucketCreated:*\"",
},
TestCase {
event: EventName::ObjectCreatedAll,
serialized_str: "\"s3:ObjectCreated:*\"",
},
TestCase {
event: EventName::ObjectCreatedPut,
serialized_str: "\"s3:ObjectCreated:Put\"",
},
];
for case in &test_cases {
let serialized = serde_json::to_string(&case.event);
assert!(serialized.is_ok(), "Serialization failed for `{}`", case.serialized_str);
assert_eq!(serialized.unwrap(), case.serialized_str);
let deserialized = serde_json::from_str::<EventName>(case.serialized_str);
assert!(deserialized.is_ok(), "Deserialization failed for `{}`", case.serialized_str);
assert_eq!(deserialized.unwrap(), case.event);
}
}
#[test]
fn test_invalid_event_name_deserialization() {
let invalid_str = "\"s3:InvalidEvent:Test\"";
let deserialized = serde_json::from_str::<EventName>(invalid_str);
assert!(deserialized.is_err(), "Deserialization should fail for invalid event name");
// empty string should be successful only serialization
let event_name = EventName::Everything;
let serialized_str = "\"\"";
let serialized = serde_json::to_string(&event_name);
assert!(serialized.is_ok(), "Serialization failed for `{serialized_str}`");
assert_eq!(serialized.unwrap(), serialized_str);
let deserialized = serde_json::from_str::<EventName>(serialized_str);
assert!(deserialized.is_err(), "Deserialization should fail for empty string");
}
}

View File

@@ -27,6 +27,7 @@ pub use target::Target;
/// Represents a log of events for sending to targets
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct TargetLog<E> {
/// The event name
pub event_name: EventName,

View File

@@ -2820,7 +2820,7 @@ impl S3 for FS {
// #[instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject");
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject");
if req
.headers
.get("X-Amz-Meta-Snowball-Auto-Extract")
@@ -3142,6 +3142,12 @@ impl S3 for FS {
let put_bucket = bucket.clone();
let put_key = key.clone();
let put_version = obj_info.version_id.map(|v| v.to_string());
helper = helper.object(obj_info.clone());
if let Some(version_id) = &put_version {
helper = helper.version_id(version_id.clone());
}
tokio::spawn(async move {
manager
.invalidate_cache_versioned(&put_bucket, &put_key, put_version.as_deref())