add bucket sys marshal func

This commit is contained in:
weisd
2024-10-09 15:06:50 +08:00
parent 2fe9d75cb5
commit 9ecb045131
13 changed files with 456 additions and 29 deletions

View File

@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// 定义Algorithm枚举类型
@@ -21,22 +23,37 @@ impl std::str::FromStr for Algorithm {
}
// 定义EncryptionAction结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct EncryptionAction {
algorithm: Option<Algorithm>,
master_key_id: Option<String>,
}
// 定义Rule结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
default_encryption_action: EncryptionAction,
}
// 定义BucketSSEConfig结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketSSEConfig {
xml_ns: String,
xml_name: String,
rules: Vec<Rule>,
}
impl BucketSSEConfig {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketSSEConfig = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -2,4 +2,18 @@
pub enum BucketMetadataError {
#[error("tagging not found")]
TaggingNotFound,
#[error("bucket policy not found")]
BucketPolicyNotFound,
#[error("bucket object lock not found")]
BucketObjectLockConfigNotFound,
#[error("bucket lifecycle not found")]
BucketLifecycleNotFound,
#[error("bucket SSE config not found")]
BucketSSEConfigNotFound,
#[error("bucket quota config not found")]
BucketQuotaConfigNotFound,
#[error("bucket replication config not found")]
BucketReplicationConfigNotFound,
#[error("bucket remote target not found")]
BucketRemoteTargetNotFound,
}

View File

@@ -1,11 +1,12 @@
mod name;
use crate::error::Result;
use name::Name;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use name::Name;
// 定义common结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Common {
pub id: String,
pub filter: S3Key,
@@ -13,59 +14,74 @@ struct Common {
}
// 定义Queue结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Queue {
pub common: Common,
pub arn: ARN,
}
// 定义ARN结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ARN {
pub target_id: TargetID,
pub region: String,
}
// 定义TargetID结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TargetID {
pub id: String,
pub name: String,
}
// 定义FilterRule结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRule {
pub name: String,
pub value: String,
}
// 定义FilterRuleList结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRuleList {
pub rules: Vec<FilterRule>,
}
// 定义S3Key结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct S3Key {
pub rule_list: FilterRuleList,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lambda {
arn: String,
}
// 定义Topic结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Topic {
arn: String,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
queue_list: Vec<Queue>,
lambda_list: Vec<Lambda>,
topic_list: Vec<Topic>,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,9 +1,26 @@
use super::rule::Rule;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lifecycle {
pub rules: Vec<Rule>,
pub expiry_updated_at: Option<OffsetDateTime>,
}
impl Lifecycle {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Lifecycle = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -190,9 +190,44 @@ impl BucketMetadata {
}
fn default_timestamps(&mut self) {
if self.policy_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.policy_config_updated_at = self.created
}
if self.encryption_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.encryption_config_updated_at = self.created
}
if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.tagging_config_updated_at = self.created
}
if self.object_lock_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.object_lock_config_updated_at = self.created
}
if self.quota_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.quota_config_updated_at = self.created
}
if self.replication_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.replication_config_updated_at = self.created
}
if self.versioning_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.versioning_config_updated_at = self.created
}
if self.lifecycle_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.lifecycle_config_updated_at = self.created
}
if self.notification_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.notification_config_updated_at = self.created
}
if self.bucket_targets_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.bucket_targets_config_updated_at = self.created
}
if self.bucket_targets_config_meta_updated_at == OffsetDateTime::UNIX_EPOCH {
self.bucket_targets_config_meta_updated_at = self.created
}
}
pub fn update_config(&mut self, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
@@ -264,9 +299,39 @@ impl BucketMetadata {
}
fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
if !self.policy_config_json.is_empty() {
self.policy_config = Some(BucketPolicy::unmarshal(&self.policy_config_json)?);
}
if !self.notification_config_xml.is_empty() {
self.notification_config = Some(event::Config::unmarshal(&self.notification_config_xml)?);
}
if !self.lifecycle_config_xml.is_empty() {
self.lifecycle_config = Some(Lifecycle::unmarshal(&self.lifecycle_config_xml)?);
}
if !self.object_lock_config_xml.is_empty() {
self.object_lock_config = Some(objectlock::Config::unmarshal(&self.object_lock_config_xml)?);
}
if !self.versioning_config_xml.is_empty() {
self.versioning_config = Some(Versioning::unmarshal(&self.versioning_config_xml)?);
}
if !self.encryption_config_xml.is_empty() {
self.sse_config = Some(BucketSSEConfig::unmarshal(&self.encryption_config_xml)?);
}
if !self.tagging_config_xml.is_empty() {
self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
}
if !self.quota_config_json.is_empty() {
self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?);
}
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(replication::Config::unmarshal(&self.replication_config_xml)?);
}
if !self.bucket_targets_config_json.is_empty() {
self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?);
} else {
self.bucket_target_config = Some(BucketTargets::default())
}
Ok(())
}

