Merge pull request #55 from rustfs/feat/object-tagging

Feat/object tagging
This commit is contained in:
loverustfs
2024-09-23 10:33:25 +08:00
committed by GitHub
4 changed files with 116 additions and 16 deletions

View File

@@ -275,6 +275,13 @@ impl StorageAPI for Sets {
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).get_object_info(bucket, object, opts).await
}
async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> {
self.get_disks_by_key(object)
.put_object_info(bucket, object, info, opts)
.await
}
async fn get_object_reader(
&self,
bucket: &str,

View File

@@ -717,6 +717,17 @@ impl StorageAPI for ECStore {
unimplemented!()
}
async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].put_object_info(bucket, object.as_str(), info, opts).await;
}
unimplemented!()
}
async fn get_object_reader(
&self,
bucket: &str,

View File

@@ -1,3 +1,5 @@
use std::collections::HashMap;
use crate::error::{Error, Result};
use http::HeaderMap;
use rmp_serde::Serializer;
@@ -25,6 +27,8 @@ pub struct FileInfo {
pub fresh: bool, // indicates this is a first time call to write FileInfo.
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub tags: Option<HashMap<String, String>>,
}
// impl Default for FileInfo {
@@ -150,6 +154,7 @@ impl FileInfo {
size: self.size,
parts: self.parts.clone(),
is_latest: self.is_latest,
tags: self.tags.clone(),
}
}
// to_part_offset 取offset 所在的part index, 返回part index, offset
@@ -429,6 +434,7 @@ pub struct ObjectInfo {
pub delete_marker: bool,
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
pub tags: Option<HashMap<String, String>>,
}
#[derive(Debug, Default)]
@@ -516,6 +522,9 @@ pub trait StorageAPI {
start_after: &str,
) -> Result<ListObjectsV2Info>;
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>;
async fn get_object_reader(
&self,
bucket: &str,

View File

@@ -687,7 +687,7 @@ impl S3 for FS {
Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
}
#[tracing::instrument(level = "debug", skip(self, req))]
#[tracing::instrument(level = "debug", skip(self))]
async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
log::debug!("bucket: {bucket}, tagging: {tagging:?}");
@@ -702,10 +702,9 @@ impl S3 for FS {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let meta_obj = try_!(
store
@@ -752,7 +751,7 @@ impl S3 for FS {
Ok(S3Response::new(Default::default()))
}
#[tracing::instrument(level = "debug", skip(self, req))]
#[tracing::instrument(level = "debug", skip(self))]
async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
let GetBucketTaggingInput { bucket, .. } = req.input;
// check bucket exists.
@@ -765,10 +764,9 @@ impl S3 for FS {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let meta_obj = try_!(
store
@@ -811,8 +809,11 @@ impl S3 for FS {
}))
}
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket_tagging(&self, req: S3Request<DeleteBucketTaggingInput>) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_bucket_tagging(
&self,
req: S3Request<DeleteBucketTaggingInput>,
) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
let DeleteBucketTaggingInput { bucket, .. } = req.input;
// check bucket exists.
let _bucket = self
@@ -824,10 +825,9 @@ impl S3 for FS {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let meta_obj = try_!(
store
@@ -868,6 +868,79 @@ impl S3 for FS {
Ok(S3Response::new(DeleteBucketTaggingOutput {}))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
let PutObjectTaggingInput {
bucket,
key: object,
tagging,
..
} = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
object_info.tags = Some(tagging.tag_set.into_iter().map(|Tag { key, value }| (key, value)).collect());
try_!(
store
.put_object_info(&bucket, &object, object_info, &ObjectOptions::default())
.await
);
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn get_object_tagging(&self, req: S3Request<GetObjectTaggingInput>) -> S3Result<S3Response<GetObjectTaggingOutput>> {
let GetObjectTaggingInput { bucket, key: object, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
Ok(S3Response::new(GetObjectTaggingOutput {
tag_set: object_info
.tags
.map(|tags| tags.into_iter().map(|(key, value)| Tag { key, value }).collect())
.unwrap_or_else(|| vec![]),
version_id: None,
}))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_object_tagging(
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
object_info.tags = None;
try_!(
store
.put_object_info(&bucket, &object, object_info, &ObjectOptions::default())
.await
);
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
}
}
#[allow(dead_code)]