Merge branch 'main' into feat/ec_config

weisd committed 2024-10-10 13:06:39 +08:00 (committed by GitHub)
22 changed files with 864 additions and 133 deletions


@@ -10,7 +10,7 @@ env:
jobs:
build:
timeout-minutes: 10
timeout-minutes: 30
runs-on: ubuntu-latest
strategy:
matrix:
@@ -67,10 +67,14 @@ jobs:
- name: checkout
uses: actions/checkout@v4
- name: run fs
- name: run fs start
working-directory: .
run: |
nohup make e2e-server &
- name: run fs test
working-directory: .
run: |
make e2e-server > /dev/null &
make probe-e2e
- name: e2e test

Cargo.lock (generated)

@@ -1502,6 +1502,8 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
dependencies = [
"cc",
"libc",
"libm",
"lru",
"parking_lot",
@@ -1838,6 +1840,16 @@ dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"sha2-asm",
]
[[package]]
name = "sha2-asm"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b845214d6175804686b2bd482bcffe96651bb2d1200742b712003504a2dac1ab"
dependencies = [
"cc",
]
[[package]]


@@ -580,7 +580,13 @@ pub struct GenerallyLockResponse {
}
/// Generated client implementations.
pub mod node_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
@@ -602,8 +608,8 @@ pub mod node_service_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
@@ -628,7 +634,7 @@ pub mod node_service_client {
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
>>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
{
NodeServiceClient::new(InterceptedService::new(inner, interceptor))
}
@@ -672,8 +678,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -697,8 +702,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -722,8 +726,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -747,8 +750,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -772,8 +774,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -797,8 +798,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -822,8 +822,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -844,8 +843,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -869,8 +867,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -894,8 +891,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -916,8 +912,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -941,8 +936,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -967,8 +961,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -992,8 +985,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1017,8 +1009,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1042,8 +1033,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1067,8 +1057,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1092,8 +1081,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1117,8 +1105,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1142,8 +1129,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1167,8 +1153,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1192,8 +1177,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1217,8 +1201,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1242,8 +1225,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1264,8 +1246,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1289,8 +1270,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1314,8 +1294,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1339,8 +1318,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1364,8 +1342,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1389,8 +1366,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1414,8 +1390,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1439,8 +1414,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1464,8 +1438,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1489,8 +1462,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1514,8 +1486,7 @@ pub mod node_service_client {
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1532,11 +1503,17 @@ pub mod node_service_client {
}
/// Generated server implementations.
pub mod node_service_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer.
#[async_trait]
pub trait NodeService: Send + Sync + 'static {
pub trait NodeService: std::marker::Send + std::marker::Sync + 'static {
/// -------------------------------meta service--------------------------
async fn ping(
&self,
@@ -1607,7 +1584,7 @@ pub mod node_service_server {
type WriteStreamStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::WriteResponse, tonic::Status>,
>
+ Send
+ std::marker::Send
+ 'static;
async fn write_stream(
&self,
@@ -1620,7 +1597,7 @@ pub mod node_service_server {
type ReadAtStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::ReadAtResponse, tonic::Status>,
>
+ Send
+ std::marker::Send
+ 'static;
/// rpc Append(AppendRequest) returns (AppendResponse) {};
async fn read_at(
@@ -1774,14 +1751,14 @@ pub mod node_service_server {
>;
}
#[derive(Debug)]
pub struct NodeServiceServer<T: NodeService> {
pub struct NodeServiceServer<T> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T: NodeService> NodeServiceServer<T> {
impl<T> NodeServiceServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
@@ -1835,8 +1812,8 @@ pub mod node_service_server {
impl<T, B> tonic::codegen::Service<http::Request<B>> for NodeServiceServer<T>
where
T: NodeService,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
B: Body + std::marker::Send + 'static,
B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
@@ -3428,23 +3405,25 @@ pub mod node_service_server {
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
)
.body(empty_body())
.unwrap(),
)
let mut response = http::Response::new(empty_body());
let headers = response.headers_mut();
headers
.insert(
tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers
.insert(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
})
}
}
}
}
impl<T: NodeService> Clone for NodeServiceServer<T> {
impl<T> Clone for NodeServiceServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
@@ -3456,7 +3435,9 @@ pub mod node_service_server {
}
}
}
impl<T: NodeService> tonic::server::NamedService for NodeServiceServer<T> {
const NAME: &'static str = "node_service.NodeService";
/// Generated gRPC service name
pub const SERVICE_NAME: &str = "node_service.NodeService";
impl<T> tonic::server::NamedService for NodeServiceServer<T> {
const NAME: &'static str = SERVICE_NAME;
}
}
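The client hunks above all make the same substitution: `tonic::Status::new(tonic::Code::Unknown, msg)` becomes the per-code `tonic::Status::unknown(msg)` convenience constructor that newer tonic codegen emits. A minimal sketch of the equivalence, assuming only the `tonic` crate:

```rust
use tonic::{Code, Status};

fn not_ready(e: impl std::fmt::Display) -> Status {
    // Long form, as generated before this commit.
    let old = Status::new(Code::Unknown, format!("Service was not ready: {}", e));
    // Per-code shorthand, as generated after; both carry Code::Unknown.
    let new = Status::unknown(format!("Service was not ready: {}", e));
    assert_eq!(old.code(), new.code());
    new
}
```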


@@ -23,7 +23,7 @@ tracing-error.workspace = true
http.workspace = true
url.workspace = true
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = "6.0.0"
reed-solomon-erasure = { version = "6.0.0", features = [ "simd-accel" ] }
transform-stream = "0.3.0"
lazy_static.workspace = true
lock.workspace = true
@@ -37,7 +37,7 @@ s3s = "0.10.0"
crc32fast = "1.4.2"
siphasher = "1.0.1"
base64-simd = "0.8.0"
sha2 = "0.10.8"
sha2 = { version = "0.10.8", features = ["asm"] }
hex-simd = "0.8.0"
path-clean = "1.0.1"
tokio = { workspace = true, features = ["io-util", "sync"] }
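This manifest change turns on the `simd-accel` feature of `reed-solomon-erasure` and the `asm` feature of `sha2`, which is what pulls the new `cc` and `sha2-asm` entries into `Cargo.lock` above. Both are compile-time accelerations; call sites are unchanged. A sketch of the `sha2` usage that stays identical either way:

```rust
use sha2::{Digest, Sha256};

// With the "asm" feature the SHA-256 compression function comes from the
// sha2-asm crate; the Digest API below is the same with or without it.
fn sha256(data: &[u8]) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update(data);
    hasher.finalize().into()
}
```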


@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Defines the Algorithm enum
@@ -21,22 +23,37 @@ impl std::str::FromStr for Algorithm {
}
// Defines the EncryptionAction struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct EncryptionAction {
algorithm: Option<Algorithm>,
master_key_id: Option<String>,
}
// Defines the Rule struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
default_encryption_action: EncryptionAction,
}
// Defines the BucketSSEConfig struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketSSEConfig {
xml_ns: String,
xml_name: String,
rules: Vec<Rule>,
}
impl BucketSSEConfig {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketSSEConfig = rmp_serde::from_slice(buf)?;
Ok(t)
}
}
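`marshal_msg`/`unmarshal` is the same MessagePack round-trip this commit adds to every bucket config type. `with_struct_map()` makes `rmp_serde` encode struct fields as string-keyed maps instead of positional arrays, so decoding tolerates field reordering across versions. A usage sketch:

```rust
use crate::error::Result;

fn roundtrip() -> Result<()> {
    let cfg = BucketSSEConfig::default();
    let bytes = cfg.marshal_msg()?; // struct encoded as a string-keyed msgpack map
    let restored = BucketSSEConfig::unmarshal(&bytes)?;
    debug_assert_eq!(bytes, restored.marshal_msg()?);
    Ok(())
}
```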


@@ -2,4 +2,18 @@
pub enum BucketMetadataError {
#[error("tagging not found")]
TaggingNotFound,
#[error("bucket policy not found")]
BucketPolicyNotFound,
#[error("bucket object lock not found")]
BucketObjectLockConfigNotFound,
#[error("bucket lifecycle not found")]
BucketLifecycleNotFound,
#[error("bucket SSE config not found")]
BucketSSEConfigNotFound,
#[error("bucket quota config not found")]
BucketQuotaConfigNotFound,
#[error("bucket replication config not found")]
BucketReplicationConfigNotFound,
#[error("bucket remote target not found")]
BucketRemoteTargetNotFound,
}
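Callers can branch on these variants instead of string-matching error text; `is_missing_config` below is a hypothetical helper (not part of the commit), with an assumed import path:

```rust
use crate::bucket::error::BucketMetadataError; // assumed module path

// Hypothetical: treat any of the "config not found" variants as a soft miss.
fn is_missing_config(err: &BucketMetadataError) -> bool {
    matches!(
        err,
        BucketMetadataError::BucketPolicyNotFound
            | BucketMetadataError::BucketSSEConfigNotFound
            | BucketMetadataError::BucketLifecycleNotFound
    )
}
```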


@@ -1,11 +1,12 @@
mod name;
use crate::error::Result;
use name::Name;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use name::Name;
// Defines the Common struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Common {
pub id: String,
pub filter: S3Key,
@@ -13,59 +14,74 @@ struct Common {
}
// Defines the Queue struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Queue {
pub common: Common,
pub arn: ARN,
}
// Defines the ARN struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ARN {
pub target_id: TargetID,
pub region: String,
}
// Defines the TargetID struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TargetID {
pub id: String,
pub name: String,
}
// Defines the FilterRule struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRule {
pub name: String,
pub value: String,
}
// Defines the FilterRuleList struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRuleList {
pub rules: Vec<FilterRule>,
}
// Defines the S3Key struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct S3Key {
pub rule_list: FilterRuleList,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lambda {
arn: String,
}
// Defines the Topic struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Topic {
arn: String,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
queue_list: Vec<Queue>,
lambda_list: Vec<Lambda>,
topic_list: Vec<Topic>,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -1,9 +1,26 @@
use super::rule::Rule;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lifecycle {
pub rules: Vec<Rule>,
pub expiry_updated_at: Option<OffsetDateTime>,
}
impl Lifecycle {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Lifecycle = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -188,9 +188,44 @@ impl BucketMetadata {
}
fn default_timestamps(&mut self) {
if self.policy_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.policy_config_updated_at = self.created
}
if self.encryption_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.encryption_config_updated_at = self.created
}
if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.tagging_config_updated_at = self.created
}
if self.object_lock_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.object_lock_config_updated_at = self.created
}
if self.quota_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.quota_config_updated_at = self.created
}
if self.replication_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.replication_config_updated_at = self.created
}
if self.versioning_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.versioning_config_updated_at = self.created
}
if self.lifecycle_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.lifecycle_config_updated_at = self.created
}
if self.notification_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.notification_config_updated_at = self.created
}
if self.bucket_targets_config_updated_at == OffsetDateTime::UNIX_EPOCH {
self.bucket_targets_config_updated_at = self.created
}
if self.bucket_targets_config_meta_updated_at == OffsetDateTime::UNIX_EPOCH {
self.bucket_targets_config_meta_updated_at = self.created
}
}
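Every branch of `default_timestamps` applies the same rule: a timestamp still at the epoch sentinel is backfilled with the bucket's creation time. A hypothetical helper that would collapse the repetition (`backfill` is an invented name, not in the commit):

```rust
use time::OffsetDateTime;

// Hypothetical: replace an unset (epoch) timestamp with the creation time.
fn backfill(ts: &mut OffsetDateTime, created: OffsetDateTime) {
    if *ts == OffsetDateTime::UNIX_EPOCH {
        *ts = created;
    }
}

// Inside default_timestamps this would read:
//   backfill(&mut self.policy_config_updated_at, self.created);
//   backfill(&mut self.encryption_config_updated_at, self.created);
//   ...and so on for the remaining config timestamps.
```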
pub fn update_config(&mut self, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
@@ -262,9 +297,39 @@ impl BucketMetadata {
}
fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
if !self.policy_config_json.is_empty() {
self.policy_config = Some(BucketPolicy::unmarshal(&self.policy_config_json)?);
}
if !self.notification_config_xml.is_empty() {
self.notification_config = Some(event::Config::unmarshal(&self.notification_config_xml)?);
}
if !self.lifecycle_config_xml.is_empty() {
self.lifecycle_config = Some(Lifecycle::unmarshal(&self.lifecycle_config_xml)?);
}
if !self.object_lock_config_xml.is_empty() {
self.object_lock_config = Some(objectlock::Config::unmarshal(&self.object_lock_config_xml)?);
}
if !self.versioning_config_xml.is_empty() {
self.versioning_config = Some(Versioning::unmarshal(&self.versioning_config_xml)?);
}
if !self.encryption_config_xml.is_empty() {
self.sse_config = Some(BucketSSEConfig::unmarshal(&self.encryption_config_xml)?);
}
if !self.tagging_config_xml.is_empty() {
self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
}
if !self.quota_config_json.is_empty() {
self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?);
}
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(replication::Config::unmarshal(&self.replication_config_xml)?);
}
if !self.bucket_targets_config_json.is_empty() {
self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?);
} else {
self.bucket_target_config = Some(BucketTargets::default())
}
Ok(())
}


@@ -16,8 +16,13 @@ use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::{error, warn};
use super::encryption::BucketSSEConfig;
use super::lifecycle::lifecycle::Lifecycle;
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::tags;
use super::policy::bucket_policy::BucketPolicy;
use super::quota::BucketQuota;
use super::target::BucketTargets;
use super::{event, objectlock, replication, tags, versioning};
lazy_static! {
static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
@@ -232,6 +237,42 @@ impl BucketMetadataSys {
}
}
pub async fn get_versioning_config(&self, bucket: &str) -> Result<(versioning::Versioning, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_versioning_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Ok((versioning::Versioning::default(), OffsetDateTime::UNIX_EPOCH));
} else {
return Err(err);
}
}
};
Ok((bm.versioning_config.unwrap_or_default(), bm.versioning_config_updated_at))
}
pub async fn get_bucket_policy(&self, bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_bucket_policy err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketPolicyNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.policy_config {
Ok((config, bm.policy_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketPolicyNotFound))
}
}
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
@@ -251,4 +292,163 @@ impl BucketMetadataSys {
Err(Error::new(BucketMetadataError::TaggingNotFound))
}
}
pub async fn get_object_lock_config(&self, bucket: &str) -> Result<(objectlock::Config, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_object_lock_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.object_lock_config {
Ok((config, bm.object_lock_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound))
}
}
pub async fn get_lifecycle_config(&self, bucket: &str) -> Result<(Lifecycle, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_lifecycle_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketLifecycleNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.lifecycle_config {
if config.rules.is_empty() {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
} else {
Ok((config, bm.lifecycle_config_updated_at))
}
} else {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
}
}
pub async fn get_notification_config(&self, bucket: &str) -> Result<Option<event::Config>> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.notification_config,
Err(err) => {
warn!("get_notification_config err {:?}", &err);
if config::error::is_not_found(&err) {
None
} else {
return Err(err);
}
}
};
Ok(bm)
}
pub async fn get_sse_config(&self, bucket: &str) -> Result<(BucketSSEConfig, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_sse_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.sse_config {
Ok((config, bm.encryption_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound))
}
}
pub async fn created_at(&self, bucket: &str) -> Result<OffsetDateTime> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.created,
Err(err) => {
return Err(err);
}
};
Ok(bm)
}
pub async fn get_quota_config(&self, bucket: &str) -> Result<(BucketQuota, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_quota_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.quota_config {
Ok((config, bm.quota_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound))
}
}
pub async fn get_replication_config(&self, bucket: &str) -> Result<(replication::Config, OffsetDateTime)> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.replication_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok((config, bm.replication_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound))
}
}
pub async fn get_bucket_targets_config(&self, bucket: &str) -> Result<BucketTargets> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound));
} else {
return Err(err);
}
}
};
if let Some(config) = bm.bucket_target_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok(config)
} else {
Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound))
}
}
}
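All of the new accessors share one shape: fetch the cached metadata with `get_config`, translate a not-found miss into the matching `BucketMetadataError` variant (or a default, for versioning), and return the parsed config with its update timestamp. A caller-side sketch, with "my-bucket" as a placeholder name:

```rust
use crate::bucket::get_bucket_metadata_sys;

async fn log_sse_state() {
    let sys_lock = get_bucket_metadata_sys().await;
    let sys = sys_lock.read().await;
    match sys.get_sse_config("my-bucket").await {
        Ok((cfg, updated_at)) => println!("SSE config set at {updated_at}: {cfg:?}"),
        // A missing bucket or unset config surfaces as BucketSSEConfigNotFound.
        Err(err) => println!("no SSE config: {err:?}"),
    }
}
```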


@@ -11,6 +11,7 @@ mod replication;
pub mod tags;
mod target;
pub mod utils;
mod versioning;
pub mod versioning;
pub mod versioning_sys;
pub use metadata_sys::{bucket_metadata_sys_set, get_bucket_metadata_sys, init_bucket_metadata_sys};


@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
@@ -37,3 +39,18 @@ pub struct Config {
pub object_lock_enabled: String,
pub rule: Option<Rule>,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -38,3 +40,18 @@ pub struct BucketPolicy {
pub version: String,
pub statements: Vec<BPStatement>,
}
impl BucketPolicy {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketPolicy = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -1,3 +1,5 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Defines the QuotaType enum
@@ -19,3 +21,18 @@ pub struct BucketQuota {
quota_type: Option<QuotaType>,
}
impl BucketQuota {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketQuota = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -3,11 +3,28 @@ mod filter;
mod rule;
mod tag;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use rule::Rule;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
rules: Vec<Rule>,
role_arn: String,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -1,8 +1,10 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Credentials {
access_key: String,
secret_key: String,
@@ -10,13 +12,13 @@ pub struct Credentials {
expiration: Option<OffsetDateTime>,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum ServiceType {
#[default]
Replication,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct LatencyStat {
curr: Duration, // current latency
avg: Duration, // average latency
@@ -24,7 +26,7 @@ pub struct LatencyStat {
}
// Defines the BucketTarget struct
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
source_bucket: String,
@@ -73,7 +75,22 @@ pub struct BucketTarget {
edge: bool,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTargets {
pub targets: Vec<BucketTarget>,
}
impl BucketTargets {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: BucketTargets = rmp_serde::from_slice(buf)?;
Ok(t)
}
}


@@ -1,12 +1,23 @@
use crate::{
error::{Error, Result},
utils,
};
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)]
pub enum VersioningErr {
#[error("too many excluded prefixes")]
TooManyExcludedPrefixes,
#[error("excluded prefixes extension supported only when versioning is enabled")]
ExcludedPrefixNotSupported,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize, Serialize)]
pub enum State {
#[default]
Enabled,
Suspended,
// Add a Disabled state here if it is ever needed
// Disabled,
Enabled,
}
// Implement the Display trait for printing
@@ -18,21 +29,112 @@ impl std::fmt::Display for State {
match *self {
State::Enabled => "Enabled",
State::Suspended => "Suspended",
// Add a Disabled arm here if the state is ever needed
// State::Disabled => "Disabled",
}
)
}
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExcludedPrefix {
pub prefix: String,
}
#[derive(Debug, Deserialize, Serialize, Default,Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Versioning {
pub status: State,
pub excluded_prefixes: Vec<ExcludedPrefix>,
pub exclude_folders: bool,
}
impl Versioning {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Versioning = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn validate(&self) -> Result<()> {
match self.status {
State::Suspended => {
if !self.excluded_prefixes.is_empty() {
return Err(Error::new(VersioningErr::ExcludedPrefixNotSupported));
}
}
State::Enabled => {
if self.excluded_prefixes.len() > 10 {
return Err(Error::new(VersioningErr::TooManyExcludedPrefixes));
}
}
}
Ok(())
}
pub fn enabled(&self) -> bool {
self.status == State::Enabled
}
pub fn versioned(&self, prefix: &str) -> bool {
self.prefix_enabled(prefix) || self.prefix_suspended(prefix)
}
pub fn prefix_enabled(&self, prefix: &str) -> bool {
if self.status != State::Enabled {
return false;
}
if prefix.is_empty() {
return true;
}
if self.exclude_folders && prefix.ends_with("/") {
return false;
}
for sprefix in self.excluded_prefixes.iter() {
let full_prefix = format!("{}*", sprefix.prefix);
if utils::wildcard::match_simple(&full_prefix, prefix) {
return false;
}
}
return true;
}
pub fn suspended(&self) -> bool {
self.status == State::Suspended
}
pub fn prefix_suspended(&self, prefix: &str) -> bool {
if self.status == State::Suspended {
return true;
}
if self.status == State::Enabled {
if prefix.is_empty() {
return false;
}
if self.exclude_folders && prefix.starts_with("/") {
return true;
}
for sprefix in self.excluded_prefixes.iter() {
let full_prefix = format!("{}*", sprefix.prefix);
if utils::wildcard::match_simple(&full_prefix, prefix) {
return true;
}
}
}
return false;
}
pub fn prefixes_excluded(&self) -> bool {
!self.excluded_prefixes.is_empty() || self.exclude_folders
}
}
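`prefix_enabled` and `prefix_suspended` refine the bucket-wide state per object prefix: each excluded prefix is matched as a `"<prefix>*"` wildcard and reads as unversioned even while the bucket is Enabled. A small illustrative test (values are made up):

```rust
#[test]
fn excluded_prefixes_read_as_suspended() {
    let v = Versioning {
        status: State::Enabled,
        excluded_prefixes: vec![ExcludedPrefix { prefix: "tmp/".to_string() }],
        exclude_folders: false,
    };
    assert!(v.prefix_enabled("docs/report.txt"));   // ordinary key: versioned
    assert!(!v.prefix_enabled("tmp/scratch.bin"));  // matches the "tmp/*" exclusion
    assert!(v.prefix_suspended("tmp/scratch.bin")); // excluded prefixes act suspended
}
```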


@@ -0,0 +1,62 @@
use super::get_bucket_metadata_sys;
use super::versioning::Versioning;
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::Result;
use tracing::warn;
pub struct BucketVersioningSys {}
impl BucketVersioningSys {
pub async fn enabled(bucket: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.enabled(),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
pub async fn prefix_enabled(bucket: &str, prefix: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.prefix_enabled(prefix),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
pub async fn suspended(bucket: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.suspended(),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
pub async fn prefix_suspended(bucket: &str, prefix: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.prefix_suspended(prefix),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
pub async fn get(bucket: &str) -> Result<Versioning> {
if bucket == RUSTFS_META_BUCKET || bucket.starts_with(RUSTFS_META_BUCKET) {
return Ok(Versioning::default());
}
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let bucket_meta_sys = bucket_meta_sys_lock.write().await;
let (cfg, _) = bucket_meta_sys.get_versioning_config(bucket).await?;
Ok(cfg)
}
}
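`BucketVersioningSys` is a thin async facade over the metadata system: internal `RUSTFS_META_BUCKET` paths always get default versioning, and the boolean helpers degrade to `false` on lookup errors instead of propagating them. A hedged usage sketch:

```rust
use crate::bucket::versioning_sys::BucketVersioningSys; // path as exported in mod.rs

async fn should_version(bucket: &str, key: &str) -> bool {
    // Never fails: metadata lookup errors are logged and read as "not enabled".
    BucketVersioningSys::prefix_enabled(bucket, key).await
}
```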


@@ -4,3 +4,4 @@ pub mod fs;
pub mod hash;
pub mod net;
pub mod path;
pub mod wildcard;


@@ -0,0 +1,70 @@
pub fn match_simple(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), true)
}
pub fn match_pattern(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
}
if pattern == "*" {
return true;
}
// Do an extended wildcard '*' and '?' match.
deep_match_rune(name.as_bytes(), pattern.as_bytes(), false)
}
fn deep_match_rune(str_: &[u8], pattern: &[u8], simple: bool) -> bool {
let (mut str_, mut pattern) = (str_, pattern);
while !pattern.is_empty() {
match pattern[0] as char {
'*' => {
return if pattern.len() == 1 {
true
} else if deep_match_rune(&str_[..], &pattern[1..], simple)
|| (!str_.is_empty() && deep_match_rune(&str_[1..], pattern, simple))
{
true
} else {
false
};
}
'?' => {
if str_.is_empty() {
return simple;
}
}
_ => {
if str_.is_empty() || str_[0] != pattern[0] {
return false;
}
}
}
str_ = &str_[1..];
pattern = &pattern[1..];
}
str_.is_empty() && pattern.is_empty()
}
pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool {
let mut i = 0;
while i < text.len() && i < pattern.len() {
match pattern.as_bytes()[i] as char {
'*' => return true,
'?' => i += 1,
_ => {
if pattern.as_bytes()[i] != text.as_bytes()[i] {
return false;
}
}
}
i += 1;
}
text.len() <= pattern.len()
}
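The two entry points differ only in how a trailing `?` behaves: `match_simple` lets `?` match zero bytes once the name is exhausted, while `match_pattern` requires every `?` to consume a byte. A few illustrative cases:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn wildcard_cases() {
        assert!(match_simple("data-?", "data-1"));
        assert!(match_simple("data-?", "data-"));   // simple: trailing '?' may match nothing
        assert!(!match_pattern("data-?", "data-")); // strict: '?' must consume a byte
        assert!(match_pattern("a*c", "abbbc"));     // '*' spans any run of bytes
        assert!(match_as_pattern_prefix("ab*z", "abq")); // prefix check succeeds at '*'
    }
}
```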


@@ -1,7 +1,11 @@
use bytes::Bytes;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use ecstore::bucket::metadata::BUCKET_VERSIONING_CONFIG;
use ecstore::bucket::tags::Tags;
use ecstore::bucket::versioning::State as VersioningState;
use ecstore::bucket::versioning::Versioning;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::disk::error::DiskError;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
@@ -17,6 +21,7 @@ use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::status;
use http::HeaderMap;
use log::warn;
use s3s::dto::*;
@@ -850,6 +855,84 @@ impl S3 for FS {
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn get_bucket_versioning(
&self,
req: S3Request<GetBucketVersioningInput>,
) -> S3Result<S3Response<GetBucketVersioningOutput>> {
let GetBucketVersioningInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init")),
};
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let cfg = try_!(BucketVersioningSys::get(&bucket).await);
let status = match cfg.status {
VersioningState::Enabled => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
VersioningState::Suspended => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::SUSPENDED)),
};
Ok(S3Response::new(GetBucketVersioningOutput {
mfa_delete: None,
status,
}))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_bucket_versioning(
&self,
req: S3Request<PutBucketVersioningInput>,
) -> S3Result<S3Response<PutBucketVersioningOutput>> {
let PutBucketVersioningInput {
bucket,
versioning_configuration,
..
} = req.input;
// TODO: check other sys
// check site replication enable
// check bucket object lock enable
// check replication suspended
let mut cfg = match BucketVersioningSys::get(&bucket).await {
Ok(res) => res,
Err(err) => {
warn!("BucketVersioningSys::get err {:?}", err);
Versioning::default()
}
};
if let Some(verstatus) = versioning_configuration.status {
cfg.status = match verstatus.as_str() {
BucketVersioningStatus::ENABLED => VersioningState::Enabled,
BucketVersioningStatus::SUSPENDED => VersioningState::Suspended,
_ => return Err(S3Error::with_message(S3ErrorCode::InternalError, "invalid versioning status")),
}
}
let data = try_!(cfg.marshal_msg());
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
try_!(bucket_meta_sys.update(&bucket, BUCKET_VERSIONING_CONFIG, data).await);
// TODO: globalSiteReplicationSys.BucketMetaHook
Ok(S3Response::new(PutBucketVersioningOutput {}))
}
}
#[allow(dead_code)]


@@ -26,4 +26,5 @@ fi
# --domain-name 127.0.0.1:9010 \
# "$DATA_DIR_ARG"
cargo run "$DATA_DIR_ARG"
# cargo run "$DATA_DIR_ARG"
cargo run ./target/volume/test