From 3e34718bfecf6aaae5d55095702678eac0efc3f7 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Thu, 19 Sep 2024 18:32:39 +0800 Subject: [PATCH] feat: object tagging Closes: #54 Signed-off-by: bestgopher <84328409@qq.com> --- ecstore/src/sets.rs | 7 +++ ecstore/src/store.rs | 11 ++++ ecstore/src/store_api.rs | 9 ++++ rustfs/src/storage/ecfs.rs | 105 +++++++++++++++++++++++++++++++------ 4 files changed, 116 insertions(+), 16 deletions(-) diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 8698b210..9d1decf5 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -275,6 +275,13 @@ impl StorageAPI for Sets { async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { self.get_disks_by_key(object).get_object_info(bucket, object, opts).await } + + async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> { + self.get_disks_by_key(object) + .put_object_info(bucket, object, info, opts) + .await + } + async fn get_object_reader( &self, bucket: &str, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 44473c3b..b70e381e 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -717,6 +717,17 @@ impl StorageAPI for ECStore { unimplemented!() } + + async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> { + let object = utils::path::encode_dir_object(object); + + if self.single_pool() { + return self.pools[0].put_object_info(bucket, object.as_str(), info, opts).await; + } + + unimplemented!() + } + async fn get_object_reader( &self, bucket: &str, diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 0638ec19..49bc7dd2 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::error::{Error, Result}; use http::HeaderMap; use rmp_serde::Serializer; @@ -25,6 +27,8 @@ pub struct FileInfo { pub fresh: bool, // indicates this is a first time call to write FileInfo. pub parts: Vec, pub is_latest: bool, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub tags: Option>, } // impl Default for FileInfo { @@ -150,6 +154,7 @@ impl FileInfo { size: self.size, parts: self.parts.clone(), is_latest: self.is_latest, + tags: self.tags.clone(), } } // to_part_offset 取offset 所在的part index, 返回part index, offset @@ -429,6 +434,7 @@ pub struct ObjectInfo { pub delete_marker: bool, pub parts: Vec, pub is_latest: bool, + pub tags: Option>, } #[derive(Debug, Default)] @@ -516,6 +522,9 @@ pub trait StorageAPI { start_after: &str, ) -> Result; async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; + + async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>; + async fn get_object_reader( &self, bucket: &str, diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 68787c7e..ae44da1e 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -687,7 +687,7 @@ impl S3 for FS { Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() })) } - #[tracing::instrument(level = "debug", skip(self, req))] + #[tracing::instrument(level = "debug", skip(self))] async fn put_bucket_tagging(&self, req: S3Request) -> S3Result> { let PutBucketTaggingInput { bucket, tagging, .. } = req.input; log::debug!("bucket: {bucket}, tagging: {tagging:?}"); @@ -702,10 +702,9 @@ impl S3 for FS { let layer = new_object_layer_fn(); let lock = layer.read().await; - let store = match lock.as_ref() { - Some(s) => s, - None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), - }; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; let meta_obj = try_!( store @@ -752,7 +751,7 @@ impl S3 for FS { Ok(S3Response::new(Default::default())) } - #[tracing::instrument(level = "debug", skip(self, req))] + #[tracing::instrument(level = "debug", skip(self))] async fn get_bucket_tagging(&self, req: S3Request) -> S3Result> { let GetBucketTaggingInput { bucket, .. } = req.input; // check bucket exists. @@ -765,10 +764,9 @@ impl S3 for FS { let layer = new_object_layer_fn(); let lock = layer.read().await; - let store = match lock.as_ref() { - Some(s) => s, - None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), - }; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; let meta_obj = try_!( store @@ -811,8 +809,11 @@ impl S3 for FS { })) } - #[tracing::instrument(level = "debug", skip(self, req))] - async fn delete_bucket_tagging(&self, req: S3Request) -> S3Result> { + #[tracing::instrument(level = "debug", skip(self))] + async fn delete_bucket_tagging( + &self, + req: S3Request, + ) -> S3Result> { let DeleteBucketTaggingInput { bucket, .. } = req.input; // check bucket exists. let _bucket = self @@ -824,10 +825,9 @@ impl S3 for FS { let layer = new_object_layer_fn(); let lock = layer.read().await; - let store = match lock.as_ref() { - Some(s) => s, - None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), - }; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; let meta_obj = try_!( store @@ -868,6 +868,79 @@ impl S3 for FS { Ok(S3Response::new(DeleteBucketTaggingOutput {})) } + + #[tracing::instrument(level = "debug", skip(self))] + async fn put_object_tagging(&self, req: S3Request) -> S3Result> { + let PutObjectTaggingInput { + bucket, + key: object, + tagging, + .. + } = req.input; + + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; + + let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await); + object_info.tags = Some(tagging.tag_set.into_iter().map(|Tag { key, value }| (key, value)).collect()); + + try_!( + store + .put_object_info(&bucket, &object, object_info, &ObjectOptions::default()) + .await + ); + + Ok(S3Response::new(PutObjectTaggingOutput { version_id: None })) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn get_object_tagging(&self, req: S3Request) -> S3Result> { + let GetObjectTaggingInput { bucket, key: object, .. } = req.input; + + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; + + let object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await); + + Ok(S3Response::new(GetObjectTaggingOutput { + tag_set: object_info + .tags + .map(|tags| tags.into_iter().map(|(key, value)| Tag { key, value }).collect()) + .unwrap_or_else(|| vec![]), + version_id: None, + })) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn delete_object_tagging( + &self, + req: S3Request, + ) -> S3Result> { + let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input; + + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = lock + .as_ref() + .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; + + let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await); + object_info.tags = None; + + try_!( + store + .put_object_info(&bucket, &object, object_info, &ObjectOptions::default()) + .await + ); + + Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None })) + } } #[allow(dead_code)]