Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 09:40:32 +00:00)
feat: object tagging
Closes: #54
Signed-off-by: bestgopher <84328409@qq.com>
@@ -275,6 +275,13 @@ impl StorageAPI for Sets {
     async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
         self.get_disks_by_key(object).get_object_info(bucket, object, opts).await
     }
+
+    async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> {
+        self.get_disks_by_key(object)
+            .put_object_info(bucket, object, info, opts)
+            .await
+    }
+
     async fn get_object_reader(
         &self,
         bucket: &str,
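Both methods delegate through get_disks_by_key(object), so a tag update is routed to the same set that holds the object's data. The routing presumably keys off the object name; the sketch below shows only the general hash-and-pick pattern and is not the crate's get_disks_by_key implementation.

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Generic sketch of key-based routing: hash the object key, pick one of N sets.
    // Illustrative only; not taken from rustfs.
    fn route_by_key(object: &str, num_sets: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        object.hash(&mut hasher);
        (hasher.finish() as usize) % num_sets
    }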
@@ -717,6 +717,17 @@ impl StorageAPI for ECStore {
 
         unimplemented!()
     }
+
+    async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()> {
+        let object = utils::path::encode_dir_object(object);
+
+        if self.single_pool() {
+            return self.pools[0].put_object_info(bucket, object.as_str(), info, opts).await;
+        }
+
+        unimplemented!()
+    }
+
     async fn get_object_reader(
         &self,
         bucket: &str,
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use crate::error::{Error, Result};
 use http::HeaderMap;
 use rmp_serde::Serializer;
@@ -25,6 +27,8 @@ pub struct FileInfo {
     pub fresh: bool, // indicates this is a first time call to write FileInfo.
     pub parts: Vec<ObjectPartInfo>,
     pub is_latest: bool,
+    #[serde(skip_serializing_if = "Option::is_none", default)]
+    pub tags: Option<HashMap<String, String>>,
 }
 
 // impl Default for FileInfo {
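The serde attributes on the new tags field are what keep old metadata readable: with a map-style (named) encoding, skip_serializing_if = "Option::is_none" means an untagged record carries no tags key at all, and default lets records written before this field existed still deserialize. A standalone sketch of that behaviour, using illustrative struct names (FileInfoOld/FileInfoNew are not from the codebase) and rmp-serde's named encoding; the actual encoding used for FileInfo may differ.

    use std::collections::HashMap;
    use serde::{Deserialize, Serialize};

    // Pre-tagging layout (illustrative).
    #[derive(Serialize, Deserialize)]
    struct FileInfoOld {
        size: i64,
    }

    // Post-tagging layout, mirroring the attributes added in the diff.
    #[derive(Serialize, Deserialize)]
    struct FileInfoNew {
        size: i64,
        #[serde(skip_serializing_if = "Option::is_none", default)]
        tags: Option<HashMap<String, String>>,
    }

    fn main() {
        // A record written before the field existed...
        let old_bytes = rmp_serde::to_vec_named(&FileInfoOld { size: 42 }).unwrap();
        // ...still decodes, with `tags` falling back to None via `default`.
        let upgraded: FileInfoNew = rmp_serde::from_slice(&old_bytes).unwrap();
        assert!(upgraded.tags.is_none());

        // And an untagged new record serializes without a `tags` key at all.
        let new_bytes = rmp_serde::to_vec_named(&FileInfoNew { size: 42, tags: None }).unwrap();
        assert_eq!(old_bytes, new_bytes);
    }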
@@ -150,6 +154,7 @@ impl FileInfo {
             size: self.size,
             parts: self.parts.clone(),
             is_latest: self.is_latest,
+            tags: self.tags.clone(),
         }
     }
     // to_part_offset: find the part index that contains the given offset; returns (part index, offset within the part)
@@ -429,6 +434,7 @@ pub struct ObjectInfo {
     pub delete_marker: bool,
     pub parts: Vec<ObjectPartInfo>,
     pub is_latest: bool,
+    pub tags: Option<HashMap<String, String>>,
 }
 
 #[derive(Debug, Default)]
@@ -516,6 +522,9 @@ pub trait StorageAPI {
         start_after: &str,
     ) -> Result<ListObjectsV2Info>;
     async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
+
+    async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>;
+
     async fn get_object_reader(
         &self,
         bucket: &str,
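Together, get_object_info and put_object_info give callers a read-modify-write path for object metadata, which is exactly how the tagging handlers further down use them. A hypothetical helper (not part of the commit) makes the pattern explicit; it assumes the crate's StorageAPI, ObjectInfo, ObjectOptions, and Result types plus std::collections::HashMap are in scope.

    // Hypothetical convenience wrapper, not in the diff: set (or clear) the tags
    // of an object by round-tripping its ObjectInfo through the trait.
    async fn set_object_tags<S: StorageAPI>(
        store: &S,
        bucket: &str,
        object: &str,
        tags: Option<HashMap<String, String>>,
    ) -> Result<()> {
        let mut info = store.get_object_info(bucket, object, &ObjectOptions::default()).await?;
        info.tags = tags;
        store.put_object_info(bucket, object, info, &ObjectOptions::default()).await
    }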
@@ -687,7 +687,7 @@ impl S3 for FS {
         Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
     }
 
-    #[tracing::instrument(level = "debug", skip(self, req))]
+    #[tracing::instrument(level = "debug", skip(self))]
     async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
         let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
         log::debug!("bucket: {bucket}, tagging: {tagging:?}");
@@ -702,10 +702,9 @@ impl S3 for FS {
 
         let layer = new_object_layer_fn();
         let lock = layer.read().await;
-        let store = match lock.as_ref() {
-            Some(s) => s,
-            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
-        };
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
 
         let meta_obj = try_!(
             store
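The change above (repeated in the other handlers) is a pure simplification: matching on the Option and returning an error in the None arm is equivalent to as_ref().ok_or_else(...)?. A self-contained illustration with a plain String error in place of S3Error:

    fn store_name(layer: &Option<String>) -> Result<&String, String> {
        // Before: explicit match with an early return.
        // let store = match layer.as_ref() {
        //     Some(s) => s,
        //     None => return Err("Not init".to_string()),
        // };

        // After: Option -> Result conversion plus `?`.
        let store = layer.as_ref().ok_or_else(|| "Not init".to_string())?;
        Ok(store)
    }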
@@ -752,7 +751,7 @@ impl S3 for FS {
         Ok(S3Response::new(Default::default()))
     }
 
-    #[tracing::instrument(level = "debug", skip(self, req))]
+    #[tracing::instrument(level = "debug", skip(self))]
    async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
         let GetBucketTaggingInput { bucket, .. } = req.input;
         // check bucket exists.
@@ -765,10 +764,9 @@ impl S3 for FS {
 
         let layer = new_object_layer_fn();
         let lock = layer.read().await;
-        let store = match lock.as_ref() {
-            Some(s) => s,
-            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
-        };
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
 
         let meta_obj = try_!(
             store
@@ -811,8 +809,11 @@ impl S3 for FS {
         }))
     }
 
-    #[tracing::instrument(level = "debug", skip(self, req))]
-    async fn delete_bucket_tagging(&self, req: S3Request<DeleteBucketTaggingInput>) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn delete_bucket_tagging(
+        &self,
+        req: S3Request<DeleteBucketTaggingInput>,
+    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
         let DeleteBucketTaggingInput { bucket, .. } = req.input;
         // check bucket exists.
         let _bucket = self
@@ -824,10 +825,9 @@ impl S3 for FS {
 
         let layer = new_object_layer_fn();
         let lock = layer.read().await;
-        let store = match lock.as_ref() {
-            Some(s) => s,
-            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
-        };
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
 
         let meta_obj = try_!(
             store
@@ -868,6 +868,79 @@ impl S3 for FS {
 
         Ok(S3Response::new(DeleteBucketTaggingOutput {}))
     }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
+        let PutObjectTaggingInput {
+            bucket,
+            key: object,
+            tagging,
+            ..
+        } = req.input;
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
+
+        let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
+        object_info.tags = Some(tagging.tag_set.into_iter().map(|Tag { key, value }| (key, value)).collect());
+
+        try_!(
+            store
+                .put_object_info(&bucket, &object, object_info, &ObjectOptions::default())
+                .await
+        );
+
+        Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
+    }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn get_object_tagging(&self, req: S3Request<GetObjectTaggingInput>) -> S3Result<S3Response<GetObjectTaggingOutput>> {
+        let GetObjectTaggingInput { bucket, key: object, .. } = req.input;
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
+
+        let object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
+
+        Ok(S3Response::new(GetObjectTaggingOutput {
+            tag_set: object_info
+                .tags
+                .map(|tags| tags.into_iter().map(|(key, value)| Tag { key, value }).collect())
+                .unwrap_or_else(|| vec![]),
+            version_id: None,
+        }))
+    }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn delete_object_tagging(
+        &self,
+        req: S3Request<DeleteObjectTaggingInput>,
+    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
+        let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input;
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = lock
+            .as_ref()
+            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
+
+        let mut object_info = try_!(store.get_object_info(&bucket, &object, &ObjectOptions::default()).await);
+        object_info.tags = None;
+
+        try_!(
+            store
+                .put_object_info(&bucket, &object, object_info, &ObjectOptions::default())
+                .await
+        );
+
+        Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
+    }
 }
 
 #[allow(dead_code)]
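The three handlers above share one data transformation: the S3 API exposes tags as a list of Tag { key, value } pairs, while ObjectInfo stores them as Option<HashMap<String, String>>; put sets the map, get converts it back (empty list when absent), delete clears it to None. A standalone sketch of the two conversions; the Tag struct here is a stand-in with the same shape as the s3s type destructured in the diff.

    use std::collections::HashMap;

    // Stand-in for the s3s `Tag` type used in the handlers above.
    struct Tag {
        key: String,
        value: String,
    }

    // PutObjectTagging direction: tag list -> map stored on ObjectInfo.
    fn tag_set_to_map(tag_set: Vec<Tag>) -> HashMap<String, String> {
        tag_set.into_iter().map(|Tag { key, value }| (key, value)).collect()
    }

    // GetObjectTagging direction: stored map (possibly absent) -> tag list.
    fn map_to_tag_set(tags: Option<HashMap<String, String>>) -> Vec<Tag> {
        tags.map(|tags| tags.into_iter().map(|(key, value)| Tag { key, value }).collect())
            .unwrap_or_default()
    }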