Squashed commit of the following:

commit ff72105e1ec81b5679401b021fb44e8135444de8
Author: weisd <weishidavip@163.com>
Date:   Sat Oct 5 09:55:09 2024 +0800

    base bucketmetadata_sys done

commit 69d0c7725ae1065d25755c71aa54e42c4f3cdb50
Author: weisd <weishidavip@163.com>
Date:   Sat Oct 5 01:41:19 2024 +0800

    update bucket tagging op use bucketmetadata_sys

commit 39902c73d6f7a817041245af84a23d14115c70dc
Author: weisd <weishidavip@163.com>
Date:   Fri Oct 4 23:32:07 2024 +0800

    test bucketmetadata_sys

commit 15f1dfc765dc383ab26d71aca7c64dd0917b83f3
Author: weisd <weishidavip@163.com>
Date:   Fri Oct 4 23:21:58 2024 +0800

    init bucketmetadata_sys

commit 8c3c5e0f7385fa881b1dc4811d92afe8b86e9a6f
Author: weisd <weishidavip@163.com>
Date:   Tue Oct 1 23:06:09 2024 +0800

    init bucketmetadata_sys

commit e2746d154aa68be9dcf8add0241b38c72ef89b9c
Author: weisd <weishidavip@163.com>
Date:   Tue Oct 1 04:35:25 2024 +0800

    stash
This commit is contained in:
weisd
2024-10-05 10:04:59 +08:00
parent 3a608e5554
commit 10c63effd4
45 changed files with 974 additions and 496 deletions

14
TODO.md
View File

@@ -2,18 +2,18 @@
## 基础存储
- [ ] EC可用读写数量判断 Read/WriteQuorum
- [ ] 优化并发执行,边读边取,可中断
- [x] EC可用读写数量判断 Read/WriteQuorum
- [ ] 优化后台并发执行,可中断
- [ ] 小文件存储到metafile, inlinedata
- [ ] 完善bucketmeta
- [ ] 对象锁
- [ ] 代码优化 使用范型?
- [ ] 抽象出metafile存储
- [ ] 边读写边hash
- [x] 对象锁
- [ ] 边读写边hash实现reader嵌套
- [x] 远程rpc
- [x] 错误类型判断,程序中判断错误类型,如何统一错误
- [x] 优化xlmeta, 自定义msg数据结构
- [x] appendFile, createFile, readFile, walk_dir sync io
- [ ] 优化io.reader 参考 GetObjectNInfo 方便io copy 如果 异步写,再平衡
- [ ] 代码优化 使用范型?
- [ ] 抽象出metafile存储
## 基础功能

View File

@@ -21,20 +21,20 @@ impl std::str::FromStr for Algorithm {
}
// 定义EncryptionAction结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct EncryptionAction {
algorithm: Option<Algorithm>,
master_key_id: Option<String>,
}
// 定义Rule结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Rule {
default_encryption_action: EncryptionAction,
}
// 定义BucketSSEConfig结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct BucketSSEConfig {
xml_ns: String,
xml_name: String,

View File

