weisd
2024-10-01 04:35:25 +08:00
parent 87a332ee81
commit 32baa7f146
39 changed files with 513 additions and 238 deletions

TODO.md (14 lines changed)
View File

@@ -2,18 +2,18 @@
## Basic storage
- [ ] EC read/write availability check (Read/WriteQuorum)
- [ ] Optimize concurrent execution: consume while reading, interruptible
- [x] EC read/write availability check (Read/WriteQuorum); see the quorum sketch after this list
- [ ] Optimize background concurrent execution, interruptible
- [ ] Store small files in the metafile (inline data)
- [ ] Flesh out bucketmeta
- [ ] Object lock
- [ ] Code cleanup: use generics?
- [ ] Abstract out metafile storage
- [ ] Hash while reading/writing
- [x] Object lock
- [ ] Hash while reading/writing via nested readers
- [x] Remote RPC
- [x] Error type checks: distinguish error types in code and unify error handling
- [x] Optimize xlmeta with a custom msgpack data structure
- [x] appendFile, createFile, readFile, walk_dir sync io
- [ ] Optimize io.reader (see GetObjectNInfo) to simplify io copy; rebalance if writes are async
- [ ] Code cleanup: use generics?
- [ ] Abstract out metafile storage
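A minimal sketch of the quorum rule referenced above, assuming MinIO-style erasure sets (a stripe of `data + parity` shards; illustrative only, not this repo's actual API):

```rust
// A read needs `data` intact shards; a write needs one extra vote when
// data == parity, so two conflicting writers cannot both reach quorum.
fn quorums(data: usize, parity: usize) -> (usize, usize) {
    let read_quorum = data;
    let write_quorum = if data == parity { data + 1 } else { data };
    (read_quorum, write_quorum)
}

fn main() {
    assert_eq!(quorums(8, 4), (8, 8)); // EC 8+4
    assert_eq!(quorums(6, 6), (6, 7)); // EC 6+6
}
```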
## Basic features

View File

@@ -21,20 +21,20 @@ impl std::str::FromStr for Algorithm {
}
// Define the EncryptionAction struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct EncryptionAction {
algorithm: Option<Algorithm>,
master_key_id: Option<String>,
}
// Define the Rule struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
default_encryption_action: EncryptionAction,
}
// Define the BucketSSEConfig struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketSSEConfig {
xml_ns: String,
xml_name: String,

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use name::Name;
// Define the Common struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Common {
pub id: String,
pub filter: S3Key,
@@ -13,57 +13,57 @@ struct Common {
}
// Define the Queue struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Queue {
pub common: Common,
pub arn: ARN,
}
// Define the ARN struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ARN {
pub target_id: TargetID,
pub region: String,
}
// Define the TargetID struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TargetID {
pub id: String,
pub name: String,
}
// Define the FilterRule struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRule {
pub name: String,
pub value: String,
}
// Define the FilterRuleList struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRuleList {
pub rules: Vec<FilterRule>,
}
// Define the S3Key struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct S3Key {
pub rule_list: FilterRuleList,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lambda {
arn: String,
}
// Define the Topic struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Topic {
arn: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
queue_list: Vec<Queue>,
lambda_list: Vec<Lambda>,

View File

@@ -1,6 +1,6 @@
use super::{prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct And {
pub object_size_greater_than: i64,
pub object_size_less_than: i64,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DelMarkerExpiration {
pub days: usize,
}

View File

@@ -3,21 +3,21 @@ use time::OffsetDateTime;
// ExpirationDays is a type alias to unmarshal Days in Expiration
pub type ExpirationDays = usize;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExpirationDate(Option<OffsetDateTime>);
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExpireDeleteMarker {
pub marker: Boolean,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Boolean {
pub val: bool,
pub set: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Expiration {
pub days: Option<ExpirationDays>,
pub date: Option<ExpirationDate>,

View File

@@ -3,7 +3,7 @@ use std::collections::HashMap;
use super::{and::And, prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Filter {
pub set: bool,

View File

@@ -2,7 +2,7 @@ use super::rule::Rule;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lifecycle {
pub rules: Vec<Rule>,
pub expiry_updated_at: Option<OffsetDateTime>,

View File

@@ -1,14 +1,14 @@
use super::{expiration::ExpirationDays, transition::TransitionDays};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct NoncurrentVersionExpiration {
pub noncurrent_days: ExpirationDays,
pub newer_noncurrent_versions: usize,
set: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct NoncurrentVersionTransition {
pub noncurrent_days: TransitionDays,
pub storage_class: String,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Prefix {
pub val: String,
pub set: bool,

View File

@@ -8,14 +8,14 @@ use super::{
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum Status {
#[default]
Enabled,
Disabled,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
pub id: String,
pub status: Status,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tag {
pub key: String,
pub value: String,

View File

@@ -3,10 +3,10 @@ use time::OffsetDateTime;
pub type TransitionDays = usize;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TransitionDate(Option<OffsetDateTime>);
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Transition {
pub days: Option<TransitionDays>,
pub date: Option<TransitionDate>,

View File

@@ -1,32 +1,34 @@
use byteorder::{BigEndian, ByteOrder};
use super::{
encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use rmp_serde::Serializer as rmpSerializer;
use serde::Serializer;
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;
use time::macros::datetime;
use time::OffsetDateTime;
use tracing::error;
use super::{
encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
};
use crate::error::Result;
use crate::bucket::tags;
use crate::config::common::{read_config, save_config};
use crate::config::error::ConfigError;
use crate::error::{Error, Result};
use crate::disk::BUCKET_META_PREFIX;
use crate::utils::crypto::hex;
use crate::store::ECStore;
use crate::store_api::StorageAPI;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase", default)]
pub struct BucketMetadata {
pub name: String,
#[serde(serialize_with = "write_times")]
pub created: OffsetDateTime,
pub lock_enabled: bool, // marked as unused, but possibly worth keeping
pub policy_config_json: Vec<u8>,
@@ -41,27 +43,16 @@ pub struct BucketMetadata {
pub bucket_targets_config_json: Vec<u8>,
pub bucket_targets_config_meta_json: Vec<u8>,
#[serde(serialize_with = "write_times")]
pub policy_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub object_lock_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub encryption_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub tagging_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub quota_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub replication_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub versioning_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub lifecycle_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub notification_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub bucket_targets_config_updated_at: OffsetDateTime,
#[serde(serialize_with = "write_times")]
pub bucket_targets_config_meta_updated_at: OffsetDateTime,
#[serde(skip)]
@@ -147,9 +138,9 @@ impl BucketMetadata {
format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE)
}
fn msg_size(&self) -> usize {
unimplemented!()
}
// fn msg_size(&self) -> usize {
// unimplemented!()
// }
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
@@ -159,12 +150,99 @@ impl BucketMetadata {
Ok(buf)
}
pub fn unmarshal(_buf: &[u8]) -> Result<Self> {
unimplemented!()
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketMetadata = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn check_header(buf: &[u8]) -> Result<()> {
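// Header layout, written by save() below: bytes 0..2 = BUCKET_METADATA_FORMAT, bytes 2..4 = BUCKET_METADATA_VERSION, both little-endian u16.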
if buf.len() <= 4 {
return Err(Error::msg("read_bucket_metadata: data invalid"));
}
// TODO: check version
Ok(())
}
fn default_timestamps(&mut self) {
if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.tagging_config_updated_at = self.created
}
}
async fn save(&mut self, api: &ECStore) -> Result<()> {
self.parse_all_configs(api)?;
let mut buf: Vec<u8> = vec![0; 4];
LittleEndian::write_u16(&mut buf[0..2], BUCKET_METADATA_FORMAT);
LittleEndian::write_u16(&mut buf[2..4], BUCKET_METADATA_VERSION);
let data = self.marshal_msg()?;
buf.extend_from_slice(&data);
save_config(api, self.save_file_path().as_str(), &buf).await?;
Ok(())
}
fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
if !self.tagging_config_xml.is_empty() {
self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
}
Ok(())
}
}
fn deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
pub async fn load_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
load_bucket_metadata_parse(api, bucket, true).await
}
async fn load_bucket_metadata_parse(api: &ECStore, bucket: &str, parse: bool) -> Result<BucketMetadata> {
let mut bm = match read_bucket_metadata(api, bucket).await {
Ok(res) => res,
Err(err) => {
if let Some(e) = err.downcast_ref::<ConfigError>() {
if !e.is_not_found() {
return Err(err);
}
}
BucketMetadata::new(bucket)
}
};
bm.default_timestamps();
if parse {
bm.parse_all_configs(api)?;
}
// TODO: parse_all_configs
Ok(bm)
}
async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetadata> {
if bucket.is_empty() {
error!("bucket name empty");
return Err(Error::msg("invalid argument"));
}
let bm = BucketMetadata::new(bucket);
let file_path = bm.save_file_path();
let data = read_config(api, &file_path).await?;
BucketMetadata::check_header(&data)?;
BucketMetadata::unmarshal(&data[4..])
}
fn _deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
where
S: FromStr,
S::Err: Display,
@@ -175,7 +253,7 @@ where
unimplemented!()
}
fn write_times<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
fn _write_time<S>(t: &OffsetDateTime, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
@@ -183,47 +261,16 @@ where
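// 62135596800 = seconds from 0001-01-01 UTC to the Unix epoch (the unixToInternal offset in Go's time package).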
let sec = t.unix_timestamp() - 62135596800;
let nsec = t.nanosecond();
buf[0] = 0xc7;
buf[1] = 0x0c;
buf[2] = 0x05;
buf[0] = 0xc7; // mext8
buf[1] = 0x0c; // length (12)
buf[2] = 0x05; // time extension type
BigEndian::write_u64(&mut buf[3..], sec as u64);
BigEndian::write_u32(&mut buf[11..], nsec as u32);
s.serialize_bytes(&buf)
}
fn write_time(t: OffsetDateTime) -> Result<(), String> {
// let t = t.saturating_sub(0); // convert to time since UNIX_EPOCH
println!("t:{:?}", t);
println!("offset:{:?}", datetime!(0-01-01 0:00 UTC));
let mut buf = vec![0x0; 15];
let sec = t.unix_timestamp() - 62135596800;
let nsec = t.nanosecond();
println!("sec:{:?}", sec);
println!("nsec:{:?}", nsec);
buf[0] = 0xc7;
buf[1] = 0x0c;
buf[2] = 0x05;
// extension-format time type
// buf.push(0xc7); // mext8
// buf.push(12); // length
// buf.push(0x05); // time extension type
// write the Unix timestamp and nanosecond part into the buffer
BigEndian::write_u64(&mut buf[3..], sec as u64);
BigEndian::write_u32(&mut buf[11..], nsec as u32);
println!("hex:{:?}", hex(buf));
Ok(())
}
#[cfg(test)]
mod test {
use crate::utils::crypto::hex;
use super::*;
@@ -235,6 +282,8 @@ mod test {
let buf = bm.marshal_msg().unwrap();
println!("{:?}", hex(buf))
let new = BucketMetadata::unmarshal(&buf).unwrap();
assert_eq!(bm.name, new.name);
}
}
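For reference, a standalone sketch of the 4-byte header framing that `save()` and `read_bucket_metadata()` agree on above (format then version as little-endian u16s, followed by the msgpack payload); the helper names here are illustrative:

```rust
use byteorder::{ByteOrder, LittleEndian};

// Prepend the header that check_header()/read_bucket_metadata() strip off.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = vec![0u8; 4];
    LittleEndian::write_u16(&mut buf[0..2], 1); // BUCKET_METADATA_FORMAT
    LittleEndian::write_u16(&mut buf[2..4], 1); // BUCKET_METADATA_VERSION
    buf.extend_from_slice(payload);
    buf
}

// Split a framed buffer back into (format, version, payload).
fn unframe(buf: &[u8]) -> Option<(u16, u16, &[u8])> {
    if buf.len() <= 4 {
        return None; // same guard as check_header()
    }
    Some((
        LittleEndian::read_u16(&buf[0..2]),
        LittleEndian::read_u16(&buf[2..4]),
        &buf[4..],
    ))
}

fn main() {
    let framed = frame(b"msgpack bytes");
    let (format, version, payload) = unframe(&framed).unwrap();
    assert_eq!((format, version), (1, 1));
    assert_eq!(payload, b"msgpack bytes");
}
```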

View File

@@ -0,0 +1,82 @@
use std::{collections::HashMap, sync::Arc};
use crate::error::{Error, Result};
use crate::store::ECStore;
use lazy_static::lazy_static;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::tags;
lazy_static! {
pub static ref GLOBAL_BucketMetadataSys: Arc<Option<BucketMetadataSys>> = Arc::new(Some(BucketMetadataSys::new()));
}
pub fn get_bucket_metadata_sys() -> Arc<Option<BucketMetadataSys>> {
GLOBAL_BucketMetadataSys.clone()
}
#[derive(Debug, Default)]
pub struct BucketMetadataSys {
metadata_map: RwLock<HashMap<String, BucketMetadata>>,
api: Option<Arc<ECStore>>,
initialized: RwLock<bool>,
}
impl BucketMetadataSys {
fn new() -> Self {
Self { ..Default::default() }
}
pub fn init(&mut self, api: Arc<ECStore>, buckets: Vec<String>) {
self.api = Some(api);
unimplemented!()
}
async fn reset(&mut self) {
let mut map = self.metadata_map.write().await;
map.clear();
}
pub async fn get_config(&self, bucket: String) -> Result<(BucketMetadata, bool)> {
if let Some(api) = self.api.as_ref() {
let has_bm = {
let map = self.metadata_map.read().await;
map.get(&bucket).cloned()
};
if let Some(bm) = has_bm {
return Ok((bm, false));
} else {
let bm = match load_bucket_metadata(&api, bucket.as_str()).await {
Ok(res) => res,
Err(err) => {
if *self.initialized.read().await {
return Err(Error::msg("errBucketMetadataNotInitialized"));
} else {
return Err(err);
}
}
};
let mut map = self.metadata_map.write().await;
map.insert(bucket, bm.clone());
Ok((bm, true))
}
} else {
Err(Error::msg("errBucketMetadataNotInitialized"))
}
}
pub async fn get_tagging_config(&self, bucket: String) -> Result<(tags::Tags, Option<OffsetDateTime>)> {
unimplemented!()
}
}
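A hedged usage sketch of the read-through cache above: the bool from `get_config()` reports whether metadata was freshly loaded (`true`) or served from the map (`false`); the bucket name is hypothetical:

```rust
// Assumes the sys was wired up with a live ECStore.
async fn demo(sys: &BucketMetadataSys) -> Result<()> {
    let (_bm, fresh) = sys.get_config("demo-bucket".to_string()).await?; // loaded from disk
    assert!(fresh);
    let (_bm, fresh) = sys.get_config("demo-bucket".to_string()).await?; // served from the map
    assert!(!fresh);
    Ok(())
}
```

Note that `get_config()` probes under the read lock and takes the write lock only on a miss, so cached lookups do not serialize concurrent readers.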

View File

@@ -1,7 +1,8 @@
mod encryption;
mod event;
mod lifecycle;
mod metadata;
pub mod metadata;
mod metadata_sys;
mod objectlock;
mod policy;
mod quota;
@@ -10,4 +11,4 @@ mod tags;
mod target;
mod versioning;
pub use metadata::BucketMetadata;
pub use metadata_sys::get_bucket_metadata_sys;

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
pub enum RetMode {
#[default]
Govenance,
@@ -20,19 +20,19 @@ impl std::str::FromStr for RetMode {
}
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DefaultRetention {
pub mode: RetMode,
pub days: Option<usize>,
pub years: Option<usize>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
pub default_retention: DefaultRetention,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
pub object_lock_enabled: String,
pub rule: Option<Rule>,

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ActionSet(HashSet<Action>);
impl ActionSet {}

View File

@@ -9,7 +9,7 @@ use super::{
resource::ResourceSet,
};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketPolicyArgs {
account_name: String,
groups: Vec<String>,
@@ -20,7 +20,7 @@ pub struct BucketPolicyArgs {
object_name: String,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BPStatement {
sid: String,
effect: Effect,
@@ -32,7 +32,7 @@ pub struct BPStatement {
conditions: Option<Functions>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketPolicy {
pub id: String,
pub version: String,

View File

@@ -41,14 +41,14 @@ pub trait FunctionApi {
// }
// }
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
enum Function {
#[default]
Test,
}
// Define the Functions type
#[derive(Deserialize, Serialize, Default)]
#[derive(Deserialize, Serialize, Default, Clone)]
pub struct Functions(Vec<Function>);
impl Debug for Functions {

View File

@@ -15,11 +15,11 @@ const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::";
const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::";
// Define the Resource struct
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash)]
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
pub struct Resource {
pattern: String,
r#type: ResourceARNType,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ResourceSet(HashSet<Resource>);

View File

@@ -7,7 +7,7 @@ pub enum QuotaType {
}
// Define the BucketQuota struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketQuota {
quota: Option<u64>, // Option models a possibly-absent field

View File

@@ -2,7 +2,7 @@ use super::tag::Tag;
use serde::{Deserialize, Serialize};
// Define the And struct
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct And {
prefix: Option<String>,
tags: Option<Vec<Tag>>,

View File

@@ -3,7 +3,7 @@ use super::tag::Tag;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Filter {
prefix: String,
and: And,

View File

@@ -6,7 +6,7 @@ mod tag;
use rule::Rule;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
rules: Vec<Rule>,
role_arn: String,

View File

@@ -1,29 +1,29 @@
use super::filter::Filter;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum Status {
#[default]
Enabled,
Disabled,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DeleteMarkerReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DeleteReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExistingObjectReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Destination {
pub bucket: String,
pub storage_class: String,
@@ -31,18 +31,18 @@ pub struct Destination {
}
// Define the ReplicaModifications struct
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ReplicaModifications {
status: Status,
}
// Define the SourceSelectionCriteria struct
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct SourceSelectionCriteria {
replica_modifications: ReplicaModifications,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
pub id: String,
pub status: Status,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tag {
pub key: Option<String>,
pub value: Option<String>,

View File

@@ -1,8 +1,10 @@
use crate::error::{Error, Result};
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// Define the TagSet struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TagSet {
#[serde(rename = "Tag")]
tag_map: HashMap<String, String>,
@@ -10,10 +12,25 @@ pub struct TagSet {
}
// Define the Tags struct
#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tags {
#[serde(rename = "Tagging")]
xml_name: String,
#[serde(rename = "TagSet")]
tag_set: Option<TagSet>,
}
impl Tags {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Tags = rmp_serde::from_slice(buf)?;
Ok(t)
}
}
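One note on the design choice above: `with_struct_map()` makes rmp_serde encode structs as msgpack maps keyed by field name rather than positional arrays, so stored blobs stay decodable if fields are reordered later. A round trip, for illustration:

```rust
// Round-trip sketch for the helpers above.
fn tags_roundtrip() -> Result<()> {
    let tags = Tags::default();
    let buf = tags.marshal_msg()?; // struct-map msgpack
    let _back = Tags::unmarshal(&buf)?;
    Ok(())
}
```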

View File

@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Credentials {
access_key: String,
secret_key: String,
@@ -10,13 +10,13 @@ pub struct Credentials {
expiration: Option<OffsetDateTime>,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum ServiceType {
#[default]
Replication,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct LatencyStat {
curr: Duration, // current latency
avg: Duration, // average latency
@@ -24,7 +24,7 @@ pub struct LatencyStat {
}
// Define the BucketTarget struct
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
source_bucket: String,
@@ -73,7 +73,7 @@ pub struct BucketTarget {
edge: bool,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTargets {
pub targets: Vec<BucketTarget>,
}

View File

@@ -25,12 +25,12 @@ impl std::fmt::Display for State {
}
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExcludedPrefix {
pub prefix: String,
}
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Versioning {
pub status: State,
pub excluded_prefixes: Vec<ExcludedPrefix>,

View File

@@ -14,12 +14,10 @@ pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
pub struct BucketMetadata {
format: u16,
version: u16,
pub format: u16,
pub version: u16,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub tagging: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub created: Option<OffsetDateTime>,
}
@@ -60,7 +58,8 @@ impl BucketMetadata {
Ok(buf)
}
pub fn unmarshal_from(buffer: &[u8]) -> Result<Self> {
Ok(rmp_serde::from_slice(buffer)?)
pub fn unmarshal_from(buf: &[u8]) -> Result<Self> {
let t: BucketMetadata = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -0,0 +1,54 @@
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store::ECStore;
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader};
use http::HeaderMap;
use s3s::dto::StreamingBlob;
use s3s::Body;
use super::error::ConfigError;
pub async fn read_config(api: &ECStore, file: &str) -> Result<Vec<u8>> {
let (data, _obj) = read_config_with_metadata(api, file, &ObjectOptions::default()).await?;
Ok(data)
}
async fn read_config_with_metadata(api: &ECStore, file: &str, opts: &ObjectOptions) -> Result<(Vec<u8>, ObjectInfo)> {
let range = HTTPRangeSpec::nil();
let h = HeaderMap::new();
let mut rd = api.get_object_reader(RUSTFS_META_BUCKET, file, range, h, opts).await?;
let data = rd.read_all().await?;
if data.is_empty() {
return Err(Error::new(ConfigError::NotFound));
}
Ok((data, rd.object_info))
}
pub async fn save_config(api: &ECStore, file: &str, data: &[u8]) -> Result<()> {
save_config_with_opts(
api,
file,
data,
&ObjectOptions {
max_parity: true,
..Default::default()
},
)
.await
}
async fn save_config_with_opts(api: &ECStore, file: &str, data: &[u8], opts: &ObjectOptions) -> Result<()> {
let _ = api
.put_object(
RUSTFS_META_BUCKET,
file,
PutObjReader::new(StreamingBlob::from(Body::from(data.to_vec())), data.len()),
opts, // pass the caller's options through instead of discarding them
)
.await?;
Ok(())
}
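An illustrative round trip through the two helpers above; the path is hypothetical (real callers use paths under BUCKET_META_PREFIX, as `BucketMetadata::save_file_path()` does), and note that `read_config()` maps an empty object to `ConfigError::NotFound`:

```rust
async fn config_roundtrip(api: &ECStore) -> Result<()> {
    save_config(api, "buckets/demo/.metadata.bin", b"payload").await?;
    let data = read_config(api, "buckets/demo/.metadata.bin").await?;
    assert_eq!(&data[..], &b"payload"[..]);
    Ok(())
}
```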

View File

@@ -0,0 +1,15 @@
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("config not found")]
NotFound,
}
impl ConfigError {
/// Returns `true` if the config error is [`NotFound`].
///
/// [`NotFound`]: ConfigError::NotFound
#[must_use]
pub fn is_not_found(&self) -> bool {
matches!(self, Self::NotFound)
}
}
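A sketch of how callers branch on this error, assuming the crate's `Error` supports `downcast_ref` the way `load_bucket_metadata_parse()` uses it above (the helper is hypothetical):

```rust
use crate::config::error::ConfigError;
use crate::error::Error;

// Returns true only for the "config not found" case.
fn is_missing_config(err: &Error) -> bool {
    err.downcast_ref::<ConfigError>()
        .map(ConfigError::is_not_found)
        .unwrap_or(false)
}
```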

View File

@@ -0,0 +1,2 @@
pub mod common;
pub mod error;

View File

@@ -1,5 +1,6 @@
pub mod bucket_meta;
mod chunk_stream;
mod config;
pub mod disk;
pub mod disks_layout;
pub mod endpoints;

View File

@@ -19,8 +19,8 @@ use crate::{
store::{GLOBAL_IsDistErasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
store_api::{
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
PutObjReader, StorageAPI,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete,
PartInfo, PutObjReader, StorageAPI,
},
utils::hash,
};
@@ -258,6 +258,25 @@ struct DelObj {
obj: ObjectToDelete,
}
#[async_trait::async_trait]
impl ObjectIO for Sets {
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
self.get_disks_by_key(object)
.get_object_reader(bucket, object, range, h, opts)
.await
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
}
}
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -383,22 +402,6 @@ impl StorageAPI for Sets {
.await
}
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
self.get_disks_by_key(object)
.get_object_reader(bucket, object, range, h, opts)
.await
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
self.get_disks_by_key(object).put_object(bucket, object, data, opts).await
}
async fn put_object_part(
&self,
bucket: &str,

View File

@@ -1,7 +1,8 @@
#![allow(clippy::map_entry)]
use crate::disk::endpoint::EndpointType;
use crate::store_api::ObjectIO;
use crate::{
bucket::BucketMetadata,
bucket::metadata::BucketMetadata,
disk::{error::DiskError, new_disk, DiskOption, DiskStore, WalkDirOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
endpoints::{EndpointServerPools, SetupType},
error::{Error, Result},
@@ -494,6 +495,38 @@ pub struct ListPathOptions {
pub limit: i32,
}
#[async_trait::async_trait]
impl ObjectIO for ECStore {
#[tracing::instrument(level = "debug", skip(self))]
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
}
unimplemented!()
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
}
unimplemented!()
}
}
#[async_trait::async_trait]
impl StorageAPI for ECStore {
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
@@ -740,34 +773,6 @@ impl StorageAPI for ECStore {
unimplemented!()
}
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].get_object_reader(bucket, object.as_str(), range, h, opts).await;
}
unimplemented!()
}
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].put_object(bucket, object.as_str(), data, opts).await;
}
unimplemented!()
}
async fn put_object_part(
&self,
bucket: &str,

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use crate::error::{Error, Result};
use futures::StreamExt;
use http::HeaderMap;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
@@ -281,12 +282,26 @@ pub struct GetObjectReader {
pub object_info: ObjectInfo,
}
// impl GetObjectReader {
// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
// GetObjectReader { stream, object_info }
// }
// }
impl GetObjectReader {
// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
// GetObjectReader { stream, object_info }
// }
pub async fn read_all(&mut self) -> Result<Vec<u8>> {
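// Note: buffers the entire stream in memory, which suits small config/metadata objects.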
let mut data = Vec::new();
while let Some(x) = self.stream.next().await {
let buf = match x {
Ok(res) => res,
Err(e) => return Err(Error::msg(e.to_string())),
};
data.extend_from_slice(buf.as_ref());
}
Ok(data)
}
}
#[derive(Debug)]
pub struct HTTPRangeSpec {
pub is_suffix_length: bool,
pub start: i64,
@@ -510,7 +525,20 @@ pub struct DeletedObject {
}
#[async_trait::async_trait]
pub trait StorageAPI {
pub trait ObjectIO: Send + Sync + 'static {
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader>;
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
}
#[async_trait::async_trait]
pub trait StorageAPI: ObjectIO {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
@@ -536,15 +564,15 @@ pub trait StorageAPI {
async fn put_object_info(&self, bucket: &str, object: &str, info: ObjectInfo, opts: &ObjectOptions) -> Result<()>;
async fn get_object_reader(
&self,
bucket: &str,
object: &str,
range: HTTPRangeSpec,
h: HeaderMap,
opts: &ObjectOptions,
) -> Result<GetObjectReader>;
async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
// async fn get_object_reader(
// &self,
// bucket: &str,
// object: &str,
// range: HTTPRangeSpec,
// h: HeaderMap,
// opts: &ObjectOptions,
// ) -> Result<GetObjectReader>;
// async fn put_object(&self, bucket: &str, object: &str, data: PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn put_object_part(
&self,
bucket: &str,

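Why this refactor: with `ObjectIO` split out of `StorageAPI`, plain get/put helpers can be written against the narrower trait instead of the whole storage API. A minimal sketch (`read_all_bytes` is hypothetical, not in this commit):

```rust
// Any ObjectIO implementor (Sets, ECStore, ...) can back this helper.
async fn read_all_bytes<S: ObjectIO>(api: &S, bucket: &str, object: &str) -> Result<Vec<u8>> {
    let mut rd = api
        .get_object_reader(bucket, object, HTTPRangeSpec::nil(), HeaderMap::new(), &ObjectOptions::default())
        .await?;
    rd.read_all().await
}
```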
View File

@@ -1,5 +1,6 @@
use bytes::BufMut;
use bytes::Bytes;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket_meta::BucketMetadata;
use ecstore::disk::error::DiskError;
use ecstore::disk::RUSTFS_META_BUCKET;
@@ -10,6 +11,7 @@ use ecstore::store_api::DeleteBucketOptions;
use ecstore::store_api::HTTPRangeSpec;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
@@ -769,45 +771,62 @@ impl S3 for FS {
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
let meta_obj = try_!(
store
.get_object_reader(
RUSTFS_META_BUCKET,
BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
HTTPRangeSpec::nil(),
Default::default(),
&ObjectOptions::default(),
)
.await
);
// let a = get_bucket_metadata_sys();
let stream = meta_obj.stream;
Ok(S3Response::new(GetBucketTaggingOutput { ..Default::default() }))
let mut data = vec![];
pin_mut!(stream);
// let meta_obj = try_!(
// store
// .get_object_reader(
// RUSTFS_META_BUCKET,
// BucketMetadata::new(bucket.as_str()).save_file_path().as_str(),
// HTTPRangeSpec::nil(),
// Default::default(),
// &ObjectOptions::default(),
// )
// .await
// );
while let Some(x) = stream.next().await {
let x = try_!(x);
data.put_slice(&x[..]);
}
// let stream = meta_obj.stream;
let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
if meta.tagging.is_none() {
return Err({
let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
err.set_status_code("404".try_into().unwrap());
err
});
}
// let mut data = vec![];
// pin_mut!(stream);
Ok(S3Response::new(GetBucketTaggingOutput {
tag_set: meta
.tagging
.unwrap()
.into_iter()
.map(|(key, value)| Tag { key, value })
.collect(),
}))
// while let Some(x) = stream.next().await {
// let x = try_!(x);
// data.put_slice(&x[..]);
// }
// if data.is_empty() {
// return Err({
// let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
// err.set_status_code("404".try_into().unwrap());
// err
// });
// }
// warn!("bm: 1111");
// let meta = try_!(BucketMetadata::unmarshal_from(&data[..]));
// warn!("bm 33333");
// if meta.tagging.is_none() {
// return Err({
// let mut err = S3Error::with_message(S3ErrorCode::Custom("NoSuchTagSet".into()), "The TagSet does not exist");
// err.set_status_code("404".try_into().unwrap());
// err
// });
// }
// warn!("bm:4444");
// Ok(S3Response::new(GetBucketTaggingOutput {
// tag_set: meta
// .tagging
// .unwrap()
// .into_iter()
// .map(|(key, value)| Tag { key, value })
// .collect(),
// }))
}
#[tracing::instrument(level = "debug", skip(self))]