diff --git a/TODO.md b/TODO.md
index cbcab874..b158b0aa 100644
--- a/TODO.md
+++ b/TODO.md
@@ -2,18 +2,18 @@
 ## Core storage
 
-- [ ] EC read/write quorum checks (Read/WriteQuorum)
-- [ ] Optimize concurrent execution: fetch while reading, interruptible
+- [x] EC read/write quorum checks (Read/WriteQuorum)
+- [ ] Optimize background concurrent execution, interruptible
 - [ ] Store small files in the metafile (inline data)
 - [ ] Flesh out bucketmeta
-- [ ] Object lock
-- [ ] Code cleanup: use generics?
-- [ ] Abstract out metafile storage
-- [ ] Hash while reading/writing
+- [x] Object lock
+- [ ] Hash while reading/writing, implemented as nested readers (sketch below)
 - [x] Remote RPC
 - [x] Error type checks: detect error types in the program, unify error handling
 - [x] Optimize xlmeta with a custom msgpack data structure
-- [x] appendFile, createFile, readFile, walk_dir sync IO
+- [ ] Optimize io.Reader along the lines of GetObjectNInfo to ease io copy; rebalance when writes are async
+- [ ] Code cleanup: use generics?
+- [ ] Abstract out metafile storage
 
 ## Core features
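Two of the TODO items above (hash-while-streaming and the io.Reader rework) describe the same nested-reader pattern. A minimal sketch of what such a wrapper could look like, assuming the `sha2` crate and blocking `std::io::Read`; the real implementation would wrap the async object read/write path instead:

```rust
// Hypothetical sketch: a nested reader that hashes bytes as they stream through.
use sha2::{Digest, Sha256};
use std::io::{self, Read};

pub struct HashReader<R: Read> {
    inner: R,
    hasher: Sha256,
}

impl<R: Read> HashReader<R> {
    pub fn new(inner: R) -> Self {
        Self { inner, hasher: Sha256::new() }
    }

    /// Consume the wrapper and return the digest of everything read so far.
    pub fn finalize(self) -> [u8; 32] {
        self.hasher.finalize().into()
    }
}

impl<R: Read> Read for HashReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.hasher.update(&buf[..n]); // hash exactly the bytes handed to the caller
        Ok(n)
    }
}
```

Because the wrapper itself implements `Read`, these readers nest: a hash reader can wrap a size-limit reader that wraps the network body, which is presumably what "reader 嵌套" is after.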
diff --git a/ecstore/src/bucket/encryption/mod.rs b/ecstore/src/bucket/encryption/mod.rs
index f5fee7c7..20e7d315 100644
--- a/ecstore/src/bucket/encryption/mod.rs
+++ b/ecstore/src/bucket/encryption/mod.rs
@@ -21,20 +21,20 @@ impl std::str::FromStr for Algorithm {
 }
 
 // EncryptionAction struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct EncryptionAction {
     algorithm: Option<Algorithm>,
     master_key_id: Option<String>,
 }
 
 // Rule struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Rule {
     default_encryption_action: EncryptionAction,
 }
 
 // BucketSSEConfig struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketSSEConfig {
     xml_ns: String,
     xml_name: String,
diff --git a/ecstore/src/bucket/error.rs b/ecstore/src/bucket/error.rs
new file mode 100644
index 00000000..781f7f11
--- /dev/null
+++ b/ecstore/src/bucket/error.rs
@@ -0,0 +1,5 @@
+#[derive(Debug, thiserror::Error)]
+pub enum BucketMetadataError {
+    #[error("tagging not found")]
+    TaggingNotFound,
+}
diff --git a/ecstore/src/bucket/event/mod.rs b/ecstore/src/bucket/event/mod.rs
index 48944fee..d1cb49fb 100644
--- a/ecstore/src/bucket/event/mod.rs
+++ b/ecstore/src/bucket/event/mod.rs
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 use name::Name;
 
 // Common struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 struct Common {
     pub id: String,
     pub filter: S3Key,
@@ -13,57 +13,57 @@ struct Common {
 }
 
 // Queue struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 struct Queue {
     pub common: Common,
     pub arn: ARN,
 }
 
 // ARN struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ARN {
     pub target_id: TargetID,
     pub region: String,
 }
 
 // TargetID struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct TargetID {
     pub id: String,
     pub name: String,
 }
 
 // FilterRule struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct FilterRule {
     pub name: String,
     pub value: String,
 }
 
 // FilterRuleList struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct FilterRuleList {
     pub rules: Vec<FilterRule>,
 }
 
 // S3Key struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct S3Key {
     pub rule_list: FilterRuleList,
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Lambda {
     arn: String,
 }
 
 // Topic struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Topic {
     arn: String,
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Config {
     queue_list: Vec<Queue>,
     lambda_list: Vec<Lambda>,
diff --git a/ecstore/src/bucket/lifecycle/and.rs b/ecstore/src/bucket/lifecycle/and.rs
index 38bee917..70b36607 100644
--- a/ecstore/src/bucket/lifecycle/and.rs
+++ b/ecstore/src/bucket/lifecycle/and.rs
@@ -1,6 +1,6 @@
 use super::{prefix::Prefix, tag::Tag};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct And {
     pub object_size_greater_than: i64,
     pub object_size_less_than: i64,
diff --git a/ecstore/src/bucket/lifecycle/delmarker.rs b/ecstore/src/bucket/lifecycle/delmarker.rs
index 06102f21..5ea1d3bb 100644
--- a/ecstore/src/bucket/lifecycle/delmarker.rs
+++ b/ecstore/src/bucket/lifecycle/delmarker.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct DelMarkerExpiration {
     pub days: usize,
 }
diff --git a/ecstore/src/bucket/lifecycle/expiration.rs b/ecstore/src/bucket/lifecycle/expiration.rs
index 8fa2e08c..490b454b 100644
--- a/ecstore/src/bucket/lifecycle/expiration.rs
+++ b/ecstore/src/bucket/lifecycle/expiration.rs
@@ -3,21 +3,21 @@ use time::OffsetDateTime;
 
 // ExpirationDays is a type alias to unmarshal Days in Expiration
 pub type ExpirationDays = usize;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ExpirationDate(Option<OffsetDateTime>);
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ExpireDeleteMarker {
     pub marker: Boolean,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Boolean {
     pub val: bool,
     pub set: bool,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Expiration {
     pub days: Option<ExpirationDays>,
     pub date: Option<ExpirationDate>,
diff --git a/ecstore/src/bucket/lifecycle/fileter.rs b/ecstore/src/bucket/lifecycle/fileter.rs
index 3604384b..e7b35be8 100644
--- a/ecstore/src/bucket/lifecycle/fileter.rs
+++ b/ecstore/src/bucket/lifecycle/fileter.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use super::{and::And, prefix::Prefix, tag::Tag};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Filter {
     pub set: bool,
diff --git a/ecstore/src/bucket/lifecycle/lifecycle.rs b/ecstore/src/bucket/lifecycle/lifecycle.rs
index b4ef834c..e4f2fa6f 100644
--- a/ecstore/src/bucket/lifecycle/lifecycle.rs
+++ b/ecstore/src/bucket/lifecycle/lifecycle.rs
@@ -2,7 +2,7 @@ use super::rule::Rule;
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Lifecycle {
     pub rules: Vec<Rule>,
     pub expiry_updated_at: Option<OffsetDateTime>,
diff --git a/ecstore/src/bucket/lifecycle/noncurrentversion.rs b/ecstore/src/bucket/lifecycle/noncurrentversion.rs
index 5f39d7d8..3441746f 100644
--- a/ecstore/src/bucket/lifecycle/noncurrentversion.rs
+++ b/ecstore/src/bucket/lifecycle/noncurrentversion.rs
@@ -1,14 +1,14 @@
 use super::{expiration::ExpirationDays, transition::TransitionDays};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct NoncurrentVersionExpiration {
     pub noncurrent_days: ExpirationDays,
     pub newer_noncurrent_versions: usize,
     set: bool,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct NoncurrentVersionTransition {
     pub noncurrent_days: TransitionDays,
     pub storage_class: String,
diff --git a/ecstore/src/bucket/lifecycle/prefix.rs b/ecstore/src/bucket/lifecycle/prefix.rs
index abc09320..657038dc 100644
--- a/ecstore/src/bucket/lifecycle/prefix.rs
+++ b/ecstore/src/bucket/lifecycle/prefix.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Prefix {
     pub val: String,
     pub set: bool,
diff --git a/ecstore/src/bucket/lifecycle/rule.rs b/ecstore/src/bucket/lifecycle/rule.rs
index 4c69c7bb..27f6e729 100644
--- a/ecstore/src/bucket/lifecycle/rule.rs
+++ b/ecstore/src/bucket/lifecycle/rule.rs
@@ -8,14 +8,14 @@ use super::{
 };
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub enum Status {
     #[default]
     Enabled,
     Disabled,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Rule {
     pub id: String,
     pub status: Status,
diff --git a/ecstore/src/bucket/lifecycle/tag.rs b/ecstore/src/bucket/lifecycle/tag.rs
index 95687a49..5d80bb2c 100644
--- a/ecstore/src/bucket/lifecycle/tag.rs
+++ b/ecstore/src/bucket/lifecycle/tag.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Tag {
     pub key: String,
     pub value: String,
diff --git a/ecstore/src/bucket/lifecycle/transition.rs b/ecstore/src/bucket/lifecycle/transition.rs
index 5950de29..54534a4a 100644
--- a/ecstore/src/bucket/lifecycle/transition.rs
+++ b/ecstore/src/bucket/lifecycle/transition.rs
@@ -3,10 +3,10 @@ use time::OffsetDateTime;
 
 pub type TransitionDays = usize;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct TransitionDate(Option<OffsetDateTime>);
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Transition {
     pub days: Option<TransitionDays>,
     pub date: Option<TransitionDate>,
diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs
index 316e5f5b..5f6dd388 100644
--- a/ecstore/src/bucket/metadata.rs
+++ b/ecstore/src/bucket/metadata.rs
@@ -1,32 +1,47 @@
-use byteorder::{BigEndian, ByteOrder};
+use super::{
+    encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
+    quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
+};
+
+use byteorder::{BigEndian, ByteOrder, LittleEndian};
 use rmp_serde::Serializer as rmpSerializer;
 use serde::Serializer;
 use serde::{Deserialize, Deserializer, Serialize};
 use std::collections::HashMap;
 use std::fmt::Display;
 use std::str::FromStr;
-use time::macros::datetime;
 use time::OffsetDateTime;
+use tracing::{error, warn};
 
-use super::{
-    encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
-    quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
-};
-
-use crate::error::Result;
+use crate::bucket::tags;
+use crate::config;
+use crate::config::common::{read_config, save_config};
+use crate::error::{Error, Result};
 use crate::disk::BUCKET_META_PREFIX;
-use crate::utils::crypto::hex;
+use crate::store::ECStore;
+
+type TypeConfigFile = &'static str;
 
 pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
 pub const BUCKET_METADATA_FORMAT: u16 = 1;
 pub const BUCKET_METADATA_VERSION: u16 = 1;
 
-#[derive(Debug, Deserialize, Serialize)]
+pub const BUCKET_POLICY_CONFIG: &str = "policy.json";
+pub const BUCKET_NOTIFICATION_CONFIG: &str = "notification.xml";
+pub const BUCKET_LIFECYCLE_CONFIG: &str = "lifecycle.xml";
+pub const BUCKET_SSECONFIG: &str = "bucket-encryption.xml";
+pub const BUCKET_TAGGING_CONFIG: &str = "tagging.xml";
+pub const BUCKET_QUOTA_CONFIG_FILE: &str = "quota.json";
+pub const OBJECT_LOCK_CONFIG: &str = "object-lock.xml";
+pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
+pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
+pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(rename_all = "PascalCase", default)]
 pub struct BucketMetadata {
     pub name: String,
-    #[serde(serialize_with = "write_times")]
     pub created: OffsetDateTime,
     pub lock_enabled: bool, // marked as unused, but may need to be kept
     pub policy_config_json: Vec<u8>,
@@ -41,27 +56,16 @@ pub struct BucketMetadata {
     pub bucket_targets_config_json: Vec<u8>,
     pub bucket_targets_config_meta_json: Vec<u8>,
 
-    #[serde(serialize_with = "write_times")]
     pub policy_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub object_lock_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub encryption_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub tagging_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub quota_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub replication_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub versioning_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub lifecycle_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub notification_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub bucket_targets_config_updated_at: OffsetDateTime,
-    #[serde(serialize_with = "write_times")]
     pub bucket_targets_config_meta_updated_at: OffsetDateTime,
 
     #[serde(skip)]
@@ -147,9 +151,9 @@ impl BucketMetadata {
         format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE)
     }
 
-    fn msg_size(&self) -> usize {
-        unimplemented!()
-    }
+    // fn msg_size(&self) -> usize {
+    //     unimplemented!()
+    // }
 
     pub fn marshal_msg(&self) -> Result<Vec<u8>> {
         let mut buf = Vec::new();
@@ -159,12 +163,162 @@ impl BucketMetadata {
         Ok(buf)
     }
 
-    pub fn unmarshal(_buf: &[u8]) -> Result<Self> {
-        unimplemented!()
+    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
+        let t: BucketMetadata = rmp_serde::from_slice(buf)?;
+        Ok(t)
+    }
+
+    pub fn check_header(buf: &[u8]) -> Result<()> {
+        if buf.len() <= 4 {
+            return Err(Error::msg("read_bucket_metadata: data invalid"));
+        }
+
+        let format = LittleEndian::read_u16(&buf[0..2]);
+        let version = LittleEndian::read_u16(&buf[2..4]);
+
+        match format {
+            BUCKET_METADATA_FORMAT => {}
+            _ => return Err(Error::msg("read_bucket_metadata: format invalid")),
+        }
+
+        match version {
+            BUCKET_METADATA_VERSION => {}
+            _ => return Err(Error::msg("read_bucket_metadata: version invalid")),
+        }
+
+        Ok(())
+    }
+
+    fn default_timestamps(&mut self) {
+        if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH {
+            self.tagging_config_updated_at = self.created
+        }
+    }
+
+    pub fn update_config(&mut self, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
+        let updated = OffsetDateTime::now_utc();
+
+        match config_file {
+            BUCKET_POLICY_CONFIG => {
+                self.policy_config_json = data;
+                self.policy_config_updated_at = updated;
+            }
+            BUCKET_NOTIFICATION_CONFIG => {
+                self.notification_config_xml = data;
+                self.notification_config_updated_at = updated;
+            }
+            BUCKET_LIFECYCLE_CONFIG => {
+                self.lifecycle_config_xml = data;
+                self.lifecycle_config_updated_at = updated;
+            }
+            BUCKET_SSECONFIG => {
+                self.encryption_config_xml = data;
+                self.encryption_config_updated_at = updated;
+            }
+            BUCKET_TAGGING_CONFIG => {
+                self.tagging_config_xml = data;
+                self.tagging_config_updated_at = updated;
+            }
+            BUCKET_QUOTA_CONFIG_FILE => {
+                self.quota_config_json = data;
+                self.quota_config_updated_at = updated;
+            }
+            OBJECT_LOCK_CONFIG => {
+                self.object_lock_config_xml = data;
+                self.object_lock_config_updated_at = updated;
+            }
+            BUCKET_VERSIONING_CONFIG => {
+                self.versioning_config_xml = data;
+                self.versioning_config_updated_at = updated;
+            }
+            BUCKET_REPLICATION_CONFIG => {
+                self.replication_config_xml = data;
+                self.replication_config_updated_at = updated;
+            }
+            BUCKET_TARGETS_FILE => {
+                self.bucket_targets_config_json = data;
+                self.bucket_targets_config_updated_at = updated;
+            }
+            _ => return Err(Error::msg(format!("config file not found: {}", config_file))),
+        }
+
+        Ok(updated)
+    }
+
+    pub async fn save(&mut self, api: &ECStore) -> Result<()> {
+        self.parse_all_configs(api)?;
+
+        let mut buf: Vec<u8> = vec![0; 4];
+
+        LittleEndian::write_u16(&mut buf[0..2], BUCKET_METADATA_FORMAT);
+
+        LittleEndian::write_u16(&mut buf[2..4], BUCKET_METADATA_VERSION);
+
+        let data = self.marshal_msg()?;
+
+        buf.extend_from_slice(&data);
+
+        save_config(api, self.save_file_path().as_str(), &buf).await?;
+
+        Ok(())
+    }
+
+    fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
+        if !self.tagging_config_xml.is_empty() {
+            self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
+        }
+
+        Ok(())
+    }
 }
 
+pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
+    load_bucket_metadata_parse(api, bucket, true).await
+}
+
+pub async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result<BucketMetadata> {
+    let mut bm = match read_bucket_metadata(api, bucket).await {
+        Ok(res) => res,
+        Err(err) => {
+            warn!("load_bucket_metadata_parse err {:?}", &err);
+            if !config::error::is_not_found(&err) {
+                return Err(err);
+            }
+
+            BucketMetadata::new(bucket)
+        }
+    };
+
+    bm.default_timestamps();
+
+    if parse {
+        bm.parse_all_configs(api)?;
+    }
+
+    Ok(bm)
+}
+
+async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
+    if bucket.is_empty() {
+        error!("bucket name empty");
+        return Err(Error::msg("invalid argument"));
+    }
+
+    let bm = BucketMetadata::new(bucket);
+    let file_path = bm.save_file_path();
+
+    let data = read_config(api, &file_path).await?;
+
+    BucketMetadata::check_header(&data)?;
+
+    let bm = BucketMetadata::unmarshal(&data[4..])?;
+
+    Ok(bm)
+}
+
-fn deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
+fn _deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
 where
     S: FromStr,
     S::Err: Display,
@@ -175,7 +329,7 @@
     unimplemented!()
 }
 
-fn write_times<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
+fn _write_time<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
 where
     S: Serializer,
 {
@@ -183,47 +337,16 @@
     let sec = t.unix_timestamp() - 62135596800;
     let nsec = t.nanosecond();
 
-    buf[0] = 0xc7;
-    buf[1] = 0x0c;
-    buf[2] = 0x05;
+    buf[0] = 0xc7; // msgpack ext8 marker
+    buf[1] = 0x0c; // length (12 bytes)
+    buf[2] = 0x05; // time extension type
 
     BigEndian::write_u64(&mut buf[3..], sec as u64);
     BigEndian::write_u32(&mut buf[11..], nsec as u32);
 
     s.serialize_bytes(&buf)
 }
 
-fn write_time(t: OffsetDateTime) -> Result<(), String> {
-    // let t = t.saturating_sub(0); // convert to time since UNIX_EPOCH
-    println!("t:{:?}", t);
-    println!("offset:{:?}", datetime!(0-01-01 0:00 UTC));
-
-    let mut buf = vec![0x0; 15];
-
-    let sec = t.unix_timestamp() - 62135596800;
-    let nsec = t.nanosecond();
-
-    println!("sec:{:?}", sec);
-    println!("nsec:{:?}", nsec);
-
-    buf[0] = 0xc7;
-    buf[1] = 0x0c;
-    buf[2] = 0x05;
-    // extension-format time type
-    // buf.push(0xc7); // ext8
-    // buf.push(12); // length
-    // buf.push(0x05); // time extension type
-
-    // write the Unix timestamp and nanosecond parts into the buffer
-    BigEndian::write_u64(&mut buf[3..], sec as u64);
-    BigEndian::write_u32(&mut buf[11..], nsec as u32);
-
-    println!("hex:{:?}", hex(buf));
-
-    Ok(())
-}
-
 #[cfg(test)]
 mod test {
-    use crate::utils::crypto::hex;
 
     use super::*;
 
@@ -235,6 +358,8 @@
         let buf = bm.marshal_msg().unwrap();
 
-        println!("{:?}", hex(buf))
+        let new = BucketMetadata::unmarshal(&buf).unwrap();
+
+        assert_eq!(bm.name, new.name);
     }
 }
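The on-disk `.metadata.bin` layout that `save` writes and `check_header` validates is a 4-byte little-endian header (format `u16`, version `u16`) followed by the msgpack payload. A minimal sketch of just that framing, using the same `byteorder` calls as the diff (`BucketMetadata` handling elided):

```rust
use byteorder::{ByteOrder, LittleEndian};

const FORMAT: u16 = 1;
const VERSION: u16 = 1;

// Frame a msgpack payload with the 4-byte header that save() prepends.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = vec![0u8; 4];
    LittleEndian::write_u16(&mut buf[0..2], FORMAT);
    LittleEndian::write_u16(&mut buf[2..4], VERSION);
    buf.extend_from_slice(payload);
    buf
}

// Split a framed buffer back into (format, version, payload), which is what
// check_header followed by unmarshal(&data[4..]) do together.
fn unframe(buf: &[u8]) -> Option<(u16, u16, &[u8])> {
    if buf.len() <= 4 {
        return None; // header plus at least one payload byte required
    }
    let format = LittleEndian::read_u16(&buf[0..2]);
    let version = LittleEndian::read_u16(&buf[2..4]);
    Some((format, version, &buf[4..]))
}
```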
diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs
new file mode 100644
index 00000000..2e0711ce
--- /dev/null
+++ b/ecstore/src/bucket/metadata_sys.rs
@@ -0,0 +1,258 @@
+use std::collections::HashSet;
+use std::{collections::HashMap, sync::Arc};
+
+use crate::bucket::error::BucketMetadataError;
+use crate::bucket::metadata::load_bucket_metadata_parse;
+use crate::bucket::utils::is_meta_bucketname;
+use crate::config;
+use crate::config::error::ConfigError;
+use crate::disk::error::DiskError;
+use crate::error::{Error, Result};
+use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
+use crate::store::ECStore;
+use futures::future::join_all;
+use lazy_static::lazy_static;
+use time::OffsetDateTime;
+use tokio::sync::RwLock;
+use tracing::{error, info, warn};
+
+use super::metadata::{load_bucket_metadata, BucketMetadata};
+use super::tags;
+
+lazy_static! {
+    static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
+}
+
+pub async fn init_bucket_metadata_sys(api: ECStore, buckets: Vec<String>) {
+    let mut sys = GLOBAL_BucketMetadataSys.write().await;
+    sys.init(api, buckets).await
+}
+pub async fn get_bucket_metadata_sys() -> Arc<RwLock<BucketMetadataSys>> {
+    GLOBAL_BucketMetadataSys.clone()
+}
+
+pub async fn bucket_metadata_sys_set(bucket: String, bm: BucketMetadata) {
+    let sys = GLOBAL_BucketMetadataSys.write().await;
+    sys.set(bucket, bm).await
+}
+
+#[derive(Debug, Default)]
+pub struct BucketMetadataSys {
+    metadata_map: RwLock<HashMap<String, BucketMetadata>>,
+    api: Option<ECStore>,
+    initialized: RwLock<bool>,
+}
+
+impl BucketMetadataSys {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    pub async fn init(&mut self, api: ECStore, buckets: Vec<String>) {
+        // if api.is_none() {
+        //     return Err(Error::msg("errServerNotInitialized"));
+        // }
+        self.api = Some(api);
+
+        let _ = self.init_internal(buckets).await;
+    }
+    async fn init_internal(&self, buckets: Vec<String>) -> Result<()> {
+        let count = {
+            let endpoints = GLOBAL_Endpoints.read().await;
+            endpoints.es_count() * 10
+        };
+
+        let mut failed_buckets: HashSet<String> = HashSet::new();
+        let mut buckets = buckets.as_slice();
+
+        loop {
+            if buckets.len() < count {
+                self.concurrent_load(buckets, &mut failed_buckets).await;
+                break;
+            }
+
+            self.concurrent_load(&buckets[..count], &mut failed_buckets).await;
+
+            buckets = &buckets[count..]
+        }
+
+        let mut initialized = self.initialized.write().await;
+        *initialized = true;
+
+        if is_dist_erasure().await {
+            // TODO: refresh_buckets_metadata_loop
+        }
+
+        Ok(())
+    }
+
+    async fn concurrent_load(&self, buckets: &[String], failed_buckets: &mut HashSet<String>) {
+        let mut futures = Vec::new();
+
+        for bucket in buckets.iter() {
+            futures.push(load_bucket_metadata(self.api.as_ref().unwrap(), bucket.as_str()));
+        }
+
+        let results = join_all(futures).await;
+
+        let mut idx = 0;
+
+        let mut mp = self.metadata_map.write().await;
+
+        for res in results {
+            match res {
+                Ok(res) => {
+                    if let Some(bucket) = buckets.get(idx) {
+                        mp.insert(bucket.clone(), res);
+                    }
+                }
+                Err(e) => {
+                    error!("Unable to load bucket metadata, will be retried: {:?}", e);
+                    if let Some(bucket) = buckets.get(idx) {
+                        failed_buckets.insert(bucket.clone());
+                    }
+                }
+            }
+
+            idx += 1;
+        }
+    }
+
+    async fn refresh_buckets_metadata_loop(&self, failed_buckets: &HashSet<String>) -> Result<()> {
+        unimplemented!()
+    }
+
+    pub async fn get(&self, bucket: &str) -> Result<BucketMetadata> {
+        if is_meta_bucketname(bucket) {
+            return Err(Error::new(ConfigError::NotFound));
+        }
+
+        let map = self.metadata_map.read().await;
+        if let Some(bm) = map.get(bucket) {
+            Ok(bm.clone())
+        } else {
+            Err(Error::new(ConfigError::NotFound))
+        }
+    }
+
+    pub async fn set(&self, bucket: String, bm: BucketMetadata) {
+        if !is_meta_bucketname(&bucket) {
+            let mut map = self.metadata_map.write().await;
+            map.insert(bucket, bm);
+        }
+    }
+
+    async fn reset(&mut self) {
+        let mut map = self.metadata_map.write().await;
+        map.clear();
+    }
+
+    pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
+        self.update_and_parse(bucket, config_file, data, true).await
+    }
+
+    async fn update_and_parse(&mut self, bucket: &str, config_file: &str, data: Vec<u8>, parse: bool) -> Result<OffsetDateTime> {
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(Error::msg("errServerNotInitialized")),
+        };
+
+        if is_meta_bucketname(bucket) {
+            return Err(Error::msg("errInvalidArgument"));
+        }
+
+        let mut bm = match load_bucket_metadata_parse(store, bucket, parse).await {
+            Ok(res) => res,
+            Err(err) => {
+                if !is_erasure().await && !is_dist_erasure().await && DiskError::VolumeNotFound.is(&err) {
+                    BucketMetadata::new(bucket)
+                } else {
+                    return Err(err);
+                }
+            }
+        };
+
+        let updated = bm.update_config(config_file, data)?;
+
+        self.save(&mut bm).await?;
+
+        Ok(updated)
+    }
+
+    async fn save(&self, bm: &mut BucketMetadata) -> Result<()> {
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(Error::msg("errServerNotInitialized")),
+        };
+
+        if is_meta_bucketname(&bm.name) {
+            return Err(Error::msg("errInvalidArgument"));
+        }
+
+        bm.save(store).await?;
+
+        self.set(bm.name.clone(), bm.clone()).await;
+
+        Ok(())
+    }
+
+    pub async fn get_config(&self, bucket: &str) -> Result<(BucketMetadata, bool)> {
+        if let Some(api) = self.api.as_ref() {
+            let has_bm = {
+                let map = self.metadata_map.read().await;
+                map.get(bucket).cloned()
+            };
+
+            if let Some(bm) = has_bm {
+                Ok((bm, false))
+            } else {
+                let bm = match load_bucket_metadata(api, bucket).await {
+                    Ok(res) => res,
+                    Err(err) => {
+                        if *self.initialized.read().await {
+                            return Err(Error::msg("errBucketMetadataNotInitialized"));
+                        } else {
+                            return Err(err);
+                        }
+                    }
+                };
+
+                let mut map = self.metadata_map.write().await;
+
+                map.insert(bucket.to_string(), bm.clone());
+
+                Ok((bm, true))
+            }
+        } else {
+            Err(Error::msg("errBucketMetadataNotInitialized"))
+        }
+    }
+
+    pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
+        let bm = match self.get_config(bucket).await {
+            Ok((res, _)) => res,
+            Err(err) => {
+                warn!("get_tagging_config err {:?}", &err);
+                if config::error::is_not_found(&err) {
+                    return Err(Error::new(BucketMetadataError::TaggingNotFound));
+                } else {
+                    return Err(err);
+                }
+            }
+        };
+
+        if let Some(config) = bm.tagging_config {
+            Ok((config, bm.tagging_config_updated_at))
+        } else {
+            Err(Error::new(BucketMetadataError::TaggingNotFound))
+        }
+    }
+}
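Taken together, the intended call pattern for the new subsystem looks roughly like this (a sketch; the bucket name is illustrative and error handling is abbreviated, mirroring what `main.rs` below does at startup):

```rust
// Hypothetical usage of the bucket metadata cache from handler code.
use ecstore::bucket::{get_bucket_metadata_sys, init_bucket_metadata_sys};

async fn example(store: ecstore::store::ECStore, buckets: Vec<String>) {
    // Load metadata for all known buckets into the global cache once.
    init_bucket_metadata_sys(store, buckets).await;

    // Later, any handler consults the cache instead of re-reading disk.
    let sys_lock = get_bucket_metadata_sys().await;
    let sys = sys_lock.read().await;
    match sys.get_tagging_config("my-bucket").await {
        Ok((tags, updated_at)) => println!("tags {:?} updated {}", tags, updated_at),
        Err(e) => eprintln!("no tagging config: {:?}", e),
    }
}
```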
diff --git a/ecstore/src/bucket/mod.rs b/ecstore/src/bucket/mod.rs
index 9667266f..a905fba1 100644
--- a/ecstore/src/bucket/mod.rs
+++ b/ecstore/src/bucket/mod.rs
@@ -1,13 +1,16 @@
 mod encryption;
+mod error;
 mod event;
 mod lifecycle;
-mod metadata;
+pub mod metadata;
+mod metadata_sys;
 mod objectlock;
 mod policy;
 mod quota;
 mod replication;
-mod tags;
+pub mod tags;
 mod target;
+pub mod utils;
 mod versioning;
 
-pub use metadata::BucketMetadata;
+pub use metadata_sys::{bucket_metadata_sys_set, get_bucket_metadata_sys, init_bucket_metadata_sys};
diff --git a/ecstore/src/bucket/objectlock/mod.rs b/ecstore/src/bucket/objectlock/mod.rs
index c8838398..74fa0f3b 100644
--- a/ecstore/src/bucket/objectlock/mod.rs
+++ b/ecstore/src/bucket/objectlock/mod.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
+#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
 pub enum RetMode {
     #[default]
     Govenance,
@@ -20,19 +20,19 @@ impl std::str::FromStr for RetMode {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct DefaultRetention {
     pub mode: RetMode,
     pub days: Option<i64>,
     pub years: Option<i64>,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Rule {
     pub default_retention: DefaultRetention,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Config {
     pub object_lock_enabled: String,
     pub rule: Option<Rule>,
diff --git a/ecstore/src/bucket/policy/action.rs b/ecstore/src/bucket/policy/action.rs
index d448727c..34b6dc8a 100644
--- a/ecstore/src/bucket/policy/action.rs
+++ b/ecstore/src/bucket/policy/action.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ActionSet(HashSet<Action>);
 
 impl ActionSet {}
diff --git a/ecstore/src/bucket/policy/bucket_policy.rs b/ecstore/src/bucket/policy/bucket_policy.rs
index 6e0345c5..b769ec66 100644
--- a/ecstore/src/bucket/policy/bucket_policy.rs
+++ b/ecstore/src/bucket/policy/bucket_policy.rs
@@ -9,7 +9,7 @@ use super::{
     resource::ResourceSet,
 };
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketPolicyArgs {
     account_name: String,
     groups: Vec<String>,
@@ -20,7 +20,7 @@ pub struct BucketPolicyArgs {
     object_name: String,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BPStatement {
     sid: String,
     effect: Effect,
@@ -32,7 +32,7 @@ pub struct BPStatement {
     conditions: Option<Functions>,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketPolicy {
     pub id: String,
     pub version: String,
diff --git a/ecstore/src/bucket/policy/condition/function.rs b/ecstore/src/bucket/policy/condition/function.rs
index 7863c5c7..61c9090f 100644
--- a/ecstore/src/bucket/policy/condition/function.rs
+++ b/ecstore/src/bucket/policy/condition/function.rs
@@ -41,14 +41,14 @@ pub trait FunctionApi {
 //     }
 // }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 enum Function {
     #[default]
     Test,
 }
 
 // Functions type
-#[derive(Deserialize, Serialize, Default)]
+#[derive(Deserialize, Serialize, Default, Clone)]
 pub struct Functions(Vec<Function>);
 
 impl Debug for Functions {
diff --git a/ecstore/src/bucket/policy/resource.rs b/ecstore/src/bucket/policy/resource.rs
index c02fd7dc..32ebc5a3 100644
--- a/ecstore/src/bucket/policy/resource.rs
+++ b/ecstore/src/bucket/policy/resource.rs
@@ -15,11 +15,11 @@ const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::";
 const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::";
 
 // Resource struct
-#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
+#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
 pub struct Resource {
     pattern: String,
     r#type: ResourceARNType,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ResourceSet(HashSet<Resource>);
diff --git a/ecstore/src/bucket/quota/mod.rs b/ecstore/src/bucket/quota/mod.rs
index c68eb8cb..12c50dc9 100644
--- a/ecstore/src/bucket/quota/mod.rs
+++ b/ecstore/src/bucket/quota/mod.rs
@@ -7,7 +7,7 @@ pub enum QuotaType {
 }
 
 // BucketQuota struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketQuota {
     quota: Option<u64>, // Option marks a field that may be absent
diff --git a/ecstore/src/bucket/replication/and.rs b/ecstore/src/bucket/replication/and.rs
index 812dd29e..b9751f0f 100644
--- a/ecstore/src/bucket/replication/and.rs
+++ b/ecstore/src/bucket/replication/and.rs
@@ -2,7 +2,7 @@ use super::tag::Tag;
 use serde::{Deserialize, Serialize};
 
 // And struct
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct And {
     prefix: Option<String>,
     tags: Option<Vec<Tag>>,
diff --git a/ecstore/src/bucket/replication/filter.rs b/ecstore/src/bucket/replication/filter.rs
index a194ec34..0f06b3ef 100644
--- a/ecstore/src/bucket/replication/filter.rs
+++ b/ecstore/src/bucket/replication/filter.rs
@@ -3,7 +3,7 @@ use super::tag::Tag;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Filter {
     prefix: String,
     and: And,
diff --git a/ecstore/src/bucket/replication/mod.rs b/ecstore/src/bucket/replication/mod.rs
index 56abce48..a50bf7c6 100644
--- a/ecstore/src/bucket/replication/mod.rs
+++ b/ecstore/src/bucket/replication/mod.rs
@@ -6,7 +6,7 @@ mod tag;
 use rule::Rule;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Config {
     rules: Vec<Rule>,
     role_arn: String,
diff --git a/ecstore/src/bucket/replication/rule.rs b/ecstore/src/bucket/replication/rule.rs
index ca688bb9..9930dd60 100644
--- a/ecstore/src/bucket/replication/rule.rs
+++ b/ecstore/src/bucket/replication/rule.rs
@@ -1,29 +1,29 @@
 use super::filter::Filter;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub enum Status {
     #[default]
     Enabled,
     Disabled,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct DeleteMarkerReplication {
     pub status: Status,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct DeleteReplication {
     pub status: Status,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ExistingObjectReplication {
     pub status: Status,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Destination {
     pub bucket: String,
     pub storage_class: String,
@@ -31,18 +31,18 @@ pub struct Destination {
 }
 
 // ReplicaModifications struct
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ReplicaModifications {
     status: Status,
 }
 
 // SourceSelectionCriteria struct
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct SourceSelectionCriteria {
     replica_modifications: ReplicaModifications,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Rule {
     pub id: String,
     pub status: Status,
diff --git a/ecstore/src/bucket/replication/tag.rs b/ecstore/src/bucket/replication/tag.rs
index 565aeeea..d0853c50 100644
--- a/ecstore/src/bucket/replication/tag.rs
+++ b/ecstore/src/bucket/replication/tag.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Tag {
     pub key: Option<String>,
     pub value: Option<String>,
diff --git a/ecstore/src/bucket/tags/mod.rs b/ecstore/src/bucket/tags/mod.rs
index e615af24..34c0ca79 100644
--- a/ecstore/src/bucket/tags/mod.rs
+++ b/ecstore/src/bucket/tags/mod.rs
@@ -1,19 +1,37 @@
+use crate::error::Result;
+use rmp_serde::Serializer as rmpSerializer;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
 // TagSet struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct TagSet {
-    #[serde(rename = "Tag")]
-    tag_map: HashMap<String, String>,
-    is_object: bool,
+    pub tag_map: HashMap<String, String>,
+    pub is_object: bool,
 }
 
 // Tags (tagging) struct
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Tags {
-    #[serde(rename = "Tagging")]
-    xml_name: String,
-    #[serde(rename = "TagSet")]
-    tag_set: Option<TagSet>,
+    pub tag_set: TagSet,
+}
+
+impl Tags {
+    pub fn new(tag_map: HashMap<String, String>, is_object: bool) -> Self {
+        Self {
+            tag_set: TagSet { tag_map, is_object },
+        }
+    }
+    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
+        let mut buf = Vec::new();
+
+        self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
+
+        Ok(buf)
+    }
+
+    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
+        let t: Tags = rmp_serde::from_slice(buf)?;
+        Ok(t)
+    }
 }
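Since `Tags` now round-trips through msgpack rather than serde-XML, a quick sanity check of the new API (a sketch; crate paths assume the module layout in this diff):

```rust
// Sketch: exercising Tags::new / marshal_msg / unmarshal from this diff.
use ecstore::bucket::tags::Tags;
use std::collections::HashMap;

fn tags_roundtrip() -> ecstore::error::Result<()> {
    let mut m = HashMap::new();
    m.insert("team".to_string(), "storage".to_string());

    let tags = Tags::new(m, false);

    let buf = tags.marshal_msg()?;      // msgpack bytes, struct-map encoded
    let back = Tags::unmarshal(&buf)?;  // decode back

    assert_eq!(tags.tag_set.tag_map, back.tag_set.tag_map);
    Ok(())
}
```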
diff --git a/ecstore/src/bucket/target/mod.rs b/ecstore/src/bucket/target/mod.rs
index cb4fcbde..e296423b 100644
--- a/ecstore/src/bucket/target/mod.rs
+++ b/ecstore/src/bucket/target/mod.rs
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
 use std::time::Duration;
 use time::OffsetDateTime;
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Credentials {
     access_key: String,
     secret_key: String,
@@ -10,13 +10,13 @@ pub struct Credentials {
     expiration: Option<OffsetDateTime>,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub enum ServiceType {
     #[default]
     Replication,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct LatencyStat {
     curr: Duration, // current latency
     avg: Duration,  // average latency
     max: Duration,  // maximum latency
 }
 
 // BucketTarget struct
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketTarget {
     source_bucket: String,
 
@@ -73,7 +73,7 @@ pub struct BucketTarget {
     edge: bool,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct BucketTargets {
     pub targets: Vec<BucketTarget>,
 }
diff --git a/ecstore/src/bucket/utils.rs b/ecstore/src/bucket/utils.rs
new file mode 100644
index 00000000..a8cb97b6
--- /dev/null
+++ b/ecstore/src/bucket/utils.rs
@@ -0,0 +1,5 @@
+use crate::disk::RUSTFS_META_BUCKET;
+
+pub fn is_meta_bucketname(name: &str) -> bool {
+    name.starts_with(RUSTFS_META_BUCKET)
+}
diff --git a/ecstore/src/bucket/versioning/mod.rs b/ecstore/src/bucket/versioning/mod.rs
index 45d114c2..37fdcdfa 100644
--- a/ecstore/src/bucket/versioning/mod.rs
+++ b/ecstore/src/bucket/versioning/mod.rs
@@ -25,12 +25,12 @@ impl std::fmt::Display for State {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct ExcludedPrefix {
     pub prefix: String,
 }
 
-#[derive(Debug, Deserialize, Serialize, Default)]
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
 pub struct Versioning {
     pub status: State,
     pub excluded_prefixes: Vec<ExcludedPrefix>,
diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs
index 6ad9d9cc..4a559b75 100644
--- a/ecstore/src/bucket_meta.rs
+++ b/ecstore/src/bucket_meta.rs
@@ -14,12 +14,10 @@ pub const BUCKET_METADATA_VERSION: u16 = 1;
 
 #[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
 pub struct BucketMetadata {
-    format: u16,
-    version: u16,
+    pub format: u16,
+    pub version: u16,
     pub name: String,
-    #[serde(skip_serializing_if = "Option::is_none", default)]
     pub tagging: Option<HashMap<String, String>>,
-    #[serde(skip_serializing_if = "Option::is_none", default)]
     pub created: Option<OffsetDateTime>,
 }
 
@@ -60,7 +58,8 @@ impl BucketMetadata {
         Ok(buf)
     }
 
-    pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> {
-        Ok(rmp_serde::from_slice(buffer)?)
+    pub fn unmarshal_from(buf: &[u8]) -> Result<Self> {
+        let t: BucketMetadata = rmp_serde::from_slice(buf)?;
+        Ok(t)
     }
 }
diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs
new file mode 100644
index 00000000..62e53252
--- /dev/null
+++ b/ecstore/src/config/common.rs
@@ -0,0 +1,55 @@
+use crate::disk::RUSTFS_META_BUCKET;
+use crate::error::{Error, Result};
+use crate::store::ECStore;
+use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader};
+use http::HeaderMap;
+use s3s::dto::StreamingBlob;
+use s3s::Body;
+use tracing::warn;
+
+use super::error::ConfigError;
+
+pub async fn read_config(api: &ECStore, file: &str) -> Result<Vec<u8>> {
+    let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?;
+
+    Ok(data)
+}
+
+async fn read_config_with_metadata(api: &ECStore, file: &str, opts: &ObjectOptions) -> Result<(Vec<u8>, ObjectInfo)> {
+    let range = HTTPRangeSpec::nil();
+    let h = HeaderMap::new();
+    let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, opts).await?;
+
+    let data = rd.read_all().await?;
+
+    if data.is_empty() {
+        return Err(Error::new(ConfigError::NotFound));
+    }
+
+    Ok((data, rd.object_info))
+}
+
+pub async fn save_config(api: &ECStore, file: &str, data: &[u8]) -> Result<()> {
+    save_config_with_opts(
+        api,
+        file,
+        data,
+        &ObjectOptions {
+            max_parity: true,
+            ..Default::default()
+        },
+    )
+    .await
+}
+
+async fn save_config_with_opts(api: &ECStore, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> {
+    let _ = api
+        .put_object(
+            RUSTFS_META_BUCKET,
+            file,
+            PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()),
+            opts,
+        )
+        .await?;
+    Ok(())
+}
diff --git a/ecstore/src/config/error.rs b/ecstore/src/config/error.rs
new file mode 100644
index 00000000..1628f31d
--- /dev/null
+++ b/ecstore/src/config/error.rs
@@ -0,0 +1,25 @@
+use crate::error::Error;
+
+#[derive(Debug, thiserror::Error)]
+pub enum ConfigError {
+    #[error("config not found")]
+    NotFound,
+}
+
+impl ConfigError {
+    /// Returns `true` if the config error is [`NotFound`].
+    ///
+    /// [`NotFound`]: ConfigError::NotFound
+    #[must_use]
+    pub fn is_not_found(&self) -> bool {
+        matches!(self, Self::NotFound)
+    }
+}
+
+pub fn is_not_found(err: &Error) -> bool {
+    if let Some(e) = err.downcast_ref::<ConfigError>() {
+        e.is_not_found()
+    } else {
+        false
+    }
+}
diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs
new file mode 100644
index 00000000..3c22a0c5
--- /dev/null
+++ b/ecstore/src/config/mod.rs
@@ -0,0 +1,2 @@
+pub mod common;
+pub mod error;
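The `is_not_found` helper relies on the crate-wide `Error` exposing an anyhow-style `downcast_ref`. A sketch of the intended call-site pattern (crate-internal, since `config` is a private module; the fallback behavior is illustrative):

```rust
// Sketch: branching on the typed ConfigError carried inside the crate Error.
use crate::config::error::is_not_found;
use crate::error::Error;

fn handle(err: &Error) {
    if is_not_found(err) {
        // A missing config file is expected on first use; fall back to defaults.
        tracing::warn!("no config stored yet; using defaults");
    } else {
        tracing::error!("unexpected config error: {:?}", err);
    }
}
```

This is the same pattern `load_bucket_metadata_parse` and `get_tagging_config` above use to turn a missing `.metadata.bin` into a freshly constructed `BucketMetadata` instead of an error.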
diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs
index 9af5b963..14547bbf 100644
--- a/ecstore/src/endpoints.rs
+++ b/ecstore/src/endpoints.rs
@@ -10,7 +10,7 @@ use std::{
 };
 
 /// enum for setup type.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub enum SetupType {
     /// starts with unknown setup type.
     Unknown,
@@ -111,6 +111,7 @@ impl Endpoints {
     }
 }
 
+#[derive(Debug)]
 /// a temporary type that holds the list of endpoints
 struct PoolEndpointList {
     inner: Vec<Endpoint>,
@@ -387,7 +388,7 @@ pub struct PoolEndpoints {
 
 /// list of list of endpoints
 #[derive(Debug, Clone)]
-pub struct EndpointServerPools(Vec<PoolEndpoints>);
+pub struct EndpointServerPools(pub Vec<PoolEndpoints>);
 
 impl From<Vec<PoolEndpoints>> for EndpointServerPools {
     fn from(v: Vec<PoolEndpoints>) -> Self {
@@ -408,6 +409,9 @@ impl AsMut<Vec<PoolEndpoints>> for EndpointServerPools {
 }
 
 impl EndpointServerPools {
+    pub fn reset(&mut self, eps: Vec<PoolEndpoints>) {
+        self.0 = eps;
+    }
     pub fn from_volumes(server_addr: &str, endpoints: Vec<String>) -> Result<(EndpointServerPools, SetupType)> {
         let layouts = DisksLayout::try_from(endpoints.as_slice())?;
 
@@ -439,6 +443,10 @@ impl EndpointServerPools {
         Ok((ret, pool_eps.setup_type))
     }
 
+    pub fn es_count(&self) -> usize {
+        // total number of erasure sets across all pools
+        self.0.iter().map(|v| v.set_count).sum()
+    }
+
     /// add pool endpoints
     pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> {
         let mut exits = HashSet::new();
diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs
new file mode 100644
index 00000000..1b24924c
--- /dev/null
+++ b/ecstore/src/global.rs
@@ -0,0 +1,61 @@
+use lazy_static::lazy_static;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
+
+use crate::{
+    disk::DiskStore,
+    endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
+    store::ECStore,
+};
+
+lazy_static! {
+    pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
+    pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
+    static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
+    static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
+    static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
+    pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
+    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
+    pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
+}
+
+pub async fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
+    let mut endpoints = GLOBAL_Endpoints.write().await;
+    endpoints.reset(eps);
+}
+
+pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
+    GLOBAL_OBJECT_API.clone()
+}
+
+pub async fn set_object_layer(o: ECStore) {
+    let mut global_object_api = GLOBAL_OBJECT_API.write().await;
+    *global_object_api = Some(o);
+}
+
+pub async fn is_dist_erasure() -> bool {
+    *GLOBAL_IsDistErasure.read().await
+}
+
+pub async fn is_erasure() -> bool {
+    *GLOBAL_IsErasure.read().await
+}
+
+pub async fn update_erasure_type(setup_type: SetupType) {
+    let mut is_erasure = GLOBAL_IsErasure.write().await;
+    *is_erasure = setup_type == SetupType::Erasure;
+
+    let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
+    *is_dist_erasure = setup_type == SetupType::DistErasure;
+
+    if *is_dist_erasure {
+        *is_erasure = true
+    }
+
+    let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
+    *is_erasure_sd = setup_type == SetupType::ErasureSD;
+}
+
+type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
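With the globals consolidated into `global.rs`, handlers reach the object layer through the `GLOBAL_OBJECT_API` lock rather than a constructor parameter. A sketch of the read-side dance every call site in this diff performs (crate-internal paths; the error message mirrors the diff's own):

```rust
// Sketch: acquiring the global object layer, as update_and_parse and
// the ecfs.rs handlers do.
use crate::error::{Error, Result};
use crate::global::new_object_layer_fn;

async fn with_store() -> Result<()> {
    let layer = new_object_layer_fn(); // Arc<RwLock<Option<ECStore>>>
    let lock = layer.read().await;     // shared read guard
    let store = match lock.as_ref() {
        Some(s) => s,                  // set by ECStore::new via set_object_layer
        None => return Err(Error::msg("errServerNotInitialized")),
    };

    // ... use `store` while the guard is held ...
    let _ = store;
    Ok(())
}
```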
diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs
index 059c3f41..bd8bced9 100644
--- a/ecstore/src/lib.rs
+++ b/ecstore/src/lib.rs
@@ -1,11 +1,13 @@
 pub mod bucket_meta;
 mod chunk_stream;
+mod config;
 pub mod disk;
 pub mod disks_layout;
 pub mod endpoints;
 pub mod erasure;
 pub mod error;
 mod file_meta;
+mod global;
 pub mod peer;
 mod quorum;
 pub mod set_disk;
@@ -17,3 +19,7 @@ mod store_init;
 mod utils;
 
 pub mod bucket;
+
+pub use global::new_object_layer_fn;
+pub use global::set_global_endpoints;
+pub use global::update_erasure_type;
diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs
index 27256de8..a1b6deb0 100644
--- a/ecstore/src/peer.rs
+++ b/ecstore/src/peer.rs
@@ -26,7 +26,7 @@ pub trait PeerS3Client: Debug + Sync + Send + 'static {
     fn get_pools(&self) -> Option<Vec<usize>>;
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct S3PeerSys {
     pub clients: Vec<Arc<dyn PeerS3Client>>,
     pub pools_count: usize,
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 783a5dd6..e3236fe0 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -15,12 +15,12 @@ use crate::{
     },
     endpoints::PoolEndpoints,
     error::{Error, Result},
+    global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
     set_disk::SetDisks,
-    store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
     store_api::{
         BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
-        ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
-        PutObjReader, StorageAPI,
+        ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete,
+        PartInfo, PutObjReader, StorageAPI,
     },
     utils::hash,
 };
@@ -94,7 +94,7 @@ impl Sets {
                 continue;
             }
 
-            if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await {
+            if disk.as_ref().unwrap().is_local() && is_dist_erasure().await {
                 let local_disk = {
                     let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
                     local_set_drives[pool_idx][i][j].clone()
@@ -124,7 +124,7 @@ impl Sets {
         let set_disks = SetDisks {
             lockers: lockers[i].clone(),
             locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
-            ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))),
+            ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
             disks: RwLock::new(set_drive),
             set_drive_count,
             default_parity_count: partiy_count,
@@ -258,6 +258,25 @@ struct DelObj {
     obj: ObjectToDelete,
 }
 
+#[async_trait::async_trait]
+impl ObjectIO for Sets {
+    async fn get_object_reader(
+        &self,
+        bucket: &str,
+        object: &str,
+        range: HTTPRangeSpec,
+        h: HeaderMap,
+        opts: &ObjectOptions,
+    ) -> Result<GetObjectReader> {
+        self.get_disks_by_key(object)
+            .get_object_reader(bucket, object, range, h, opts)
+            .await
+    }
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
+        self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
+    }
+}
+
 #[async_trait::async_trait]
 impl StorageAPI for Sets {
     async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -383,22 +402,6 @@ impl StorageAPI for Sets {
             .await
     }
 
-    async fn get_object_reader(
-        &self,
-        bucket: &str,
-        object: &str,
-        range: HTTPRangeSpec,
-        h: HeaderMap,
-        opts: &ObjectOptions,
-    ) -> Result<GetObjectReader> {
-        self.get_disks_by_key(object)
-            .get_object_reader(bucket, object, range, h, opts)
-            .await
-    }
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
-        self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
-    }
-
     async fn put_object_part(
         &self,
         bucket: &str,
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 8ad6a81f..41bcba04 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -1,9 +1,12 @@
 #![allow(clippy::map_entry)]
 
+use crate::bucket::bucket_metadata_sys_set;
 use crate::disk::endpoint::EndpointType;
+use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES};
+use crate::store_api::ObjectIO;
 use crate::{
-    bucket::BucketMetadata,
+    bucket::metadata::BucketMetadata,
     disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
-    endpoints::{EndpointServerPools, SetupType},
+    endpoints::EndpointServerPools,
     error::{Error, Result},
     peer::S3PeerSys,
     sets::Sets,
@@ -19,137 +22,19 @@ use backon::{ExponentialBuilder, Retryable};
 use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
 use futures::future::join_all;
 use http::HeaderMap;
-use s3s::{dto::StreamingBlob, Body};
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
     time::Duration,
 };
 use time::OffsetDateTime;
+use tokio::fs;
 use tokio::sync::Semaphore;
-use tokio::{fs, sync::RwLock};
+
 use tracing::{debug, info};
 use uuid::Uuid;
 
-use lazy_static::lazy_static;
-
-lazy_static! {
-    pub static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
-    pub static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
-    pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
-}
-
-pub async fn update_erasure_type(setup_type: SetupType) {
-    let mut is_erasure = GLOBAL_IsErasure.write().await;
-    *is_erasure = setup_type == SetupType::Erasure;
-
-    let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
-    *is_dist_erasure = setup_type == SetupType::DistErasure;
-
-    if *is_dist_erasure {
-        *is_erasure = true
-    }
-
-    let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
-    *is_erasure_sd = setup_type == SetupType::ErasureSD;
-}
-
-type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
-
-lazy_static! {
-    pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
-    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
-}
-
-pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
-    let disk_path = match fs::canonicalize(disk_path).await {
-        Ok(disk_path) => disk_path,
-        Err(_) => return None,
-    };
-
-    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
-
-    let path = disk_path.to_string_lossy().to_string();
-    if disk_map.contains_key(&path) {
-        let a = disk_map[&path].as_ref().cloned();
-
-        return a;
-    }
-    None
-}
-
-pub async fn all_local_disk_path() -> Vec<String> {
-    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
-    disk_map.keys().cloned().collect()
-}
-
-pub async fn all_local_disk() -> Vec<DiskStore> {
-    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
-    disk_map
-        .values()
-        .filter(|v| v.is_some())
-        .map(|v| v.as_ref().unwrap().clone())
-        .collect()
-}
-
-// init_local_disks initializes the local disks; it must succeed before the server starts
-pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
-    let opt = &DiskOption {
-        cleanup: true,
-        health_check: true,
-    };
-
-    let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
-    for pool_eps in endpoint_pools.as_ref().iter() {
-        let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
-        for _ in 0..pool_eps.set_count {
-            set_count_drives.push(vec![None; pool_eps.drives_per_set]);
-        }
-
-        global_set_drives.push(set_count_drives);
-    }
-
-    let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
-
-    for pool_eps in endpoint_pools.as_ref().iter() {
-        let mut set_drives = HashMap::new();
-        for ep in pool_eps.endpoints.as_ref().iter() {
-            if !ep.is_local {
-                continue;
-            }
-
-            let disk = new_disk(ep, opt).await?;
-
-            let path = disk.path().to_string_lossy().to_string();
-
-            global_local_disk_map.insert(path, Some(disk.clone()));
-
-            set_drives.insert(ep.disk_idx, Some(disk.clone()));
-
-            if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
-                global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
-            }
-        }
-    }
-
-    Ok(())
-}
-
-lazy_static! {
-    pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
-    pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
-}
-
-pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
-    GLOBAL_OBJECT_API.clone()
-}
-
-async fn set_object_layer(o: ECStore) {
-    let mut global_object_api = GLOBAL_OBJECT_API.write().await;
-    *global_object_api = Some(o);
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ECStore {
     pub id: uuid::Uuid,
     // pub disks: Vec<DiskStore>,
@@ -161,7 +46,7 @@ pub struct ECStore {
 
 impl ECStore {
     #[allow(clippy::new_ret_no_self)]
-    pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<()> {
+    pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<Self> {
         // let layouts = DisksLayout::try_from(endpoints.as_slice())?;
 
         let mut deployment_id = None;
@@ -243,7 +128,7 @@ impl ECStore {
         }
 
         // replace the local disks
-        if !*GLOBAL_IsDistErasure.read().await {
+        if !is_dist_erasure().await {
             let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
             for disk in local_disks {
                 let path = disk.path().to_string_lossy().to_string();
@@ -260,9 +145,9 @@ impl ECStore {
             peer_sys,
         };
 
-        set_object_layer(ec).await;
+        set_object_layer(ec.clone()).await;
 
-        Ok(())
+        Ok(ec)
     }
 
     pub fn init_local_disks() {}
@@ -402,6 +287,80 @@ impl ECStore {
     }
 }
 
+pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
+    let disk_path = match fs::canonicalize(disk_path).await {
+        Ok(disk_path) => disk_path,
+        Err(_) => return None,
+    };
+
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+
+    let path = disk_path.to_string_lossy().to_string();
+    if disk_map.contains_key(&path) {
+        let a = disk_map[&path].as_ref().cloned();
+
+        return a;
+    }
+    None
+}
+
+pub async fn all_local_disk_path() -> Vec<String> {
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+    disk_map.keys().cloned().collect()
+}
+
+pub async fn all_local_disk() -> Vec<DiskStore> {
+    let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
+    disk_map
+        .values()
+        .filter(|v| v.is_some())
+        .map(|v| v.as_ref().unwrap().clone())
+        .collect()
+}
+
+// init_local_disks initializes the local disks; it must succeed before the server starts
+pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
+    let opt = &DiskOption {
+        cleanup: true,
+        health_check: true,
+    };
+
+    let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
+    for pool_eps in endpoint_pools.as_ref().iter() {
+        let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
+        for _ in 0..pool_eps.set_count {
+            set_count_drives.push(vec![None; pool_eps.drives_per_set]);
+        }
+
+        global_set_drives.push(set_count_drives);
+    }
+
+    let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
+
+    for pool_eps in endpoint_pools.as_ref().iter() {
+        let mut set_drives = HashMap::new();
+        for ep in pool_eps.endpoints.as_ref().iter() {
+            if !ep.is_local {
+                continue;
+            }
+
+            let disk = new_disk(ep, opt).await?;
+
+            let path = disk.path().to_string_lossy().to_string();
+
+            global_local_disk_map.insert(path, Some(disk.clone()));
+
+            set_drives.insert(ep.disk_idx, Some(disk.clone()));
+
+            if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
+                global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
+            }
+        }
+    }
+
+    Ok(())
+}
+
 async fn internal_get_pool_info_existing_with_opts(
     pools: &[Arc<Sets>],
     bucket: &str,
@@ -494,6 +453,38 @@ pub struct ListPathOptions {
     pub limit: i32,
 }
 
+#[async_trait::async_trait]
+impl ObjectIO for ECStore {
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn get_object_reader(
+        &self,
+        bucket: &str,
+        object: &str,
+        range: HTTPRangeSpec,
+        h: HeaderMap,
+        opts: &ObjectOptions,
+    ) -> Result<GetObjectReader> {
+        let object = utils::path::encode_dir_object(object);
+
+        if self.single_pool() {
+            return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
+        }
+
+        unimplemented!()
+    }
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
+        // checkPutObjectArgs
+
+        let object = utils::path::encode_dir_object(object);
+
+        if self.single_pool() {
+            return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
+        }
+
+        unimplemented!()
+    }
+}
+
 #[async_trait::async_trait]
 impl StorageAPI for ECStore {
     async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -516,28 +507,10 @@ impl StorageAPI for ECStore {
         // TODO: delete created bucket when error
         self.peer_sys.make_bucket(bucket, opts).await?;
 
-        let meta = BucketMetadata::new(bucket);
-        let data = meta.marshal_msg()?;
-        let file_path = meta.save_file_path();
+        let mut meta = BucketMetadata::new(bucket);
+        meta.save(self).await?;
 
-        // TODO: wrap hash reader
-
-        let content_len = data.len();
-
-        let body = Body::from(data);
-
-        let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
-
-        self.put_object(
-            RUSTFS_META_BUCKET,
-            &file_path,
-            reader,
-            &ObjectOptions {
-                max_parity: true,
-                ..Default::default()
-            },
-        )
-        .await?;
+        bucket_metadata_sys_set(bucket.to_string(), meta).await;
 
         // TODO: toObjectErr
 
@@ -740,34 +713,6 @@ impl StorageAPI for ECStore {
         unimplemented!()
     }
 
-    async fn get_object_reader(
-        &self,
-        bucket: &str,
-        object: &str,
-        range: HTTPRangeSpec,
-        h: HeaderMap,
-        opts: &ObjectOptions,
-    ) -> Result<GetObjectReader> {
-        let object = utils::path::encode_dir_object(object);
-
-        if self.single_pool() {
-            return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
-        }
-
-        unimplemented!()
-    }
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
-        // checkPutObjectArgs
-
-        let object = utils::path::encode_dir_object(object);
-
-        if self.single_pool() {
-            return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
-        }
-
-        unimplemented!()
-    }
-
     async fn put_object_part(
         &self,
         bucket: &str,
@@ -399,7 +414,11 @@ pub struct ObjectOptions {
 // }

 #[derive(Debug, Default, Serialize, Deserialize)]
-pub struct BucketOptions {}
+pub struct BucketOptions {
+    pub deleted: bool,     // true only when site replication is enabled
+    pub cached: bool,      // true only when a cached response is requested instead of hitting the disk, e.g. a ListBuckets() call
+    pub no_metadata: bool,
+}

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct BucketInfo {
@@ -510,7 +529,20 @@ pub struct DeletedObject {
 }

 #[async_trait::async_trait]
-pub trait StorageAPI {
+pub trait ObjectIO: Send + Sync + 'static {
+    async fn get_object_reader(
+        &self,
+        bucket: &str,
+        object: &str,
+        range: HTTPRangeSpec,
+        h: HeaderMap,
+        opts: &ObjectOptions,
+    ) -> Result<GetObjectReader>;
+    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
+}
+
+#[async_trait::async_trait]
+pub trait StorageAPI: ObjectIO {
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
     async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
     async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
@@ -536,15 +568,15 @@ pub trait StorageAPI {

     async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>;

-    async fn get_object_reader(
-        &self,
-        bucket: &str,
-        object: &str,
-        range: HTTPRangeSpec,
-        h: HeaderMap,
-        opts: &ObjectOptions,
-    ) -> Result<GetObjectReader>;
-    async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
+    // async fn get_object_reader(
+    //     &self,
+    //     bucket: &str,
+    //     object: &str,
+    //     range: HTTPRangeSpec,
+    //     h: HeaderMap,
+    //     opts: &ObjectOptions,
+    // ) -> Result<GetObjectReader>;
+    // async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;

     async fn put_object_part(
         &self,
         bucket: &str,
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index 0c32c263..c23e04a6 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -6,8 +6,12 @@ mod storage;
 use clap::Parser;
 use common::error::{Error, Result};
 use ecstore::{
+    bucket::init_bucket_metadata_sys,
     endpoints::EndpointServerPools,
-    store::{init_local_disks, update_erasure_type, ECStore},
+    set_global_endpoints,
+    store::{init_local_disks, ECStore},
+    store_api::{BucketOptions, StorageAPI},
+    update_erasure_type,
 };
 use grpc::make_server;
 use hyper_util::{
@@ -89,7 +93,7 @@ async fn run(opt: config::Opt) -> Result<()> {
     // Used for rpc
     let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())
         .map_err(|err| Error::from_string(err.to_string()))?;
-
+    set_global_endpoints(endpoint_pools.as_ref().clone()).await;
     update_erasure_type(setup_type).await;

     // Initialize the local disks
@@ -179,11 +183,23 @@ async fn run(opt: config::Opt) -> Result<()> {
     });

     // init store
-    ECStore::new(opt.address.clone(), endpoint_pools.clone())
+    let store = ECStore::new(opt.address.clone(), endpoint_pools.clone())
         .await
         .map_err(|err| Error::from_string(err.to_string()))?;

     info!(" init store success!");

+    let buckets_list = store
+        .list_bucket(&BucketOptions {
+            no_metadata: true,
+            ..Default::default()
+        })
+        .await
+        .map_err(|err| Error::from_string(err.to_string()))?;
+
+    let buckets = buckets_list.iter().map(|v| v.name.clone()).collect();
+
+    init_bucket_metadata_sys(store.clone(), buckets).await;
+
     tokio::select! {
         _ = tokio::signal::ctrl_c() => {
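Splitting `get_object_reader`/`put_object` out of `StorageAPI` into the `ObjectIO` supertrait lets callers that only move bytes avoid the full bucket/multipart surface. A sketch of the kind of consumer this enables; `save_config` is a hypothetical helper, but the `PutObjReader`/`StreamingBlob`/`Body` construction is copied from code removed elsewhere in this diff:

```rust
// Sketch: persistence code can now be generic over ObjectIO alone.
async fn save_config<S: ObjectIO>(api: &S, bucket: &str, path: &str, data: Vec<u8>) -> Result<()> {
    let len = data.len();
    // Wrap the raw bytes in the streaming reader the trait expects.
    let reader = PutObjReader::new(StreamingBlob::from(Body::from(data)), len);
    api.put_object(bucket, path, reader, &ObjectOptions::default()).await?;
    Ok(())
}
```

Because `StorageAPI: ObjectIO`, every existing store (including `ECStore`) still satisfies both traits.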
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index 3df2b6d9..2cbb3dcb 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -1,15 +1,19 @@
 use bytes::BufMut;
 use bytes::Bytes;
+use ecstore::bucket::get_bucket_metadata_sys;
+use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
+use ecstore::bucket::tags::Tags;
 use ecstore::bucket_meta::BucketMetadata;
 use ecstore::disk::error::DiskError;
 use ecstore::disk::RUSTFS_META_BUCKET;
-use ecstore::store::new_object_layer_fn;
+use ecstore::new_object_layer_fn;
 use ecstore::store_api::BucketOptions;
 use ecstore::store_api::CompletePart;
 use ecstore::store_api::DeleteBucketOptions;
 use ecstore::store_api::HTTPRangeSpec;
 use ecstore::store_api::MakeBucketOptions;
 use ecstore::store_api::MultipartUploadResult;
+use ecstore::store_api::ObjectIO;
 use ecstore::store_api::ObjectOptions;
 use ecstore::store_api::ObjectToDelete;
 use ecstore::store_api::PutObjReader;
@@ -17,6 +21,7 @@ use ecstore::store_api::StorageAPI;
 use futures::pin_mut;
 use futures::{Stream, StreamExt};
 use http::HeaderMap;
+use log::warn;
 use s3s::dto::*;
 use s3s::s3_error;
 use s3s::Body;
@@ -25,6 +30,7 @@ use s3s::S3ErrorCode;
 use s3s::S3Result;
 use s3s::S3;
 use s3s::{S3Request, S3Response};
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::str::FromStr;
 use transform_stream::AsyncTryStream;
@@ -248,7 +254,7 @@ impl S3 for FS {
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
     };

-    if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
+    if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
         if DiskError::VolumeNotFound.is(&e) {
             return Err(s3_error!(NoSuchBucket));
         } else {
@@ -322,7 +328,7 @@ impl S3 for FS {
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
     };

-    if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
+    if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
         if DiskError::VolumeNotFound.is(&e) {
             return Err(s3_error!(NoSuchBucket));
         } else {
@@ -373,7 +379,7 @@ impl S3 for FS {
         None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
     };

-    let bucket_infos = try_!(store.list_bucket(&BucketOptions {}).await);
+    let bucket_infos = try_!(store.list_bucket(&BucketOptions::default()).await);

     let buckets: Vec<Bucket> = bucket_infos
         .iter()
@@ -701,53 +707,19 @@ impl S3 for FS {
         }))
         .await?;

-        let layer = new_object_layer_fn();
-        let lock = layer.read().await;
-        let store = lock
-            .as_ref()
-            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
+        let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
+        let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;

-        let meta_obj = try_!(
-            store
-                .get_object_reader(
-                    RUSTFS_META_BUCKET,
-                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
-                    HTTPRangeSpec::nil(),
-                    Default::default(),
-                    &ObjectOptions::default(),
-                )
-                .await
-        );
-
-        let stream = meta_obj.stream;
-
-        let mut data = vec![];
-        pin_mut!(stream);
-
-        while let Some(x) = stream.next().await {
-            let x = try_!(x);
-            data.put_slice(&x[..]);
+        let mut tag_map = HashMap::new();
+        for tag in tagging.tag_set.iter() {
+            tag_map.insert(tag.key.clone(), tag.value.clone());
         }

-        let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
-        if tagging.tag_set.is_empty() {
-            meta.tagging = None;
-        } else {
-            meta.tagging = Some(tagging.tag_set.into_iter().map(|x| (x.key, x.value)).collect())
-        }
+        let tags = Tags::new(tag_map, false);

-        let data = try_!(meta.marshal_msg());
-        let len = data.len();
-        try_!(
-            store
-                .put_object(
-                    RUSTFS_META_BUCKET,
-                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
-                    PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
-                    &ObjectOptions::default(),
-                )
-                .await
-        );
+        let data = try_!(tags.marshal_msg());
+
+        let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);

         Ok(S3Response::new(Default::default()))
     }
@@ -763,51 +735,23 @@ impl S3 for FS {
         }))
         .await?;

-        let layer = new_object_layer_fn();
-        let lock = layer.read().await;
-        let store = lock
-            .as_ref()
-            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
-
-        let meta_obj = try_!(
-            store
-                .get_object_reader(
-                    RUSTFS_META_BUCKET,
-                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
-                    HTTPRangeSpec::nil(),
-                    Default::default(),
-                    &ObjectOptions::default(),
-                )
-                .await
-        );
-
-        let stream = meta_obj.stream;
-
-        let mut data = vec![];
-        pin_mut!(stream);
-
-        while let Some(x) = stream.next().await {
-            let x = try_!(x);
-            data.put_slice(&x[..]);
-        }
-
-        let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
-        if meta.tagging.is_none() {
-            return Err({
-                let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
-                err.set_status_code("404".try_into().unwrap());
-                err
-            });
-        }
-
-        Ok(S3Response::new(GetBucketTaggingOutput {
-            tag_set: meta
-                .tagging
-                .unwrap()
+        let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
+        let bucket_meta_sys = bucket_meta_sys_lock.read().await;
+        let tag_set: Vec<Tag> = match bucket_meta_sys.get_tagging_config(&bucket).await {
+            Ok((tags, _)) => tags
+                .tag_set
+                .tag_map
                 .into_iter()
                 .map(|(key, value)| Tag { key, value })
                 .collect(),
-        }))
+            Err(err) => {
+                warn!("get_tagging_config err {:?}", &err);
+                // TODO: check not found
+                Vec::new()
+            }
+        };
+
+        Ok(S3Response::new(GetBucketTaggingOutput { tag_set }))
     }
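The tagging handlers now go through the bucket metadata subsystem instead of hand-rolling a `get_object_reader`/`put_object` round-trip per request. The read path distills to roughly the following; this is a sketch of the handler logic above, and the ignored second tuple element of `get_tagging_config` is assumed to be an updated-at timestamp:

```rust
// Sketch: read bucket tags back from the metadata subsystem.
async fn bucket_tags(bucket: &str) -> Vec<Tag> {
    let sys_lock = get_bucket_metadata_sys().await;
    let sys = sys_lock.read().await;

    match sys.get_tagging_config(bucket).await {
        // Convert the stored tag map into s3s Tag pairs.
        Ok((tags, _updated_at)) => tags
            .tag_set
            .tag_map
            .into_iter()
            .map(|(key, value)| Tag { key, value })
            .collect(),
        // A missing config currently degrades to an empty set; see the TODO above.
        Err(_) => Vec::new(),
    }
}
```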
     #[tracing::instrument(level = "debug", skip(self))]
@@ -824,48 +768,16 @@ impl S3 for FS {
         }))
         .await?;

-        let layer = new_object_layer_fn();
-        let lock = layer.read().await;
-        let store = lock
-            .as_ref()
-            .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
+        let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
+        let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;

-        let meta_obj = try_!(
-            store
-                .get_object_reader(
-                    RUSTFS_META_BUCKET,
-                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
-                    HTTPRangeSpec::nil(),
-                    Default::default(),
-                    &ObjectOptions::default(),
-                )
-                .await
-        );
+        let tag_map = HashMap::new();

-        let stream = meta_obj.stream;
+        let tags = Tags::new(tag_map, false);

-        let mut data = vec![];
-        pin_mut!(stream);
+        let data = try_!(tags.marshal_msg());

-        while let Some(x) = stream.next().await {
-            let x = try_!(x);
-            data.put_slice(&x[..]);
-        }
-
-        let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
-        meta.tagging = None;
-        let data = try_!(meta.marshal_msg());
-        let len = data.len();
-        try_!(
-            store
-                .put_object(
-                    RUSTFS_META_BUCKET,
-                    BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
-                    PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
-                    &ObjectOptions::default(),
-                )
-                .await
-        );
+        let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);

         Ok(S3Response::new(DeleteBucketTaggingOutput {}))
     }
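Note that both `put_bucket_tagging` and `delete_bucket_tagging` funnel through the same `update(bucket, BUCKET_TAGGING_CONFIG, data)` call: deletion is modeled as writing an empty `Tags` value rather than removing the config outright. A sketch of that shared write path (whether the subsystem special-cases an empty tag map is an assumption):

```rust
// Sketch: the single write path both tagging mutations reduce to.
async fn write_bucket_tags(bucket: &str, tag_map: HashMap<String, String>) -> Result<()> {
    let sys_lock = get_bucket_metadata_sys().await;
    let mut sys = sys_lock.write().await;

    // An empty map serializes to an empty TagSet, i.e. "no tags".
    let data = Tags::new(tag_map, false).marshal_msg()?;
    let _updated = sys.update(bucket, BUCKET_TAGGING_CONFIG, data).await?;
    Ok(())
}
```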