diff --git a/Cargo.lock b/Cargo.lock index a30a9562..22b19bba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -957,9 +957,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lru" @@ -1582,6 +1582,7 @@ dependencies = [ "http-body", "hyper", "hyper-util", + "log", "mime", "netif", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 003fce5d..8d9bcdfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,4 +54,5 @@ tower = { version = "0.4.13", features = ["timeout"] } tracing = "0.1.40" tracing-error = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } -transform-stream = "0.3.0" \ No newline at end of file +transform-stream = "0.3.0" +log = "0.4.22" diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs index aedeb9cf..6ad9d9cc 100644 --- a/ecstore/src/bucket_meta.rs +++ b/ecstore/src/bucket_meta.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -15,6 +17,9 @@ pub struct BucketMetadata { format: u16, version: u16, pub name: String, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub tagging: Option<HashMap<String, String>>, + #[serde(skip_serializing_if = "Option::is_none", default)] pub created: Option<OffsetDateTime>, } @@ -54,4 +59,8 @@ impl BucketMetadata { Ok(buf) } + + pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> { + Ok(rmp_serde::from_slice(buffer)?) 
+ } } diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 85e1c901..5ac3eaa8 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -1,4 +1,4 @@ -mod bucket_meta; +pub mod bucket_meta; mod chunk_stream; pub mod disk; pub mod disks_layout; diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index d2d1a989..f1d4025d 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -9,6 +9,7 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +log.workspace = true async-trait.workspace = true bytes.workspace = true clap.workspace = true diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index fd8bcc9a..da44a221 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,5 +1,8 @@ +use bytes::BufMut; use bytes::Bytes; +use ecstore::bucket_meta::BucketMetadata; use ecstore::disk::error::DiskError; +use ecstore::disk::RUSTFS_META_BUCKET; use ecstore::store::new_object_layer_fn; use ecstore::store_api::BucketOptions; use ecstore::store_api::CompletePart; @@ -16,6 +19,7 @@ use futures::{Stream, StreamExt}; use http::HeaderMap; use s3s::dto::*; use s3s::s3_error; +use s3s::Body; use s3s::S3Error; use s3s::S3ErrorCode; use s3s::S3Result; @@ -681,6 +685,130 @@ impl S3 for FS { ); Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() })) } + + #[tracing::instrument(level = "debug", skip(self))] + async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> { + let PutBucketTaggingInput { bucket, tagging, .. } = req.input; + log::debug!("bucket: {bucket}, tagging: {tagging:?}"); + + // check bucket exists. 
+ let _bucket = self + .head_bucket(S3Request::new(HeadBucketInput { + bucket: bucket.clone(), + expected_bucket_owner: None, + })) + .await?; + + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = match lock.as_ref() { + Some(s) => s, + None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), + }; + + let meta_obj = try_!( + store + .get_object_reader( + RUSTFS_META_BUCKET, + BucketMetadata::new(bucket.as_str()).save_file_path().as_str(), + HTTPRangeSpec::nil(), + Default::default(), + &ObjectOptions::default(), + ) + .await + ); + + let stream = meta_obj.stream; + + let mut data = vec![]; + pin_mut!(stream); + + while let Some(x) = stream.next().await { + let x = try_!(x); + data.put_slice(&x[..]); + } + + let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..])); + if tagging.tag_set.is_empty() { + meta.tagging = None; + } else { + meta.tagging = Some(tagging.tag_set.into_iter().map(|x| (x.key, x.value)).collect()) + } + + let data = try_!(meta.marshal_msg()); + let len = data.len(); + try_!( + store + .put_object( + RUSTFS_META_BUCKET, + BucketMetadata::new(bucket.as_str()).save_file_path().as_str(), + PutObjReader::new(StreamingBlob::from(Body::from(data)), len), + &ObjectOptions::default(), + ) + .await + ); + + Ok(S3Response::new(Default::default())) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> { + let GetBucketTaggingInput { bucket, .. } = req.input; + // check bucket exists. 
+ let _bucket = self + .head_bucket(S3Request::new(HeadBucketInput { + bucket: bucket.clone(), + expected_bucket_owner: None, + })) + .await?; + + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = match lock.as_ref() { + Some(s) => s, + None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), + }; + + let meta_obj = try_!( + store + .get_object_reader( + RUSTFS_META_BUCKET, + BucketMetadata::new(bucket.as_str()).save_file_path().as_str(), + HTTPRangeSpec::nil(), + Default::default(), + &ObjectOptions::default(), + ) + .await + ); + + let stream = meta_obj.stream; + + let mut data = vec![]; + pin_mut!(stream); + + while let Some(x) = stream.next().await { + let x = try_!(x); + data.put_slice(&x[..]); + } + + let meta = try_!(BucketMetadata::unmarshal_from(&data[..])); + if meta.tagging.is_none() { + return Err({ + let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist"); + err.set_status_code("404".try_into().unwrap()); + err + }); + } + + Ok(S3Response::new(GetBucketTaggingOutput { + tag_set: meta + .tagging + .unwrap() + .into_iter() + .map(|(key, value)| Tag { key, value }) + .collect(), + })) + } } #[allow(dead_code)]