feat: support bucket tagging

Closes #41
Signed-off-by: bestgopher <84328409@qq.com>

Author: bestgopher
Date: 2024-08-26 23:29:31 +08:00
parent 953f31662b
commit ccde55c4e3
6 changed files with 144 additions and 4 deletions

Cargo.lock (generated)

@@ -957,9 +957,9 @@ dependencies = [
 [[package]]
 name = "log"
-version = "0.4.21"
+version = "0.4.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
+checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
 
 [[package]]
 name = "lru"
@@ -1582,6 +1582,7 @@ dependencies = [
  "http-body",
  "hyper",
  "hyper-util",
+ "log",
  "mime",
  "netif",
  "pin-project-lite",


@@ -54,4 +54,5 @@ tower = { version = "0.4.13", features = ["timeout"] }
 tracing = "0.1.40"
 tracing-error = "0.2.0"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
-transform-stream = "0.3.0"
+transform-stream = "0.3.0"
+log = "0.4.22"


@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use rmp_serde::Serializer;
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
@@ -15,6 +17,9 @@ pub struct BucketMetadata {
     format: u16,
     version: u16,
     pub name: String,
+    #[serde(skip_serializing_if = "Option::is_none", default)]
+    pub tagging: Option<HashMap<String, String>>,
     #[serde(skip_serializing_if = "Option::is_none", default)]
     pub created: Option<OffsetDateTime>,
 }
@@ -54,4 +59,8 @@ impl BucketMetadata {
         Ok(buf)
     }
+
+    pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> {
+        Ok(rmp_serde::from_slice(buffer)?)
+    }
 }
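
For orientation, here is a minimal round-trip sketch (not part of this commit) of the new field going through the MessagePack helpers above; it assumes marshal_msg returns the serialized bytes, as the handler code later in this diff does, and the bucket name and tag values are placeholders:

    let mut meta = BucketMetadata::new("my-bucket");
    meta.tagging = Some(HashMap::from([("env".to_string(), "prod".to_string())]));

    // Encode with rmp_serde, then decode; `skip_serializing_if` + `default`
    // keep the field optional in both directions.
    let buf = meta.marshal_msg().unwrap();
    let decoded = BucketMetadata::unmarshal_from(&buf).unwrap();
    assert_eq!(decoded.tagging.unwrap().get("env").map(String::as_str), Some("prod"));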


@@ -1,4 +1,4 @@
-mod bucket_meta;
+pub mod bucket_meta;
 mod chunk_stream;
 pub mod disk;
 pub mod disks_layout;


@@ -9,6 +9,7 @@ rust-version.workspace = true
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
+log.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
 clap.workspace = true


@@ -1,5 +1,8 @@
+use bytes::BufMut;
 use bytes::Bytes;
+use ecstore::bucket_meta::BucketMetadata;
 use ecstore::disk::error::DiskError;
+use ecstore::disk::RUSTFS_META_BUCKET;
 use ecstore::store::new_object_layer_fn;
 use ecstore::store_api::BucketOptions;
 use ecstore::store_api::CompletePart;
@@ -16,6 +19,7 @@ use futures::{Stream, StreamExt};
 use http::HeaderMap;
 use s3s::dto::*;
 use s3s::s3_error;
+use s3s::Body;
 use s3s::S3Error;
 use s3s::S3ErrorCode;
 use s3s::S3Result;
@@ -681,6 +685,130 @@ impl S3 for FS {
         );
         Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
     }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
+        let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
+        log::debug!("bucket: {bucket}, tagging: {tagging:?}");
+
+        // check bucket exists.
+        let _bucket = self
+            .head_bucket(S3Request::new(HeadBucketInput {
+                bucket: bucket.clone(),
+                expected_bucket_owner: None,
+            }))
+            .await?;
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
+        };
+
+        let meta_obj = try_!(
+            store
+                .get_object_reader(
+                    RUSTFS_META_BUCKET,
+                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
+                    HTTPRangeSpec::nil(),
+                    Default::default(),
+                    &ObjectOptions::default(),
+                )
+                .await
+        );
+
+        let stream = meta_obj.stream;
+        let mut data = vec![];
+        pin_mut!(stream);
+        while let Some(x) = stream.next().await {
+            let x = try_!(x);
+            data.put_slice(&x[..]);
+        }
+
+        let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
+        if tagging.tag_set.is_empty() {
+            meta.tagging = None;
+        } else {
+            meta.tagging = Some(tagging.tag_set.into_iter().map(|x| (x.key, x.value)).collect())
+        }
+
+        let data = try_!(meta.marshal_msg());
+        let len = data.len();
+
+        try_!(
+            store
+                .put_object(
+                    RUSTFS_META_BUCKET,
+                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
+                    PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
+                    &ObjectOptions::default(),
+                )
+                .await
+        );
+
+        Ok(S3Response::new(Default::default()))
+    }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
+        let GetBucketTaggingInput { bucket, .. } = req.input;
+
+        // check bucket exists.
+        let _bucket = self
+            .head_bucket(S3Request::new(HeadBucketInput {
+                bucket: bucket.clone(),
+                expected_bucket_owner: None,
+            }))
+            .await?;
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
+        };
+
+        let meta_obj = try_!(
+            store
+                .get_object_reader(
+                    RUSTFS_META_BUCKET,
+                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
+                    HTTPRangeSpec::nil(),
+                    Default::default(),
+                    &ObjectOptions::default(),
+                )
+                .await
+        );
+
+        let stream = meta_obj.stream;
+        let mut data = vec![];
+        pin_mut!(stream);
+        while let Some(x) = stream.next().await {
+            let x = try_!(x);
+            data.put_slice(&x[..]);
+        }
+
+        let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
+        if meta.tagging.is_none() {
+            return Err({
+                let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
+                err.set_status_code("404".try_into().unwrap());
+                err
+            });
+        }
+
+        Ok(S3Response::new(GetBucketTaggingOutput {
+            tag_set: meta
+                .tagging
+                .unwrap()
+                .into_iter()
+                .map(|(key, value)| Tag { key, value })
+                .collect(),
+        }))
+    }
 }
 
 #[allow(dead_code)]
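
Both new handlers read the bucket's metadata object back by draining the reader's byte stream into a Vec<u8> with BufMut::put_slice before handing it to BucketMetadata::unmarshal_from. A standalone sketch of that collect-the-stream pattern (illustrative only, not part of the commit; the helper name and error type are placeholders):

    use bytes::{BufMut, Bytes};
    use futures::{pin_mut, Stream, StreamExt};

    // Drain a stream of byte chunks into one contiguous buffer.
    async fn read_all<E>(stream: impl Stream<Item = Result<Bytes, E>>) -> Result<Vec<u8>, E> {
        let mut data = Vec::new();
        pin_mut!(stream);
        while let Some(chunk) = stream.next().await {
            data.put_slice(&chunk?[..]); // BufMut is implemented for Vec<u8>
        }
        Ok(data)
    }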