diff --git a/TODO.md b/TODO.md index cbcab874..b158b0aa 100644 --- a/TODO.md +++ b/TODO.md @@ -2,18 +2,18 @@ ## 基础存储 -- [ ] EC可用读写数量判断 Read/WriteQuorum -- [ ] 优化并发执行,边读边取,可中断 +- [x] EC可用读写数量判断 Read/WriteQuorum +- [ ] 优化后台并发执行,可中断 - [ ] 小文件存储到metafile, inlinedata - [ ] 完善bucketmeta -- [ ] 对象锁 -- [ ] 代码优化 使用范型? -- [ ] 抽象出metafile存储 -- [ ] 边读写边hash +- [x] 对象锁 +- [ ] 边读写边hash,实现reader嵌套 - [x] 远程rpc - [x] 错误类型判断,程序中判断错误类型,如何统一错误 - [x] 优化xlmeta, 自定义msg数据结构 -- [x] appendFile, createFile, readFile, walk_dir sync io +- [ ] 优化io.reader 参考 GetObjectNInfo 方便io copy 如果 异步写,再平衡 +- [ ] 代码优化 使用范型? +- [ ] 抽象出metafile存储 ## 基础功能 diff --git a/ecstore/src/bucket/encryption/mod.rs b/ecstore/src/bucket/encryption/mod.rs index f5fee7c7..20e7d315 100644 --- a/ecstore/src/bucket/encryption/mod.rs +++ b/ecstore/src/bucket/encryption/mod.rs @@ -21,20 +21,20 @@ impl std::str::FromStr for Algorithm { } // 定义EncryptionAction结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct EncryptionAction { algorithm: Option, master_key_id: Option, } // 定义Rule结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Rule { default_encryption_action: EncryptionAction, } // 定义BucketSSEConfig结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct BucketSSEConfig { xml_ns: String, xml_name: String, diff --git a/ecstore/src/bucket/event/mod.rs b/ecstore/src/bucket/event/mod.rs index 48944fee..d1cb49fb 100644 --- a/ecstore/src/bucket/event/mod.rs +++ b/ecstore/src/bucket/event/mod.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use name::Name; // 定义common结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] struct Common { pub id: String, pub filter: S3Key, @@ -13,57 +13,57 @@ struct Common { } // 定义Queue结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] struct Queue { pub common: Common, pub arn: ARN, } // 定义ARN结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ARN { pub target_id: TargetID, pub region: String, } // 定义TargetID结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct TargetID { pub id: String, pub name: String, } // 定义FilterRule结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct FilterRule { pub name: String, pub value: String, } // 定义FilterRuleList结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct FilterRuleList { pub rules: Vec, } // 定义S3Key结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct S3Key { pub rule_list: FilterRuleList, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Lambda { arn: String, } // 定义Topic结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Topic { arn: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Config { queue_list: Vec, lambda_list: Vec, diff --git a/ecstore/src/bucket/lifecycle/and.rs b/ecstore/src/bucket/lifecycle/and.rs index 38bee917..70b36607 100644 --- a/ecstore/src/bucket/lifecycle/and.rs +++ b/ecstore/src/bucket/lifecycle/and.rs @@ -1,6 +1,6 @@ use super::{prefix::Prefix, tag::Tag}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct And { pub object_size_greater_than: i64, pub object_size_less_than: i64, diff --git a/ecstore/src/bucket/lifecycle/delmarker.rs b/ecstore/src/bucket/lifecycle/delmarker.rs index 06102f21..5ea1d3bb 100644 --- a/ecstore/src/bucket/lifecycle/delmarker.rs +++ b/ecstore/src/bucket/lifecycle/delmarker.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct DelMarkerExpiration { pub days: usize, } diff --git a/ecstore/src/bucket/lifecycle/expiration.rs b/ecstore/src/bucket/lifecycle/expiration.rs index 8fa2e08c..490b454b 100644 --- a/ecstore/src/bucket/lifecycle/expiration.rs +++ b/ecstore/src/bucket/lifecycle/expiration.rs @@ -3,21 +3,21 @@ use time::OffsetDateTime; // ExpirationDays is a type alias to unmarshal Days in Expiration pub type ExpirationDays = usize; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ExpirationDate(Option); -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ExpireDeleteMarker { pub marker: Boolean, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Boolean { pub val: bool, pub set: bool, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Expiration { pub days: Option, pub date: Option, diff --git a/ecstore/src/bucket/lifecycle/fileter.rs b/ecstore/src/bucket/lifecycle/fileter.rs index 3604384b..e7b35be8 100644 --- a/ecstore/src/bucket/lifecycle/fileter.rs +++ b/ecstore/src/bucket/lifecycle/fileter.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use super::{and::And, prefix::Prefix, tag::Tag}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Filter { pub set: bool, diff --git a/ecstore/src/bucket/lifecycle/lifecycle.rs b/ecstore/src/bucket/lifecycle/lifecycle.rs index b4ef834c..e4f2fa6f 100644 --- a/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -2,7 +2,7 @@ use super::rule::Rule; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Lifecycle { pub rules: Vec, pub expiry_updated_at: Option, diff --git a/ecstore/src/bucket/lifecycle/noncurrentversion.rs b/ecstore/src/bucket/lifecycle/noncurrentversion.rs index 5f39d7d8..3441746f 100644 --- a/ecstore/src/bucket/lifecycle/noncurrentversion.rs +++ b/ecstore/src/bucket/lifecycle/noncurrentversion.rs @@ -1,14 +1,14 @@ use super::{expiration::ExpirationDays, transition::TransitionDays}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct NoncurrentVersionExpiration { pub noncurrent_days: ExpirationDays, pub newer_noncurrent_versions: usize, set: bool, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct NoncurrentVersionTransition { pub noncurrent_days: TransitionDays, pub storage_class: String, diff --git a/ecstore/src/bucket/lifecycle/prefix.rs b/ecstore/src/bucket/lifecycle/prefix.rs index abc09320..657038dc 100644 --- a/ecstore/src/bucket/lifecycle/prefix.rs +++ b/ecstore/src/bucket/lifecycle/prefix.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Prefix { pub val: String, pub set: bool, diff --git a/ecstore/src/bucket/lifecycle/rule.rs b/ecstore/src/bucket/lifecycle/rule.rs index 4c69c7bb..27f6e729 100644 --- a/ecstore/src/bucket/lifecycle/rule.rs +++ b/ecstore/src/bucket/lifecycle/rule.rs @@ -8,14 +8,14 @@ use super::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub enum Status { #[default] Enabled, Disabled, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Rule { pub id: String, pub status: Status, diff --git a/ecstore/src/bucket/lifecycle/tag.rs b/ecstore/src/bucket/lifecycle/tag.rs index 95687a49..5d80bb2c 100644 --- a/ecstore/src/bucket/lifecycle/tag.rs +++ b/ecstore/src/bucket/lifecycle/tag.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Tag { pub key: String, pub value: String, diff --git a/ecstore/src/bucket/lifecycle/transition.rs b/ecstore/src/bucket/lifecycle/transition.rs index 5950de29..54534a4a 100644 --- a/ecstore/src/bucket/lifecycle/transition.rs +++ b/ecstore/src/bucket/lifecycle/transition.rs @@ -3,10 +3,10 @@ use time::OffsetDateTime; pub type TransitionDays = usize; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct TransitionDate(Option); -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Transition { pub days: Option, pub date: Option, diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index 316e5f5b..251ff304 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -1,32 +1,34 @@ -use byteorder::{BigEndian, ByteOrder}; +use super::{ + encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy, + quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning, +}; +use byteorder::{BigEndian, ByteOrder, LittleEndian}; use rmp_serde::Serializer as rmpSerializer; use serde::Serializer; use serde::{Deserialize, Deserializer, Serialize}; use std::collections::HashMap; use std::fmt::Display; use std::str::FromStr; -use time::macros::datetime; use time::OffsetDateTime; +use tracing::error; -use super::{ - encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy, - quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning, -}; - -use crate::error::Result; +use crate::bucket::tags; +use crate::config::common::{read_config, save_config}; +use crate::config::error::ConfigError; +use crate::error::{Error, Result}; use crate::disk::BUCKET_META_PREFIX; -use crate::utils::crypto::hex; +use crate::store::ECStore; +use crate::store_api::StorageAPI; pub const BUCKET_METADATA_FILE: &str = ".metadata.bin"; pub const BUCKET_METADATA_FORMAT: u16 = 1; pub const BUCKET_METADATA_VERSION: u16 = 1; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "PascalCase", default)] pub struct BucketMetadata { pub name: String, - #[serde(serialize_with = "write_times")] pub created: OffsetDateTime, pub lock_enabled: bool, // 虽然标记为不使用,但可能需要保留 pub policy_config_json: Vec, @@ -41,27 +43,16 @@ pub struct BucketMetadata { pub bucket_targets_config_json: Vec, pub bucket_targets_config_meta_json: Vec, - #[serde(serialize_with = "write_times")] pub policy_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub object_lock_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub encryption_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub tagging_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub quota_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub replication_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub versioning_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub lifecycle_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub notification_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub bucket_targets_config_updated_at: OffsetDateTime, - #[serde(serialize_with = "write_times")] pub bucket_targets_config_meta_updated_at: OffsetDateTime, #[serde(skip)] @@ -147,9 +138,9 @@ impl BucketMetadata { format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE) } - fn msg_size(&self) -> usize { - unimplemented!() - } + // fn msg_size(&self) -> usize { + // unimplemented!() + // } pub fn marshal_msg(&self) -> Result> { let mut buf = Vec::new(); @@ -159,12 +150,99 @@ impl BucketMetadata { Ok(buf) } - pub fn unmarshal(_buf: &[u8]) -> Result { - unimplemented!() + pub fn unmarshal(buf: &[u8]) -> Result { + let t: BucketMetadata = rmp_serde::from_slice(buf)?; + Ok(t) + } + + pub fn check_header(buf: &[u8]) -> Result<()> { + if buf.len() <= 4 { + return Err(Error::msg("read_bucket_metadata: data invalid")); + } + + // TODO: check version + Ok(()) + } + + fn default_timestamps(&mut self) { + if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.tagging_config_updated_at = self.created + } + } + + async fn save(&mut self, api: &ECStore) -> Result<()> { + self.parse_all_configs(api)?; + + let mut buf: Vec = vec![0; 4]; + + LittleEndian::write_u16(&mut buf[0..2], BUCKET_METADATA_FORMAT); + + LittleEndian::write_u16(&mut buf[2..4], BUCKET_METADATA_VERSION); + + let data = self.marshal_msg()?; + + buf.extend_from_slice(&data); + + save_config(api, self.save_file_path().as_str(), &buf).await?; + + Ok(()) + } + + fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> { + if !self.tagging_config_xml.is_empty() { + self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?); + } + + Ok(()) } } -fn deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result +pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result { + load_bucket_metadata_parse(api, bucket, true).await +} + +async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result { + let mut bm = match read_bucket_metadata(api, bucket).await { + Ok(res) => res, + Err(err) => { + if let Some(e) = err.downcast_ref::() { + if !ConfigError::is_not_found(&e) { + return Err(err); + } + } + + BucketMetadata::new(bucket) + } + }; + + bm.default_timestamps(); + + if parse { + bm.parse_all_configs(api)?; + } + + // TODO: parse_all_configs + + Ok(bm) +} + +async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result { + if bucket.is_empty() { + error!("bucket name empty"); + return Err(Error::msg("invalid argument")); + } + + let bm = BucketMetadata::new(&bucket); + let file_path = bm.save_file_path(); + + let data = read_config(api, &file_path).await?; + + BucketMetadata::check_header(&data)?; + + BucketMetadata::unmarshal(&data[4..]) +} + +fn _deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result where S: FromStr, S::Err: Display, @@ -175,7 +253,7 @@ where unimplemented!() } -fn write_times(t: &OffsetDateTime, s: S) -> Result +fn _write_time(t: &OffsetDateTime, s: S) -> Result where S: Serializer, { @@ -183,47 +261,16 @@ where let sec = t.unix_timestamp() - 62135596800; let nsec = t.nanosecond(); - buf[0] = 0xc7; - buf[1] = 0x0c; - buf[2] = 0x05; + buf[0] = 0xc7; // mext8 + buf[1] = 0x0c; // 长度 + buf[2] = 0x05; // 时间扩展类型 BigEndian::write_u64(&mut buf[3..], sec as u64); BigEndian::write_u32(&mut buf[11..], nsec as u32); s.serialize_bytes(&buf) } -fn write_time(t: OffsetDateTime) -> Result<(), String> { - // let t = t.saturating_sub(0); // 转换为自 UNIX_EPOCH 以来的时间 - println!("t:{:?}", t); - println!("offset:{:?}", datetime!(0-01-01 0:00 UTC)); - - let mut buf = vec![0x0; 15]; - - let sec = t.unix_timestamp() - 62135596800; - let nsec = t.nanosecond(); - - println!("sec:{:?}", sec); - println!("nsec:{:?}", nsec); - - buf[0] = 0xc7; - buf[1] = 0x0c; - buf[2] = 0x05; - // 扩展格式的时间类型 - // buf.push(0xc7); // mext8 - // buf.push(12); // 长度 - // buf.push(0x05); // 时间扩展类型 - - // 将 Unix 时间戳和纳秒部分写入缓冲区 - BigEndian::write_u64(&mut buf[3..], sec as u64); - BigEndian::write_u32(&mut buf[11..], nsec as u32); - - println!("hex:{:?}", hex(buf)); - - Ok(()) -} - #[cfg(test)] mod test { - use crate::utils::crypto::hex; use super::*; @@ -235,6 +282,8 @@ mod test { let buf = bm.marshal_msg().unwrap(); - println!("{:?}", hex(buf)) + let new = BucketMetadata::unmarshal(&buf).unwrap(); + + assert_eq!(bm.name, new.name); } } diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs new file mode 100644 index 00000000..92d24a4e --- /dev/null +++ b/ecstore/src/bucket/metadata_sys.rs @@ -0,0 +1,82 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::error::{Error, Result}; +use crate::store::ECStore; +use lazy_static::lazy_static; +use time::OffsetDateTime; +use tokio::sync::RwLock; + +use super::metadata::{load_bucket_metadata, BucketMetadata}; +use super::tags; + +lazy_static! { + pub static ref GLOBAL_BucketMetadataSys: Arc> = Arc::new(Some(BucketMetadataSys::new())); +} + +pub fn get_bucket_metadata_sys() -> Arc> { + GLOBAL_BucketMetadataSys.clone() +} + +#[derive(Debug, Default)] +pub struct BucketMetadataSys { + metadata_map: RwLock>, + api: Option>, + initialized: RwLock, +} + +impl BucketMetadataSys { + fn new() -> Self { + Self { ..Default::default() } + } + + pub fn init(&mut self, api: Arc, buckets: Vec) { + self.api = Some(api); + + unimplemented!() + } + + async fn reset(&mut self) { + let mut map = self.metadata_map.write().await; + map.clear(); + } + + pub async fn get_config(&self, bucket: String) -> Result<(BucketMetadata, bool)> { + if let Some(api) = self.api.as_ref() { + let has_bm = { + let map = self.metadata_map.read().await; + if let Some(bm) = map.get(&bucket) { + Some(bm.clone()) + } else { + None + } + }; + + if let Some(bm) = has_bm { + return Ok((bm, false)); + } else { + let bm = match load_bucket_metadata(&api, bucket.as_str()).await { + Ok(res) => res, + Err(err) => { + if *self.initialized.read().await { + return Err(Error::msg("errBucketMetadataNotInitialized")); + } else { + return Err(err); + } + } + }; + + let mut map = self.metadata_map.write().await; + + map.insert(bucket, bm.clone()); + + Ok((bm, true)) + } + } else { + Err(Error::msg("errBucketMetadataNotInitialized")) + } + } + + pub async fn get_tagging_config(&self, bucket: String) -> Result<(tags::Tags, Option)> { + unimplemented!() + } +} diff --git a/ecstore/src/bucket/mod.rs b/ecstore/src/bucket/mod.rs index 9667266f..48d3c5f5 100644 --- a/ecstore/src/bucket/mod.rs +++ b/ecstore/src/bucket/mod.rs @@ -1,7 +1,8 @@ mod encryption; mod event; mod lifecycle; -mod metadata; +pub mod metadata; +mod metadata_sys; mod objectlock; mod policy; mod quota; @@ -10,4 +11,4 @@ mod tags; mod target; mod versioning; -pub use metadata::BucketMetadata; +pub use metadata_sys::get_bucket_metadata_sys; diff --git a/ecstore/src/bucket/objectlock/mod.rs b/ecstore/src/bucket/objectlock/mod.rs index c8838398..74fa0f3b 100644 --- a/ecstore/src/bucket/objectlock/mod.rs +++ b/ecstore/src/bucket/objectlock/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)] +#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)] pub enum RetMode { #[default] Govenance, @@ -20,19 +20,19 @@ impl std::str::FromStr for RetMode { } } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct DefaultRetention { pub mode: RetMode, pub days: Option, pub years: Option, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Rule { pub default_retention: DefaultRetention, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Config { pub object_lock_enabled: String, pub rule: Option, diff --git a/ecstore/src/bucket/policy/action.rs b/ecstore/src/bucket/policy/action.rs index d448727c..34b6dc8a 100644 --- a/ecstore/src/bucket/policy/action.rs +++ b/ecstore/src/bucket/policy/action.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ActionSet(HashSet); impl ActionSet {} diff --git a/ecstore/src/bucket/policy/bucket_policy.rs b/ecstore/src/bucket/policy/bucket_policy.rs index 6e0345c5..b769ec66 100644 --- a/ecstore/src/bucket/policy/bucket_policy.rs +++ b/ecstore/src/bucket/policy/bucket_policy.rs @@ -9,7 +9,7 @@ use super::{ resource::ResourceSet, }; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketPolicyArgs { account_name: String, groups: Vec, @@ -20,7 +20,7 @@ pub struct BucketPolicyArgs { object_name: String, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BPStatement { sid: String, effect: Effect, @@ -32,7 +32,7 @@ pub struct BPStatement { conditions: Option, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketPolicy { pub id: String, pub version: String, diff --git a/ecstore/src/bucket/policy/condition/function.rs b/ecstore/src/bucket/policy/condition/function.rs index 7863c5c7..61c9090f 100644 --- a/ecstore/src/bucket/policy/condition/function.rs +++ b/ecstore/src/bucket/policy/condition/function.rs @@ -41,14 +41,14 @@ pub trait FunctionApi { // } // } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] enum Function { #[default] Test, } // 定义Functions类型 -#[derive(Deserialize, Serialize, Default)] +#[derive(Deserialize, Serialize, Default, Clone)] pub struct Functions(Vec); impl Debug for Functions { diff --git a/ecstore/src/bucket/policy/resource.rs b/ecstore/src/bucket/policy/resource.rs index c02fd7dc..32ebc5a3 100644 --- a/ecstore/src/bucket/policy/resource.rs +++ b/ecstore/src/bucket/policy/resource.rs @@ -15,11 +15,11 @@ const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::"; const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::"; // 定义Resource结构体 -#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)] +#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)] pub struct Resource { pattern: String, r#type: ResourceARNType, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct ResourceSet(HashSet); diff --git a/ecstore/src/bucket/quota/mod.rs b/ecstore/src/bucket/quota/mod.rs index c68eb8cb..12c50dc9 100644 --- a/ecstore/src/bucket/quota/mod.rs +++ b/ecstore/src/bucket/quota/mod.rs @@ -7,7 +7,7 @@ pub enum QuotaType { } // 定义BucketQuota结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketQuota { quota: Option, // 使用Option来表示可能不存在的字段 diff --git a/ecstore/src/bucket/replication/and.rs b/ecstore/src/bucket/replication/and.rs index 812dd29e..b9751f0f 100644 --- a/ecstore/src/bucket/replication/and.rs +++ b/ecstore/src/bucket/replication/and.rs @@ -2,7 +2,7 @@ use super::tag::Tag; use serde::{Deserialize, Serialize}; // 定义And结构体 -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct And { prefix: Option, tags: Option>, diff --git a/ecstore/src/bucket/replication/filter.rs b/ecstore/src/bucket/replication/filter.rs index a194ec34..0f06b3ef 100644 --- a/ecstore/src/bucket/replication/filter.rs +++ b/ecstore/src/bucket/replication/filter.rs @@ -3,7 +3,7 @@ use super::tag::Tag; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Filter { prefix: String, and: And, diff --git a/ecstore/src/bucket/replication/mod.rs b/ecstore/src/bucket/replication/mod.rs index 56abce48..a50bf7c6 100644 --- a/ecstore/src/bucket/replication/mod.rs +++ b/ecstore/src/bucket/replication/mod.rs @@ -6,7 +6,7 @@ mod tag; use rule::Rule; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Config { rules: Vec, role_arn: String, diff --git a/ecstore/src/bucket/replication/rule.rs b/ecstore/src/bucket/replication/rule.rs index ca688bb9..9930dd60 100644 --- a/ecstore/src/bucket/replication/rule.rs +++ b/ecstore/src/bucket/replication/rule.rs @@ -1,29 +1,29 @@ use super::filter::Filter; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub enum Status { #[default] Enabled, Disabled, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct DeleteMarkerReplication { pub status: Status, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct DeleteReplication { pub status: Status, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ExistingObjectReplication { pub status: Status, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Destination { pub bucket: String, pub storage_class: String, @@ -31,18 +31,18 @@ pub struct Destination { } // 定义ReplicaModifications结构体 -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ReplicaModifications { status: Status, } // 定义SourceSelectionCriteria结构体 -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct SourceSelectionCriteria { replica_modifications: ReplicaModifications, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Rule { pub id: String, pub status: Status, diff --git a/ecstore/src/bucket/replication/tag.rs b/ecstore/src/bucket/replication/tag.rs index 565aeeea..d0853c50 100644 --- a/ecstore/src/bucket/replication/tag.rs +++ b/ecstore/src/bucket/replication/tag.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Tag { pub key: Option, pub value: Option, diff --git a/ecstore/src/bucket/tags/mod.rs b/ecstore/src/bucket/tags/mod.rs index e615af24..62cfb984 100644 --- a/ecstore/src/bucket/tags/mod.rs +++ b/ecstore/src/bucket/tags/mod.rs @@ -1,8 +1,10 @@ +use crate::error::{Error, Result}; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; use std::collections::HashMap; // 定义tagSet结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct TagSet { #[serde(rename = "Tag")] tag_map: HashMap, @@ -10,10 +12,25 @@ pub struct TagSet { } // 定义tagging结构体 -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Tags { #[serde(rename = "Tagging")] xml_name: String, #[serde(rename = "TagSet")] tag_set: Option, } + +impl Tags { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Tags = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/target/mod.rs b/ecstore/src/bucket/target/mod.rs index cb4fcbde..e296423b 100644 --- a/ecstore/src/bucket/target/mod.rs +++ b/ecstore/src/bucket/target/mod.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; use time::OffsetDateTime; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Credentials { access_key: String, secret_key: String, @@ -10,13 +10,13 @@ pub struct Credentials { expiration: Option, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub enum ServiceType { #[default] Replication, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct LatencyStat { curr: Duration, // 当前延迟 avg: Duration, // 平均延迟 @@ -24,7 +24,7 @@ pub struct LatencyStat { } // 定义BucketTarget结构体 -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct BucketTarget { source_bucket: String, @@ -73,7 +73,7 @@ pub struct BucketTarget { edge: bool, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct BucketTargets { pub targets: Vec, } diff --git a/ecstore/src/bucket/versioning/mod.rs b/ecstore/src/bucket/versioning/mod.rs index 45d114c2..37fdcdfa 100644 --- a/ecstore/src/bucket/versioning/mod.rs +++ b/ecstore/src/bucket/versioning/mod.rs @@ -25,12 +25,12 @@ impl std::fmt::Display for State { } } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct ExcludedPrefix { pub prefix: String, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default,Clone)] pub struct Versioning { pub status: State, pub excluded_prefixes: Vec, diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs index 6ad9d9cc..4a559b75 100644 --- a/ecstore/src/bucket_meta.rs +++ b/ecstore/src/bucket_meta.rs @@ -14,12 +14,10 @@ pub const BUCKET_METADATA_VERSION: u16 = 1; #[derive(Debug, PartialEq, Deserialize, Serialize, Default)] pub struct BucketMetadata { - format: u16, - version: u16, + pub format: u16, + pub version: u16, pub name: String, - #[serde(skip_serializing_if = "Option::is_none", default)] pub tagging: Option>, - #[serde(skip_serializing_if = "Option::is_none", default)] pub created: Option, } @@ -60,7 +58,8 @@ impl BucketMetadata { Ok(buf) } - pub fn unmarshal_from(buffer: &[u8]) -> Result { - Ok(rmp_serde::from_slice(buffer)?) + pub fn unmarshal_from(buf: &[u8]) -> Result { + let t: BucketMetadata = rmp_serde::from_slice(buf)?; + Ok(t) } } diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs new file mode 100644 index 00000000..e7132ed6 --- /dev/null +++ b/ecstore/src/config/common.rs @@ -0,0 +1,54 @@ +use crate::disk::RUSTFS_META_BUCKET; +use crate::error::{Error, Result}; +use crate::store::ECStore; +use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader}; +use http::HeaderMap; +use s3s::dto::StreamingBlob; +use s3s::Body; + +use super::error::ConfigError; + +pub async fn read_config(api: &ECStore, file: &str) -> Result> { + let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?; + + Ok(data) +} + +async fn read_config_with_metadata(api: &ECStore, file: &str, opts: &ObjectOptions) -> Result<(Vec, ObjectInfo)> { + let range = HTTPRangeSpec::nil(); + let h = HeaderMap::new(); + let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, &opts).await?; + + let data = rd.read_all().await?; + + if data.is_empty() { + return Err(Error::new(ConfigError::NotFound)); + } + + Ok((data, rd.object_info)) +} + +pub async fn save_config(api: &ECStore, file: &str, data: &[u8]) -> Result<()> { + save_config_with_opts( + api, + file, + data, + &ObjectOptions { + max_parity: true, + ..Default::default() + }, + ) + .await +} + +async fn save_config_with_opts(api: &ECStore, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> { + let _ = api + .put_object( + RUSTFS_META_BUCKET, + file, + PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()), + &ObjectOptions::default(), + ) + .await?; + Ok(()) +} diff --git a/ecstore/src/config/error.rs b/ecstore/src/config/error.rs new file mode 100644 index 00000000..490b8c50 --- /dev/null +++ b/ecstore/src/config/error.rs @@ -0,0 +1,15 @@ +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("config not found")] + NotFound, +} + +impl ConfigError { + /// Returns `true` if the config error is [`NotFound`]. + /// + /// [`NotFound`]: ConfigError::NotFound + #[must_use] + pub fn is_not_found(&self) -> bool { + matches!(self, Self::NotFound) + } +} diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs new file mode 100644 index 00000000..3c22a0c5 --- /dev/null +++ b/ecstore/src/config/mod.rs @@ -0,0 +1,2 @@ +pub mod common; +pub mod error; diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 059c3f41..d5881427 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -1,5 +1,6 @@ pub mod bucket_meta; mod chunk_stream; +mod config; pub mod disk; pub mod disks_layout; pub mod endpoints; diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 783a5dd6..f43564b6 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -19,8 +19,8 @@ use crate::{ store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES}, store_api::{ BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, - ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, - PutObjReader, StorageAPI, + ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, + PartInfo, PutObjReader, StorageAPI, }, utils::hash, }; @@ -258,6 +258,25 @@ struct DelObj { obj: ObjectToDelete, } +#[async_trait::async_trait] +impl ObjectIO for Sets { + async fn get_object_reader( + &self, + bucket: &str, + object: &str, + range: HTTPRangeSpec, + h: HeaderMap, + opts: &ObjectOptions, + ) -> Result { + self.get_disks_by_key(object) + .get_object_reader(bucket, object, range, h, opts) + .await + } + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result { + self.get_disks_by_key(object).put_object(bucket, object, data, opts).await + } +} + #[async_trait::async_trait] impl StorageAPI for Sets { async fn list_bucket(&self, _opts: &BucketOptions) -> Result> { @@ -383,22 +402,6 @@ impl StorageAPI for Sets { .await } - async fn get_object_reader( - &self, - bucket: &str, - object: &str, - range: HTTPRangeSpec, - h: HeaderMap, - opts: &ObjectOptions, - ) -> Result { - self.get_disks_by_key(object) - .get_object_reader(bucket, object, range, h, opts) - .await - } - async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result { - self.get_disks_by_key(object).put_object(bucket, object, data, opts).await - } - async fn put_object_part( &self, bucket: &str, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 8ad6a81f..f53a8202 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,7 +1,8 @@ #![allow(clippy::map_entry)] use crate::disk::endpoint::EndpointType; +use crate::store_api::ObjectIO; use crate::{ - bucket::BucketMetadata, + bucket::metadata::BucketMetadata, disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, endpoints::{EndpointServerPools, SetupType}, error::{Error, Result}, @@ -494,6 +495,38 @@ pub struct ListPathOptions { pub limit: i32, } +#[async_trait::async_trait] +impl ObjectIO for ECStore { + #[tracing::instrument(level = "debug", skip(self))] + async fn get_object_reader( + &self, + bucket: &str, + object: &str, + range: HTTPRangeSpec, + h: HeaderMap, + opts: &ObjectOptions, + ) -> Result { + let object = utils::path::encode_dir_object(object); + + if self.single_pool() { + return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await; + } + + unimplemented!() + } + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result { + // checkPutObjectArgs + + let object = utils::path::encode_dir_object(object); + + if self.single_pool() { + return self.pools[0].put_object(bucket, object.as_str(), data, opts).await; + } + + unimplemented!() + } +} + #[async_trait::async_trait] impl StorageAPI for ECStore { async fn list_bucket(&self, opts: &BucketOptions) -> Result> { @@ -740,34 +773,6 @@ impl StorageAPI for ECStore { unimplemented!() } - async fn get_object_reader( - &self, - bucket: &str, - object: &str, - range: HTTPRangeSpec, - h: HeaderMap, - opts: &ObjectOptions, - ) -> Result { - let object = utils::path::encode_dir_object(object); - - if self.single_pool() { - return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await; - } - - unimplemented!() - } - async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result { - // checkPutObjectArgs - - let object = utils::path::encode_dir_object(object); - - if self.single_pool() { - return self.pools[0].put_object(bucket, object.as_str(), data, opts).await; - } - - unimplemented!() - } - async fn put_object_part( &self, bucket: &str, diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 8673ffd3..d05bfea6 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use crate::error::{Error, Result}; +use futures::StreamExt; use http::HeaderMap; use rmp_serde::Serializer; use s3s::dto::StreamingBlob; @@ -281,12 +282,26 @@ pub struct GetObjectReader { pub object_info: ObjectInfo, } -// impl GetObjectReader { -// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self { -// GetObjectReader { stream, object_info } -// } -// } +impl GetObjectReader { + // pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self { + // GetObjectReader { stream, object_info } + // } + pub async fn read_all(&mut self) -> Result> { + let mut data = Vec::new(); + while let Some(x) = self.stream.next().await { + let buf = match x { + Ok(res) => res, + Err(e) => return Err(Error::msg(e.to_string())), + }; + data.extend_from_slice(buf.as_ref()); + } + + Ok(data) + } +} + +#[derive(Debug)] pub struct HTTPRangeSpec { pub is_suffix_length: bool, pub start: i64, @@ -510,7 +525,20 @@ pub struct DeletedObject { } #[async_trait::async_trait] -pub trait StorageAPI { +pub trait ObjectIO: Send + Sync + 'static { + async fn get_object_reader( + &self, + bucket: &str, + object: &str, + range: HTTPRangeSpec, + h: HeaderMap, + opts: &ObjectOptions, + ) -> Result; + async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result; +} + +#[async_trait::async_trait] +pub trait StorageAPI: ObjectIO { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>; async fn list_bucket(&self, opts: &BucketOptions) -> Result>; @@ -536,15 +564,15 @@ pub trait StorageAPI { async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>; - async fn get_object_reader( - &self, - bucket: &str, - object: &str, - range: HTTPRangeSpec, - h: HeaderMap, - opts: &ObjectOptions, - ) -> Result; - async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result; + // async fn get_object_reader( + // &self, + // bucket: &str, + // object: &str, + // range: HTTPRangeSpec, + // h: HeaderMap, + // opts: &ObjectOptions, + // ) -> Result; + // async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result; async fn put_object_part( &self, bucket: &str, diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 3df2b6d9..2ab8aedf 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,5 +1,6 @@ use bytes::BufMut; use bytes::Bytes; +use ecstore::bucket::get_bucket_metadata_sys; use ecstore::bucket_meta::BucketMetadata; use ecstore::disk::error::DiskError; use ecstore::disk::RUSTFS_META_BUCKET; @@ -10,6 +11,7 @@ use ecstore::store_api::DeleteBucketOptions; use ecstore::store_api::HTTPRangeSpec; use ecstore::store_api::MakeBucketOptions; use ecstore::store_api::MultipartUploadResult; +use ecstore::store_api::ObjectIO; use ecstore::store_api::ObjectOptions; use ecstore::store_api::ObjectToDelete; use ecstore::store_api::PutObjReader; @@ -769,45 +771,62 @@ impl S3 for FS { .as_ref() .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?; - let meta_obj = try_!( - store - .get_object_reader( - RUSTFS_META_BUCKET, - BucketMetadata::new(bucket.as_str()).save_file_path().as_str(), - HTTPRangeSpec::nil(), - Default::default(), - &ObjectOptions::default(), - ) - .await - ); + // let a = get_bucket_metadata_sys(); - let stream = meta_obj.stream; + Ok(S3Response::new(GetBucketTaggingOutput { ..Default::default() })) - let mut data = vec![]; - pin_mut!(stream); + // let meta_obj = try_!( + // store + // .get_object_reader( + // RUSTFS_META_BUCKET, + // BucketMetadata::new(bucket.as_str()).save_file_path().as_str(), + // HTTPRangeSpec::nil(), + // Default::default(), + // &ObjectOptions::default(), + // ) + // .await + // ); - while let Some(x) = stream.next().await { - let x = try_!(x); - data.put_slice(&x[..]); - } + // let stream = meta_obj.stream; - let meta = try_!(BucketMetadata::unmarshal_from(&data[..])); - if meta.tagging.is_none() { - return Err({ - let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist"); - err.set_status_code("404".try_into().unwrap()); - err - }); - } + // let mut data = vec![]; + // pin_mut!(stream); - Ok(S3Response::new(GetBucketTaggingOutput { - tag_set: meta - .tagging - .unwrap() - .into_iter() - .map(|(key, value)| Tag { key, value }) - .collect(), - })) + // while let Some(x) = stream.next().await { + // let x = try_!(x); + // data.put_slice(&x[..]); + // } + + // if data.is_empty() { + // return Err({ + // let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist"); + // err.set_status_code("404".try_into().unwrap()); + // err + // }); + // } + + // warn!("bm: 1111"); + + // let meta = try_!(BucketMetadata::unmarshal_from(&data[..])); + + // warn!("bm 33333"); + // if meta.tagging.is_none() { + // return Err({ + // let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist"); + // err.set_status_code("404".try_into().unwrap()); + // err + // }); + // } + + // warn!("bm:4444"); + // Ok(S3Response::new(GetBucketTaggingOutput { + // tag_set: meta + // .tagging + // .unwrap() + // .into_iter() + // .map(|(key, value)| Tag { key, value }) + // .collect(), + // })) } #[tracing::instrument(level = "debug", skip(self))]