mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
feat: support bucket tagging
Closes #41 Signed-off-by: bestgopher <84328409@qq.com>
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -957,9 +957,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.21"
|
||||
version = "0.4.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
@@ -1582,6 +1582,7 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"log",
|
||||
"mime",
|
||||
"netif",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -54,4 +54,5 @@ tower = { version = "0.4.13", features = ["timeout"] }
|
||||
tracing = "0.1.40"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
|
||||
transform-stream = "0.3.0"
|
||||
transform-stream = "0.3.0"
|
||||
log = "0.4.22"
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use rmp_serde::Serializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
@@ -15,6 +17,9 @@ pub struct BucketMetadata {
|
||||
format: u16,
|
||||
version: u16,
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub tagging: Option<HashMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub created: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
@@ -54,4 +59,8 @@ impl BucketMetadata {
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> {
|
||||
Ok(rmp_serde::from_slice(buffer)?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
mod bucket_meta;
|
||||
pub mod bucket_meta;
|
||||
mod chunk_stream;
|
||||
pub mod disk;
|
||||
pub mod disks_layout;
|
||||
|
||||
@@ -9,6 +9,7 @@ rust-version.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
log.workspace = true
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
clap.workspace = true
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use bytes::BufMut;
|
||||
use bytes::Bytes;
|
||||
use ecstore::bucket_meta::BucketMetadata;
|
||||
use ecstore::disk::error::DiskError;
|
||||
use ecstore::disk::RUSTFS_META_BUCKET;
|
||||
use ecstore::store::new_object_layer_fn;
|
||||
use ecstore::store_api::BucketOptions;
|
||||
use ecstore::store_api::CompletePart;
|
||||
@@ -16,6 +19,7 @@ use futures::{Stream, StreamExt};
|
||||
use http::HeaderMap;
|
||||
use s3s::dto::*;
|
||||
use s3s::s3_error;
|
||||
use s3s::Body;
|
||||
use s3s::S3Error;
|
||||
use s3s::S3ErrorCode;
|
||||
use s3s::S3Result;
|
||||
@@ -681,6 +685,130 @@ impl S3 for FS {
|
||||
);
|
||||
Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
|
||||
let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
|
||||
log::debug!("bucket: {bucket}, tagging: {tagging:?}");
|
||||
|
||||
// check bucket exists.
|
||||
let _bucket = self
|
||||
.head_bucket(S3Request::new(HeadBucketInput {
|
||||
bucket: bucket.clone(),
|
||||
expected_bucket_owner: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let layer = new_object_layer_fn();
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
};
|
||||
|
||||
let meta_obj = try_!(
|
||||
store
|
||||
.get_object_reader(
|
||||
RUSTFS_META_BUCKET,
|
||||
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
|
||||
HTTPRangeSpec::nil(),
|
||||
Default::default(),
|
||||
&ObjectOptions::default(),
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let stream = meta_obj.stream;
|
||||
|
||||
let mut data = vec![];
|
||||
pin_mut!(stream);
|
||||
|
||||
while let Some(x) = stream.next().await {
|
||||
let x = try_!(x);
|
||||
data.put_slice(&x[..]);
|
||||
}
|
||||
|
||||
let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
|
||||
if tagging.tag_set.is_empty() {
|
||||
meta.tagging = None;
|
||||
} else {
|
||||
meta.tagging = Some(tagging.tag_set.into_iter().map(|x| (x.key, x.value)).collect())
|
||||
}
|
||||
|
||||
let data = try_!(meta.marshal_msg());
|
||||
let len = data.len();
|
||||
try_!(
|
||||
store
|
||||
.put_object(
|
||||
RUSTFS_META_BUCKET,
|
||||
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
|
||||
PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
|
||||
&ObjectOptions::default(),
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
Ok(S3Response::new(Default::default()))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
|
||||
let GetBucketTaggingInput { bucket, .. } = req.input;
|
||||
// check bucket exists.
|
||||
let _bucket = self
|
||||
.head_bucket(S3Request::new(HeadBucketInput {
|
||||
bucket: bucket.clone(),
|
||||
expected_bucket_owner: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let layer = new_object_layer_fn();
|
||||
let lock = layer.read().await;
|
||||
let store = match lock.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
|
||||
};
|
||||
|
||||
let meta_obj = try_!(
|
||||
store
|
||||
.get_object_reader(
|
||||
RUSTFS_META_BUCKET,
|
||||
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
|
||||
HTTPRangeSpec::nil(),
|
||||
Default::default(),
|
||||
&ObjectOptions::default(),
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let stream = meta_obj.stream;
|
||||
|
||||
let mut data = vec![];
|
||||
pin_mut!(stream);
|
||||
|
||||
while let Some(x) = stream.next().await {
|
||||
let x = try_!(x);
|
||||
data.put_slice(&x[..]);
|
||||
}
|
||||
|
||||
let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
|
||||
if meta.tagging.is_none() {
|
||||
return Err({
|
||||
let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
|
||||
err.set_status_code("404".try_into().unwrap());
|
||||
err
|
||||
});
|
||||
}
|
||||
|
||||
Ok(S3Response::new(GetBucketTaggingOutput {
|
||||
tag_set: meta
|
||||
.tagging
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(key, value)| Tag { key, value })
|
||||
.collect(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
|
||||
Reference in New Issue
Block a user