@@ -0,0 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum BucketMetadataError {
#[error("tagging not found")]
TaggingNotFound,
}

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use name::Name;
// 定义common结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
struct Common {
pub id: String,
pub filter: S3Key,
@@ -13,57 +13,57 @@ struct Common {
}
// 定义Queue结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
struct Queue {
pub common: Common,
pub arn: ARN,
}
// 定义ARN结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ARN {
pub target_id: TargetID,
pub region: String,
}
// 定义TargetID结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct TargetID {
pub id: String,
pub name: String,
}
// 定义FilterRule结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct FilterRule {
pub name: String,
pub value: String,
}
// 定义FilterRuleList结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct FilterRuleList {
pub rules: Vec<FilterRule>,
}
// 定义S3Key结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct S3Key {
pub rule_list: FilterRuleList,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Lambda {
arn: String,
}
// 定义Topic结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Topic {
arn: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Config {
queue_list: Vec<Queue>,
lambda_list: Vec<Lambda>,

View File

@@ -1,6 +1,6 @@
use super::{prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct And {
pub object_size_greater_than: i64,
pub object_size_less_than: i64,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct DelMarkerExpiration {
pub days: usize,
}

View File

@@ -3,21 +3,21 @@ use time::OffsetDateTime;
// ExpirationDays is a type alias to unmarshal Days in Expiration
pub type ExpirationDays = usize;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ExpirationDate(Option<OffsetDateTime>);
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ExpireDeleteMarker {
pub marker: Boolean,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Boolean {
pub val: bool,
pub set: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Expiration {
pub days: Option<ExpirationDays>,
pub date: Option<ExpirationDate>,

View File

@@ -3,7 +3,7 @@ use std::collections::HashMap;
use super::{and::And, prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Filter {
pub set: bool,

View File

@@ -2,7 +2,7 @@ use super::rule::Rule;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Lifecycle {
pub rules: Vec<Rule>,
pub expiry_updated_at: Option<OffsetDateTime>,

View File

@@ -1,14 +1,14 @@
use super::{expiration::ExpirationDays, transition::TransitionDays};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct NoncurrentVersionExpiration {
pub noncurrent_days: ExpirationDays,
pub newer_noncurrent_versions: usize,
set: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct NoncurrentVersionTransition {
pub noncurrent_days: TransitionDays,
pub storage_class: String,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Prefix {
pub val: String,
pub set: bool,

View File

@@ -8,14 +8,14 @@ use super::{
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub enum Status {
#[default]
Enabled,
Disabled,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Rule {
pub id: String,
pub status: Status,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Tag {
pub key: String,
pub value: String,

View File

@@ -3,10 +3,10 @@ use time::OffsetDateTime;
pub type TransitionDays = usize;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct TransitionDate(Option<OffsetDateTime>);
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Transition {
pub days: Option<TransitionDays>,
pub date: Option<TransitionDate>,

View File

@@ -1,32 +1,47 @@
use byteorder::{BigEndian, ByteOrder};
use super::{
encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use rmp_serde::Serializer as rmpSerializer;
use serde::Serializer;
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;
use time::macros::datetime;
use time::OffsetDateTime;
use tracing::{error, warn};
use super::{
encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
};
use crate::error::Result;
use crate::bucket::tags;
use crate::config;
use crate::config::common::{read_config, save_config};
use crate::error::{Error, Result};
use crate::disk::BUCKET_META_PREFIX;
use crate::utils::crypto::hex;
use crate::store::ECStore;
type TypeConfigFile = &'static str;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, Deserialize, Serialize)]
pub const BUCKET_POLICY_CONFIG: &str = "policy.json";
pub const BUCKET_NOTIFICATION_CONFIG: &str = "notification.xml";
pub const BUCKET_LIFECYCLE_CONFIG: &str = "lifecycle.xml";
pub const BUCKET_SSECONFIG: &str = "bucket-encryption.xml";
pub const BUCKET_TAGGING_CONFIG: &str = "tagging.xml";
pub const BUCKET_QUOTA_CONFIG_FILE: &str = "quota.json";
pub const OBJECT_LOCK_CONFIG: &str = "object-lock.xml";
pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase", default)]
pub struct BucketMetadata {
pub name: String,
#[serde(serialize_with = "write_times")]
pub created: OffsetDateTime,
pub lock_enabled: bool, // 虽然标记为不使用,但可能需要保留
pub policy_config_json: Vec<u8>,
@@ -41,27 +56,16 @@ pub struct BucketMetadata {
pub bucket_targets_config_json: Vec<u8>,
pub bucket_targets_config_meta_json: Vec<u8>,
#[serde(serialize_with = "write_times")]
pub policy_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub object_lock_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub encryption_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub tagging_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub quota_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub replication_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub versioning_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub lifecycle_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub notification_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub bucket_targets_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub bucket_targets_config_meta_updated_at: OffsetDateTime,
#[serde(skip)]
@@ -147,9 +151,9 @@ impl BucketMetadata {
format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE)
}
fn msg_size(&self) -> usize {
unimplemented!()
}
// fn msg_size(&self) -> usize {
// unimplemented!()
// }
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
@@ -159,12 +163,162 @@ impl BucketMetadata {
Ok(buf)
}
pub fn unmarshal(_buf: &[u8]) -> Result<Self> {
unimplemented!()
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketMetadata = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn check_header(buf: &[u8]) -> Result<()> {
if buf.len() <= 4 {
return Err(Error::msg("read_bucket_metadata: data invalid"));
}
let format = LittleEndian::read_u16(&buf[0..2]);
let version = LittleEndian::read_u16(&buf[2..4]);
match format {
BUCKET_METADATA_FORMAT => {}
_ => return Err(Error::msg("read_bucket_metadata: format invalid")),
}
match version {
BUCKET_METADATA_VERSION => {}
_ => return Err(Error::msg("read_bucket_metadata: version invalid")),
}
Ok(())
}
fn default_timestamps(&mut self) {
if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.tagging_config_updated_at = self.created
}
}
pub fn update_config(&mut self, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
let updated = OffsetDateTime::now_utc();
match config_file {
BUCKET_POLICY_CONFIG => {
self.policy_config_json = data;
self.policy_config_updated_at = updated;
}
BUCKET_NOTIFICATION_CONFIG => {
self.notification_config_xml = data;
self.notification_config_updated_at = updated;
}
BUCKET_LIFECYCLE_CONFIG => {
self.lifecycle_config_xml = data;
self.lifecycle_config_updated_at = updated;
}
BUCKET_SSECONFIG => {
self.encryption_config_xml = data;
self.encryption_config_updated_at = updated;
}
BUCKET_TAGGING_CONFIG => {
self.tagging_config_xml = data;
self.tagging_config_updated_at = updated;
}
BUCKET_QUOTA_CONFIG_FILE => {
self.quota_config_json = data;
self.quota_config_updated_at = updated;
}
OBJECT_LOCK_CONFIG => {
self.object_lock_config_xml = data;
self.object_lock_config_updated_at = updated;
}
BUCKET_VERSIONING_CONFIG => {
self.versioning_config_xml = data;
self.versioning_config_updated_at = updated;
}
BUCKET_REPLICATION_CONFIG => {
self.replication_config_xml = data;
self.replication_config_updated_at = updated;
}
BUCKET_TARGETS_FILE => {
self.tagging_config_xml = data;
self.tagging_config_updated_at = updated;
}
_ => return Err(Error::msg(format!("config file not found : {}", config_file))),
}
Ok(updated)
}
pub async fn save(&mut self, api: &ECStore) -> Result<()> {
self.parse_all_configs(api)?;
let mut buf: Vec<u8> = vec![0; 4];
LittleEndian::write_u16(&mut buf[0..2], BUCKET_METADATA_FORMAT);
LittleEndian::write_u16(&mut buf[2..4], BUCKET_METADATA_VERSION);
let data = self.marshal_msg()?;
buf.extend_from_slice(&data);
save_config(api, self.save_file_path().as_str(), &buf).await?;
Ok(())
}
fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
if !self.tagging_config_xml.is_empty() {
self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
}
Ok(())
}
}
fn deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
load_bucket_metadata_parse(api, bucket, true).await
}
pub async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result<BucketMetadata> {
let mut bm = match read_bucket_metadata(api, bucket).await {
Ok(res) => res,
Err(err) => {
warn!("load_bucket_metadata_parse err {:?}", &err);
if !config::error::is_not_found(&err) {
return Err(err);
}
BucketMetadata::new(bucket)
}
};
bm.default_timestamps();
if parse {
bm.parse_all_configs(api)?;
}
// TODO: parse_all_configs
Ok(bm)
}
async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
if bucket.is_empty() {
error!("bucket name empty");
return Err(Error::msg("invalid argument"));
}
let bm = BucketMetadata::new(&bucket);
let file_path = bm.save_file_path();
let data = read_config(api, &file_path).await?;
BucketMetadata::check_header(&data)?;
let bm = BucketMetadata::unmarshal(&data[4..])?;
Ok(bm)
}
fn _deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
where
S: FromStr,
S::Err: Display,
@@ -175,7 +329,7 @@ where
unimplemented!()
}
fn write_times<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
fn _write_time<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
@@ -183,47 +337,16 @@ where
let sec = t.unix_timestamp() - 62135596800;
let nsec = t.nanosecond();
buf[0] = 0xc7;
buf[1] = 0x0c;
buf[2] = 0x05;
buf[0] = 0xc7; // mext8
buf[1] = 0x0c; // 长度
buf[2] = 0x05; // 时间扩展类型
BigEndian::write_u64(&mut buf[3..], sec as u64);
BigEndian::write_u32(&mut buf[11..], nsec as u32);
s.serialize_bytes(&buf)
}
fn write_time(t: OffsetDateTime) -> Result<(), String> {
// let t = t.saturating_sub(0); // 转换为自 UNIX_EPOCH 以来的时间
println!("t:{:?}", t);
println!("offset:{:?}", datetime!(0-01-01 0:00 UTC));
let mut buf = vec![0x0; 15];
let sec = t.unix_timestamp() - 62135596800;
let nsec = t.nanosecond();
println!("sec:{:?}", sec);
println!("nsec:{:?}", nsec);
buf[0] = 0xc7;
buf[1] = 0x0c;
buf[2] = 0x05;
// 扩展格式的时间类型
// buf.push(0xc7); // mext8
// buf.push(12); // 长度
// buf.push(0x05); // 时间扩展类型
// 将 Unix 时间戳和纳秒部分写入缓冲区
BigEndian::write_u64(&mut buf[3..], sec as u64);
BigEndian::write_u32(&mut buf[11..], nsec as u32);
println!("hex:{:?}", hex(buf));
Ok(())
}
#[cfg(test)]
mod test {
use crate::utils::crypto::hex;
use super::*;
@@ -235,6 +358,8 @@ mod test {
let buf = bm.marshal_msg().unwrap();
println!("{:?}", hex(buf))
let new = BucketMetadata::unmarshal(&buf).unwrap();
assert_eq!(bm.name, new.name);
}
}

View File

@@ -0,0 +1,258 @@
use std::collections::HashSet;
use std::{collections::HashMap, sync::Arc};
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::load_bucket_metadata_parse;
use crate::bucket::utils::is_meta_bucketname;
use crate::config;
use crate::config::error::ConfigError;
use crate::disk::error::DiskError;
use crate::error::{Error, Result};
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
use crate::store::ECStore;
use futures::future::join_all;
use lazy_static::lazy_static;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::tags;
lazy_static! {
static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
}
pub async fn init_bucket_metadata_sys(api: ECStore, buckets: Vec<String>) {
let mut sys = GLOBAL_BucketMetadataSys.write().await;
sys.init(api, buckets).await
}
pub async fn get_bucket_metadata_sys() -> Arc<RwLock<BucketMetadataSys>> {
GLOBAL_BucketMetadataSys.clone()
}
pub async fn bucket_metadata_sys_set(bucket: String, bm: BucketMetadata) {
let sys = GLOBAL_BucketMetadataSys.write().await;
sys.set(bucket, bm).await
}
#[derive(Debug, Default)]
pub struct BucketMetadataSys {
metadata_map: RwLock<HashMap<String, BucketMetadata>>,
api: Option<ECStore>,
initialized: RwLock<bool>,
}
impl BucketMetadataSys {
fn new() -> Self {
Self::default()
}
pub async fn init(&mut self, api: ECStore, buckets: Vec<String>) {
// if api.is_none() {
// return Err(Error::msg("errServerNotInitialized"));
// }
self.api = Some(api);
let _ = self.init_internal(buckets).await;
}
async fn init_internal(&self, buckets: Vec<String>) -> Result<()> {
let count = {
let endpoints = GLOBAL_Endpoints.read().await;
endpoints.es_count() * 10
};
let mut failed_buckets: HashSet<String> = HashSet::new();
let mut buckets = buckets.as_slice();
loop {
if buckets.len() < count {
self.concurrent_load(buckets, &mut failed_buckets).await;
break;
}
self.concurrent_load(&buckets[..count], &mut failed_buckets).await;
buckets = &buckets[count..]
}
let mut initialized = self.initialized.write().await;
*initialized = true;
if is_dist_erasure().await {
// TODO: refresh_buckets_metadata_loop
}
Ok(())
}
async fn concurrent_load(&self, buckets: &[String], failed_buckets: &mut HashSet<String>) {
let mut futures = Vec::new();
for bucket in buckets.iter() {
futures.push(load_bucket_metadata(self.api.as_ref().unwrap(), bucket.as_str()));
}
let results = join_all(futures).await;
let mut idx = 0;
let mut mp = self.metadata_map.write().await;
for res in results {
match res {
Ok(res) => {
if let Some(bucket) = buckets.get(idx) {
mp.insert(bucket.clone(), res);
}
}
Err(e) => {
error!("Unable to load bucket metadata, will be retried: {:?}", e);
if let Some(bucket) = buckets.get(idx) {
failed_buckets.insert(bucket.clone());
}
}
}
idx += 1;
}
}
async fn refresh_buckets_metadata_loop(&self, failed_buckets: &HashSet<String>) -> Result<()> {
unimplemented!()
}
pub async fn get(&self, bucket: &str) -> Result<BucketMetadata> {
if is_meta_bucketname(bucket) {
return Err(Error::new(ConfigError::NotFound));
}
let map = self.metadata_map.read().await;
if let Some(bm) = map.get(bucket) {
Ok(bm.clone())
} else {
Err(Error::new(ConfigError::NotFound))
}
}
pub async fn set(&self, bucket: String, bm: BucketMetadata) {
if !is_meta_bucketname(&bucket) {
let mut map = self.metadata_map.write().await;
map.insert(bucket, bm);
}
}
async fn reset(&mut self) {
let mut map = self.metadata_map.write().await;
map.clear();
}
pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
self.update_and_parse(bucket, config_file, data, true).await
}
async fn update_and_parse(&mut self, bucket: &str, config_file: &str, data: Vec<u8>, parse: bool) -> Result<OffsetDateTime> {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::msg("errServerNotInitialized")),
};
if is_meta_bucketname(&bucket) {
return Err(Error::msg("errInvalidArgument"));
}
let mut bm = match load_bucket_metadata_parse(store, &bucket, parse).await {
Ok(res) => res,
Err(err) => {
if !is_erasure().await && !is_dist_erasure().await && DiskError::VolumeNotFound.is(&err) {
BucketMetadata::new(&bucket)
} else {
return Err(err);
}
}
};
let updated = bm.update_config(config_file, data)?;
self.save(&mut bm).await?;
Ok(updated)
}
async fn save(&self, bm: &mut BucketMetadata) -> Result<()> {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::msg("errServerNotInitialized")),
};
if is_meta_bucketname(&bm.name) {
return Err(Error::msg("errInvalidArgument"));
}
bm.save(store).await?;
self.set(bm.name.clone(), bm.clone()).await;
Ok(())
}
pub async fn get_config(&self, bucket: &str) -> Result<(BucketMetadata, bool)> {
if let Some(api) = self.api.as_ref() {
let has_bm = {
let map = self.metadata_map.read().await;
if let Some(bm) = map.get(&bucket.to_string()) {
Some(bm.clone())
} else {
None
}
};
if let Some(bm) = has_bm {
return Ok((bm, false));
} else {
let bm = match load_bucket_metadata(&api, bucket).await {
Ok(res) => res,
Err(err) => {
if *self.initialized.read().await {
return Err(Error::msg("errBucketMetadataNotInitialized"));
} else {
return Err(err);
}
}
};
let mut map = self.metadata_map.write().await;
map.insert(bucket.to_string(), bm.clone());
Ok((bm, true))
}
} else {
Err(Error::msg("errBucketMetadataNotInitialized"))
}
}
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_tagging_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::TaggingNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.tagging_config {
Ok((config, bm.tagging_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::TaggingNotFound))
}
}
}

View File

@@ -1,13 +1,16 @@
mod encryption;
mod error;
mod event;
mod lifecycle;
mod metadata;
pub mod metadata;
mod metadata_sys;
mod objectlock;
mod policy;
mod quota;
mod replication;
mod tags;
pub mod tags;
mod target;
pub mod utils;
mod versioning;
pub use metadata::BucketMetadata;
pub use metadata_sys::{bucket_metadata_sys_set, get_bucket_metadata_sys, init_bucket_metadata_sys};

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
pub enum RetMode {
#[default]
Govenance,
@@ -20,19 +20,19 @@ impl std::str::FromStr for RetMode {
}
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DefaultRetention {
pub mode: RetMode,
pub days: Option<usize>,
pub years: Option<usize>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
pub default_retention: DefaultRetention,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
pub object_lock_enabled: String,
pub rule: Option<Rule>,

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ActionSet(HashSet<Action>);
impl ActionSet {}

View File

@@ -9,7 +9,7 @@ use super::{
resource::ResourceSet,
};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketPolicyArgs {
account_name: String,
groups: Vec<String>,
@@ -20,7 +20,7 @@ pub struct BucketPolicyArgs {
object_name: String,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BPStatement {
sid: String,
effect: Effect,
@@ -32,7 +32,7 @@ pub struct BPStatement {
conditions: Option<Functions>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketPolicy {
pub id: String,
pub version: String,

View File

@@ -41,14 +41,14 @@ pub trait FunctionApi {
// }
// }
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
enum Function {
#[default]
Test,
}
// 定义Functions类型
#[derive(Deserialize, Serialize, Default)]
#[derive(Deserialize, Serialize, Default, Clone)]
pub struct Functions(Vec<Function>);
impl Debug for Functions {

View File

@@ -15,11 +15,11 @@ const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::";
const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::";
// 定义Resource结构体
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
pub struct Resource {
pattern: String,
r#type: ResourceARNType,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ResourceSet(HashSet<Resource>);

View File

@@ -7,7 +7,7 @@ pub enum QuotaType {
}
// 定义BucketQuota结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketQuota {
quota: Option<u64>, // 使用Option来表示可能不存在的字段

View File

@@ -2,7 +2,7 @@ use super::tag::Tag;
use serde::{Deserialize, Serialize};
// 定义And结构体
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct And {
prefix: Option<String>,
tags: Option<Vec<Tag>>,

View File

@@ -3,7 +3,7 @@ use super::tag::Tag;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Filter {
prefix: String,
and: And,

View File

@@ -6,7 +6,7 @@ mod tag;
use rule::Rule;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Config {
rules: Vec<Rule>,
role_arn: String,

View File

@@ -1,29 +1,29 @@
use super::filter::Filter;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub enum Status {
#[default]
Enabled,
Disabled,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct DeleteMarkerReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct DeleteReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ExistingObjectReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Destination {
pub bucket: String,
pub storage_class: String,
@@ -31,18 +31,18 @@ pub struct Destination {
}
// 定义ReplicaModifications结构体
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ReplicaModifications {
status: Status,
}
// 定义SourceSelectionCriteria结构体
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct SourceSelectionCriteria {
replica_modifications: ReplicaModifications,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Rule {
pub id: String,
pub status: Status,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Tag {
pub key: Option<String>,
pub value: Option<String>,

View File

@@ -1,19 +1,37 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// 定义tagSet结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TagSet {
#[serde(rename = "Tag")]
tag_map: HashMap<String, String>,
is_object: bool,
pub tag_map: HashMap<String, String>,
pub is_object: bool,
}
// 定义tagging结构体
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tags {
#[serde(rename = "Tagging")]
xml_name: String,
#[serde(rename = "TagSet")]
tag_set: Option<TagSet>,
pub tag_set: TagSet,
}
impl Tags {
pub fn new(tag_map: HashMap<String, String>, is_object: bool) -> Self {
Self {
tag_set: TagSet { tag_map, is_object },
}
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Tags = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Credentials {
access_key: String,
secret_key: String,
@@ -10,13 +10,13 @@ pub struct Credentials {
expiration: Option<OffsetDateTime>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub enum ServiceType {
#[default]
Replication,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct LatencyStat {
curr: Duration, // 当前延迟
avg: Duration, // 平均延迟
@@ -24,7 +24,7 @@ pub struct LatencyStat {
}
// 定义BucketTarget结构体
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct BucketTarget {
source_bucket: String,
@@ -73,7 +73,7 @@ pub struct BucketTarget {
edge: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct BucketTargets {
pub targets: Vec<BucketTarget>,
}

View File

@@ -0,0 +1,5 @@
use crate::disk::RUSTFS_META_BUCKET;
pub fn is_meta_bucketname(name: &str) -> bool {
name.starts_with(RUSTFS_META_BUCKET)
}

View File

@@ -25,12 +25,12 @@ impl std::fmt::Display for State {
}
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct ExcludedPrefix {
pub prefix: String,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
pub struct Versioning {
pub status: State,
pub excluded_prefixes: Vec<ExcludedPrefix>,

View File

@@ -14,12 +14,10 @@ pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
pub struct BucketMetadata {
format: u16,
version: u16,
pub format: u16,
pub version: u16,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub tagging: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub created: Option<OffsetDateTime>,
}
@@ -60,7 +58,8 @@ impl BucketMetadata {
Ok(buf)
}
pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> {
Ok(rmp_serde::from_slice(buffer)?)
pub fn unmarshal_from(buf: &[u8]) -> Result<Self> {
let t: BucketMetadata = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -0,0 +1,55 @@
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store::ECStore;
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader};
use http::HeaderMap;
use s3s::dto::StreamingBlob;
use s3s::Body;
use tracing::warn;
use super::error::ConfigError;
pub async fn read_config(api: &ECStore, file: &str) -> Result<Vec<u8>> {
let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?;
Ok(data)
}
async fn read_config_with_metadata(api: &ECStore, file: &str, opts: &ObjectOptions) -> Result<(Vec<u8>, ObjectInfo)> {
let range = HTTPRangeSpec::nil();
let h = HeaderMap::new();
let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, &opts).await?;
let data = rd.read_all().await?;
if data.is_empty() {
return Err(Error::new(ConfigError::NotFound));
}
Ok((data, rd.object_info))
}
pub async fn save_config(api: &ECStore, file: &str, data: &[u8]) -> Result<()> {
save_config_with_opts(
api,
file,
data,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
.await
}
async fn save_config_with_opts(api: &ECStore, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> {
let _ = api
.put_object(
RUSTFS_META_BUCKET,
file,
PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()),
opts,
)
.await?;
Ok(())
}

View File

@@ -0,0 +1,25 @@
use crate::error::Error;
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("config not found")]
NotFound,
}
impl ConfigError {
/// Returns `true` if the config error is [`NotFound`].
///
/// [`NotFound`]: ConfigError::NotFound
#[must_use]
pub fn is_not_found(&self) -> bool {
matches!(self, Self::NotFound)
}
}
pub fn is_not_found(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<ConfigError>() {
ConfigError::is_not_found(&e)
} else {
false
}
}

View File

@@ -0,0 +1,2 @@
pub mod common;
pub mod error;

View File

@@ -10,7 +10,7 @@ use std::{
};
/// enum for setup type.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum SetupType {
/// starts with unknown setup type.
Unknown,
@@ -111,6 +111,7 @@ impl Endpoints {
}
}
#[derive(Debug)]
/// a temporary type to holds the list of endpoints
struct PoolEndpointList {
inner: Vec<Endpoints>,
@@ -387,7 +388,7 @@ pub struct PoolEndpoints {
/// list of list of endpoints
#[derive(Debug, Clone)]
pub struct EndpointServerPools(Vec<PoolEndpoints>);
pub struct EndpointServerPools(pub Vec<PoolEndpoints>);
impl From<Vec<PoolEndpoints>> for EndpointServerPools {
fn from(v: Vec<PoolEndpoints>) -> Self {
@@ -408,6 +409,9 @@ impl AsMut<Vec<PoolEndpoints>> for EndpointServerPools {
}
impl EndpointServerPools {
pub fn reset(&mut self, eps: Vec<PoolEndpoints>) {
self.0 = eps;
}
pub fn from_volumes(server_addr: &str, endpoints: Vec<String>) -> Result<(EndpointServerPools, SetupType)> {
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
@@ -439,6 +443,10 @@ impl EndpointServerPools {
Ok((ret, pool_eps.setup_type))
}
pub fn es_count(&self) -> usize {
self.0.iter().map(|v| v.set_count).count()
}
/// add pool endpoints
pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> {
let mut exits = HashSet::new();

61
ecstore/src/global.rs Normal file
View File

@@ -0,0 +1,61 @@
use lazy_static::lazy_static;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use crate::{
disk::DiskStore,
endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
store::ECStore,
};
lazy_static! {
pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
}
pub async fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
let mut endpoints = GLOBAL_Endpoints.write().await;
endpoints.reset(eps);
}
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
GLOBAL_OBJECT_API.clone()
}
pub async fn set_object_layer(o: ECStore) {
let mut global_object_api = GLOBAL_OBJECT_API.write().await;
*global_object_api = Some(o);
}
pub async fn is_dist_erasure() -> bool {
let lock = GLOBAL_IsDistErasure.read().await;
*lock == true
}
pub async fn is_erasure() -> bool {
let lock = GLOBAL_IsErasure.read().await;
*lock == true
}
pub async fn update_erasure_type(setup_type: SetupType) {
let mut is_erasure = GLOBAL_IsErasure.write().await;
*is_erasure = setup_type == SetupType::Erasure;
let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
*is_dist_erasure = setup_type == SetupType::DistErasure;
if *is_dist_erasure {
*is_erasure = true
}
let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
*is_erasure_sd = setup_type == SetupType::ErasureSD;
}
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;

View File

@@ -1,11 +1,13 @@
pub mod bucket_meta;
mod chunk_stream;
mod config;
pub mod disk;
pub mod disks_layout;
pub mod endpoints;
pub mod erasure;
pub mod error;
mod file_meta;
mod global;
pub mod peer;
mod quorum;
pub mod set_disk;
@@ -17,3 +19,7 @@ mod store_init;
mod utils;
pub mod bucket;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;
pub use global::update_erasure_type;

View File

@@ -26,7 +26,7 @@ pub trait PeerS3Client: Debug + Sync + Send + 'static {
fn get_pools(&self) -> Option<Vec<usize>>;
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct S3PeerSys {
pub clients: Vec<Client>,
pub pools_count: usize,

View File

@@ -15,12 +15,12 @@ use crate::{
},
endpoints::PoolEndpoints,
error::{Error, Result},
global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
set_disk::SetDisks,
store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
PutObjReader, StorageAPI,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete,
PartInfo, PutObjReader, StorageAPI,
},
utils::hash,
};
@@ -94,7 +94,7 @@ impl Sets {
continue;
}
if disk.as_ref().unwrap().is_local() && *GLOBAL_IsDistErasure.read().await {
if disk.as_ref().unwrap().is_local() && is_dist_erasure().await {
let local_disk = {
let local_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.read().await;
local_set_drives[pool_idx][i][j].clone()
@@ -124,7 +124,7 @@ impl Sets {
let set_disks = SetDisks {
lockers: lockers[i].clone(),
locker_owner: GLOBAL_Local_Node_Name.read().await.to_string(),
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(*GLOBAL_IsDistErasure.read().await))),
ns_mutex: Arc::new(RwLock::new(NsLockMap::new(is_dist_erasure().await))),
disks: RwLock::new(set_drive),
set_drive_count,
default_parity_count: partiy_count,
@@ -258,6 +258,25 @@ struct DelObj {
obj: ObjectToDelete,
}
#[async_trait::async_trait]
impl ObjectIO for Sets {
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
self.get_disks_by_key(object)
.get_object_reader(bucket, object, range, h, opts)
.await
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
}
}
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -383,22 +402,6 @@ impl StorageAPI for Sets {
.await
}
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
self.get_disks_by_key(object)
.get_object_reader(bucket, object, range, h, opts)
.await
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
}
async fn put_object_part(
&self,
bucket: &str,

View File

@@ -1,9 +1,12 @@
#![allow(clippy::map_entry)]
use crate::bucket::bucket_metadata_sys_set;
use crate::disk::endpoint::EndpointType;
use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES};
use crate::store_api::ObjectIO;
use crate::{
bucket::BucketMetadata,
bucket::metadata::BucketMetadata,
disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
endpoints::{EndpointServerPools, SetupType},
endpoints::EndpointServerPools,
error::{Error, Result},
peer::S3PeerSys,
sets::Sets,
@@ -19,137 +22,19 @@ use backon::{ExponentialBuilder, Retryable};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
use futures::future::join_all;
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::{fs, sync::RwLock};
use tracing::{debug, info};
use uuid::Uuid;
use lazy_static::lazy_static;
lazy_static! {
pub static ref GLOBAL_IsErasure: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_IsDistErasure: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
}
pub async fn update_erasure_type(setup_type: SetupType) {
let mut is_erasure = GLOBAL_IsErasure.write().await;
*is_erasure = setup_type == SetupType::Erasure;
let mut is_dist_erasure = GLOBAL_IsDistErasure.write().await;
*is_dist_erasure = setup_type == SetupType::DistErasure;
if *is_dist_erasure {
*is_erasure = true
}
let mut is_erasure_sd = GLOBAL_IsErasureSD.write().await;
*is_erasure_sd = setup_type == SetupType::ErasureSD;
}
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
lazy_static! {
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
}
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
let disk_path = match fs::canonicalize(disk_path).await {
Ok(disk_path) => disk_path,
Err(_) => return None,
};
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
let path = disk_path.to_string_lossy().to_string();
if disk_map.contains_key(&path) {
let a = disk_map[&path].as_ref().cloned();
return a;
}
None
}
pub async fn all_local_disk_path() -> Vec<String> {
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
disk_map.keys().cloned().collect()
}
pub async fn all_local_disk() -> Vec<DiskStore> {
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
disk_map
.values()
.filter(|v| v.is_some())
.map(|v| v.as_ref().unwrap().clone())
.collect()
}
// init_local_disks 初始化本地磁盘server启动前必须初始化成功
pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
let opt = &DiskOption {
cleanup: true,
health_check: true,
};
let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
for _ in 0..pool_eps.set_count {
set_count_drives.push(vec![None; pool_eps.drives_per_set]);
}
global_set_drives.push(set_count_drives);
}
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_drives = HashMap::new();
for ep in pool_eps.endpoints.as_ref().iter() {
if !ep.is_local {
continue;
}
let disk = new_disk(ep, opt).await?;
let path = disk.path().to_string_lossy().to_string();
global_local_disk_map.insert(path, Some(disk.clone()));
set_drives.insert(ep.disk_idx, Some(disk.clone()));
if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
}
}
}
Ok(())
}
lazy_static! {
pub static ref GLOBAL_OBJECT_API: Arc<RwLock<Option<ECStore>>> = Arc::new(RwLock::new(None));
pub static ref GLOBAL_LOCAL_DISK: Arc<RwLock<Vec<Option<DiskStore>>>> = Arc::new(RwLock::new(Vec::new()));
}
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
GLOBAL_OBJECT_API.clone()
}
async fn set_object_layer(o: ECStore) {
let mut global_object_api = GLOBAL_OBJECT_API.write().await;
*global_object_api = Some(o);
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ECStore {
pub id: uuid::Uuid,
// pub disks: Vec<DiskStore>,
@@ -161,7 +46,7 @@ pub struct ECStore {
impl ECStore {
#[allow(clippy::new_ret_no_self)]
pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<()> {
pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<Self> {
// let layouts = DisksLayout::try_from(endpoints.as_slice())?;
let mut deployment_id = None;
@@ -243,7 +128,7 @@ impl ECStore {
}
// 替换本地磁盘
if !*GLOBAL_IsDistErasure.read().await {
if !is_dist_erasure().await {
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
for disk in local_disks {
let path = disk.path().to_string_lossy().to_string();
@@ -260,9 +145,9 @@ impl ECStore {
peer_sys,
};
set_object_layer(ec).await;
set_object_layer(ec.clone()).await;
Ok(())
Ok(ec)
}
pub fn init_local_disks() {}
@@ -402,6 +287,80 @@ impl ECStore {
}
}
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
let disk_path = match fs::canonicalize(disk_path).await {
Ok(disk_path) => disk_path,
Err(_) => return None,
};
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
let path = disk_path.to_string_lossy().to_string();
if disk_map.contains_key(&path) {
let a = disk_map[&path].as_ref().cloned();
return a;
}
None
}
pub async fn all_local_disk_path() -> Vec<String> {
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
disk_map.keys().cloned().collect()
}
pub async fn all_local_disk() -> Vec<DiskStore> {
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
disk_map
.values()
.filter(|v| v.is_some())
.map(|v| v.as_ref().unwrap().clone())
.collect()
}
// init_local_disks 初始化本地磁盘server启动前必须初始化成功
pub async fn init_local_disks(endpoint_pools: EndpointServerPools) -> Result<()> {
let opt = &DiskOption {
cleanup: true,
health_check: true,
};
let mut global_set_drives = GLOBAL_LOCAL_DISK_SET_DRIVES.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_count_drives = Vec::with_capacity(pool_eps.set_count);
for _ in 0..pool_eps.set_count {
set_count_drives.push(vec![None; pool_eps.drives_per_set]);
}
global_set_drives.push(set_count_drives);
}
let mut global_local_disk_map = GLOBAL_LOCAL_DISK_MAP.write().await;
for pool_eps in endpoint_pools.as_ref().iter() {
let mut set_drives = HashMap::new();
for ep in pool_eps.endpoints.as_ref().iter() {
if !ep.is_local {
continue;
}
let disk = new_disk(ep, opt).await?;
let path = disk.path().to_string_lossy().to_string();
global_local_disk_map.insert(path, Some(disk.clone()));
set_drives.insert(ep.disk_idx, Some(disk.clone()));
if ep.pool_idx.is_some() && ep.set_idx.is_some() && ep.disk_idx.is_some() {
global_set_drives[ep.pool_idx.unwrap()][ep.set_idx.unwrap()][ep.disk_idx.unwrap()] = Some(disk.clone());
}
}
}
Ok(())
}
async fn internal_get_pool_info_existing_with_opts(
pools: &[Arc<Sets>],
bucket: &str,
@@ -494,6 +453,38 @@ pub struct ListPathOptions {
pub limit: i32,
}
#[async_trait::async_trait]
impl ObjectIO for ECStore {
#[tracing::instrument(level = "debug", skip(self))]
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
}
unimplemented!()
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
}
unimplemented!()
}
}
#[async_trait::async_trait]
impl StorageAPI for ECStore {
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -516,28 +507,10 @@ impl StorageAPI for ECStore {
// TODO: delete created bucket when error
self.peer_sys.make_bucket(bucket, opts).await?;
let meta = BucketMetadata::new(bucket);
let data = meta.marshal_msg()?;
let file_path = meta.save_file_path();
let mut meta = BucketMetadata::new(bucket);
meta.save(self).await?;
// TODO: wrap hash reader
let content_len = data.len();
let body = Body::from(data);
let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
self.put_object(
RUSTFS_META_BUCKET,
&file_path,
reader,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
.await?;
bucket_metadata_sys_set(bucket.to_string(), meta).await;
// TODO: toObjectErr
@@ -740,34 +713,6 @@ impl StorageAPI for ECStore {
unimplemented!()
}
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
}
unimplemented!()
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
}
unimplemented!()
}
async fn put_object_part(
&self,
bucket: &str,

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use crate::error::{Error, Result};
use futures::StreamExt;
use http::HeaderMap;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
@@ -281,12 +282,26 @@ pub struct GetObjectReader {
pub object_info: ObjectInfo,
}
// impl GetObjectReader {
// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
// GetObjectReader { stream, object_info }
// }
// }
impl GetObjectReader {
// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
// GetObjectReader { stream, object_info }
// }
pub async fn read_all(&mut self) -> Result<Vec<u8>> {
let mut data = Vec::new();
while let Some(x) = self.stream.next().await {
let buf = match x {
Ok(res) => res,
Err(e) => return Err(Error::msg(e.to_string())),
};
data.extend_from_slice(buf.as_ref());
}
Ok(data)
}
}
#[derive(Debug)]
pub struct HTTPRangeSpec {
pub is_suffix_length: bool,
pub start: i64,
@@ -399,7 +414,11 @@ pub struct ObjectOptions {
// }
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketOptions {}
pub struct BucketOptions {
pub deleted: bool, // true only when site replication is enabled
pub cached: bool, // true only when we are requesting a cached response instead of hitting the disk for example ListBuckets() call.
pub no_metadata: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketInfo {
@@ -510,7 +529,20 @@ pub struct DeletedObject {
}
#[async_trait::async_trait]
pub trait StorageAPI {
pub trait ObjectIO: Send + Sync + 'static {
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader>;
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
}
#[async_trait::async_trait]
pub trait StorageAPI: ObjectIO {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
@@ -536,15 +568,15 @@ pub trait StorageAPI {
async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>;
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader>;
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
// async fn get_object_reader(
// &self,
// bucket: &str,
// object: &str,
// range: HTTPRangeSpec,
// h: HeaderMap,
// opts: &ObjectOptions,
// ) -> Result<GetObjectReader>;
// async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn put_object_part(
&self,
bucket: &str,

View File

@@ -6,8 +6,12 @@ mod storage;
use clap::Parser;
use common::error::{Error, Result};
use ecstore::{
bucket::init_bucket_metadata_sys,
endpoints::EndpointServerPools,
store::{init_local_disks, update_erasure_type, ECStore},
set_global_endpoints,
store::{init_local_disks, ECStore},
store_api::{BucketOptions, StorageAPI},
update_erasure_type,
};
use grpc::make_server;
use hyper_util::{
@@ -89,7 +93,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// 用于rpc
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())
.map_err(|err| Error::from_string(err.to_string()))?;
set_global_endpoints(endpoint_pools.as_ref().clone()).await;
update_erasure_type(setup_type).await;
// 初始化本地磁盘
@@ -179,11 +183,23 @@ async fn run(opt: config::Opt) -> Result<()> {
});
// init store
ECStore::new(opt.address.clone(), endpoint_pools.clone())
let store = ECStore::new(opt.address.clone(), endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
info!(" init store success!");
let buckets_list = store
.list_bucket(&BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.map_err(|err| Error::from_string(err.to_string()))?;
let buckets = buckets_list.iter().map(|v| v.name.clone()).collect();
init_bucket_metadata_sys(store.clone(), buckets).await;
tokio::select! {
_ = tokio::signal::ctrl_c() => {

View File

@@ -1,15 +1,19 @@
use bytes::BufMut;
use bytes::Bytes;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use ecstore::bucket::tags::Tags;
use ecstore::bucket_meta::BucketMetadata;
use ecstore::disk::error::DiskError;
use ecstore::disk::RUSTFS_META_BUCKET;
use ecstore::store::new_object_layer_fn;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
use ecstore::store_api::DeleteBucketOptions;
use ecstore::store_api::HTTPRangeSpec;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
@@ -17,6 +21,7 @@ use ecstore::store_api::StorageAPI;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
use log::warn;
use s3s::dto::*;
use s3s::s3_error;
use s3s::Body;
@@ -25,6 +30,7 @@ use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
use transform_stream::AsyncTryStream;
@@ -248,7 +254,7 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
@@ -322,7 +328,7 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
if let Err(e) = store.get_bucket_info(&input.bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
@@ -373,7 +379,7 @@ impl S3 for FS {
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))),
};
let bucket_infos = try_!(store.list_bucket(&BucketOptions {}).await);
let bucket_infos = try_!(store.list_bucket(&BucketOptions::default()).await);
let buckets: Vec<Bucket> = bucket_infos
.iter()
@@ -701,53 +707,19 @@ impl S3 for FS {
}))
.await?;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
let meta_obj = try_!(
store
.get_object_reader(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
HTTPRangeSpec::nil(),
Default::default(),
&ObjectOptions::default(),
)
.await
);
let stream = meta_obj.stream;
let mut data = vec![];
pin_mut!(stream);
while let Some(x) = stream.next().await {
let x = try_!(x);
data.put_slice(&x[..]);
let mut tag_map = HashMap::new();
for tag in tagging.tag_set.iter() {
tag_map.insert(tag.key.clone(), tag.value.clone());
}
let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
if tagging.tag_set.is_empty() {
meta.tagging = None;
} else {
meta.tagging = Some(tagging.tag_set.into_iter().map(|x| (x.key, x.value)).collect())
}
let tags = Tags::new(tag_map, false);
let data = try_!(meta.marshal_msg());
let len = data.len();
try_!(
store
.put_object(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
&ObjectOptions::default(),
)
.await
);
let data = try_!(tags.marshal_msg());
let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);
Ok(S3Response::new(Default::default()))
}
@@ -763,51 +735,23 @@ impl S3 for FS {
}))
.await?;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let meta_obj = try_!(
store
.get_object_reader(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
HTTPRangeSpec::nil(),
Default::default(),
&ObjectOptions::default(),
)
.await
);
let stream = meta_obj.stream;
let mut data = vec![];
pin_mut!(stream);
while let Some(x) = stream.next().await {
let x = try_!(x);
data.put_slice(&x[..]);
}
let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
if meta.tagging.is_none() {
return Err({
let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
err.set_status_code("404".try_into().unwrap());
err
});
}
Ok(S3Response::new(GetBucketTaggingOutput {
tag_set: meta
.tagging
.unwrap()
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
let tag_set: Vec<Tag> = match bucket_meta_sys.get_tagging_config(&bucket).await {
Ok((tags, _)) => tags
.tag_set
.tag_map
.into_iter()
.map(|(key, value)| Tag { key, value })
.collect(),
}))
Err(err) => {
warn!("get_tagging_config err {:?}", &err);
// TODO: check not found
Vec::new()
}
};
Ok(S3Response::new(GetBucketTaggingOutput { tag_set }))
}
#[tracing::instrument(level = "debug", skip(self))]
@@ -824,48 +768,16 @@ impl S3 for FS {
}))
.await?;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
let meta_obj = try_!(
store
.get_object_reader(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
HTTPRangeSpec::nil(),
Default::default(),
&ObjectOptions::default(),
)
.await
);
let tag_map = HashMap::new();
let stream = meta_obj.stream;
let tags = Tags::new(tag_map, false);
let mut data = vec![];
pin_mut!(stream);
let data = try_!(tags.marshal_msg());
while let Some(x) = stream.next().await {
let x = try_!(x);
data.put_slice(&x[..]);
}
let mut meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
meta.tagging = None;
let data = try_!(meta.marshal_msg());
let len = data.len();
try_!(
store
.put_object(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
PutObjReader::new(StreamingBlob::from(Body::from(data)), len),
&ObjectOptions::default(),
)
.await
);
let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);
Ok(S3Response::new(DeleteBucketTaggingOutput {}))
}