View File

@@ -16,8 +16,13 @@ use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use super::encryption::BucketSSEConfig;
use super::lifecycle::lifecycle::Lifecycle;
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::tags;
use super::policy::bucket_policy::BucketPolicy;
use super::quota::BucketQuota;
use super::target::BucketTargets;
use super::{event, objectlock, policy, replication, tags, versioning};
lazy_static! {
static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
@@ -236,6 +241,42 @@ impl BucketMetadataSys {
}
}
pub async fn get_versioning_config(&self, bucket: &str) -> Result<(versioning::Versioning, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_versioning_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Ok((versioning::Versioning::default(), OffsetDateTime::UNIX_EPOCH));
} else {
return Err(err);
}
}
};
Ok((bm.versioning_config.unwrap_or_default(), bm.versioning_config_updated_at))
}
pub async fn get_bucket_policy(&self, bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_bucket_policy err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketPolicyNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.policy_config {
Ok((config, bm.policy_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketPolicyNotFound))
}
}
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
@@ -255,4 +296,163 @@ impl BucketMetadataSys {
Err(Error::new(BucketMetadataError::TaggingNotFound))
}
}
pub async fn get_object_lock_config(&self, bucket: &str) -> Result<(objectlock::Config, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_object_lock_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.object_lock_config {
Ok((config, bm.object_lock_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound))
}
}
pub async fn get_lifecycle_config(&self, bucket: &str) -> Result<(Lifecycle, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_lifecycle_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketLifecycleNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.lifecycle_config {
if config.rules.is_empty() {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
} else {
Ok((config, bm.lifecycle_config_updated_at))
}
} else {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
}
}
pub async fn get_notification_config(&self, bucket: &str) -> Result<Option<event::Config>> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.notification_config,
Err(err) => {
warn!("get_notification_config err {:?}", &err);
if config::error::is_not_found(&err) {
None
} else {
return Err(err);
}
}
};
Ok(bm)
}
pub async fn get_sse_config(&self, bucket: &str) -> Result<(BucketSSEConfig, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_sse_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.sse_config {
Ok((config, bm.encryption_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound))
}
}
pub async fn created_at(&self, bucket: &str) -> Result<OffsetDateTime> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.created,
Err(err) => {
return Err(err);
}
};
Ok(bm)
}
pub async fn get_quota_config(&self, bucket: &str) -> Result<(BucketQuota, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_quota_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.quota_config {
Ok((config, bm.quota_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound))
}
}
pub async fn get_replication_config(&self, bucket: &str) -> Result<(replication::Config, OffsetDateTime)> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.replication_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok((config, bm.replication_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound))
}
}
pub async fn get_bucket_targets_config(&self, bucket: &str) -> Result<BucketTargets> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.bucket_target_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok(config)
} else {
Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound))
}
}
}

View File

@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
@@ -37,3 +39,18 @@ pub struct Config {
pub object_lock_enabled: String,
pub rule: Option<Rule>,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -38,3 +40,18 @@ pub struct BucketPolicy {
pub version: String,
pub statements: Vec<BPStatement>,
}
impl BucketPolicy {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketPolicy = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// 定义QuotaType枚举类型
@@ -19,3 +21,18 @@ pub struct BucketQuota {
quota_type: Option<QuotaType>,
}
impl BucketQuota {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketQuota = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -3,11 +3,28 @@ mod filter;
mod rule;
mod tag;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use rule::Rule;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
rules: Vec<Rule>,
role_arn: String,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,8 +1,10 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Credentials {
access_key: String,
secret_key: String,
@@ -10,13 +12,13 @@ pub struct Credentials {
expiration: Option<OffsetDateTime>,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum ServiceType {
#[default]
Replication,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct LatencyStat {
curr: Duration, // 当前延迟
avg: Duration, // 平均延迟
@@ -24,7 +26,7 @@ pub struct LatencyStat {
}
// 定义BucketTarget结构体
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
source_bucket: String,
@@ -73,7 +75,22 @@ pub struct BucketTarget {
edge: bool,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTargets {
pub targets: Vec<BucketTarget>,
}
impl BucketTargets {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketTargets = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize, Serialize)]
@@ -25,14 +27,29 @@ impl std::fmt::Display for State {
}
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExcludedPrefix {
pub prefix: String,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Versioning {
pub status: State,
pub excluded_prefixes: Vec<ExcludedPrefix>,
pub exclude_folders: bool,
}
impl Versioning {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Versioning = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,11 +1,8 @@
use bytes::BufMut;
use bytes::Bytes;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use ecstore::bucket::tags::Tags;
use ecstore::bucket_meta::BucketMetadata;
use ecstore::disk::error::DiskError;
use ecstore::disk::RUSTFS_META_BUCKET;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
@@ -24,7 +21,6 @@ use http::HeaderMap;
use log::warn;
use s3s::dto::*;
use s3s::s3_error;
use s3s::Body;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;