diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 74971f64..6d2b7fc1 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -10,7 +10,7 @@ env: jobs: build: - timeout-minutes: 10 + timeout-minutes: 30 runs-on: ubuntu-latest strategy: matrix: @@ -67,10 +67,14 @@ jobs: - name: checkout uses: actions/checkout@v4 - - name: run fs + - name: run fs start + working-directory: . + run: | + nohup make e2e-server & + + - name: run fs test working-directory: . run: | - make e2e-server > /dev/null & make probe-e2e - name: e2e test diff --git a/Cargo.lock b/Cargo.lock index 6f5db919..380a3a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1502,6 +1502,8 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" dependencies = [ + "cc", + "libc", "libm", "lru", "parking_lot", @@ -1838,6 +1840,16 @@ dependencies = [ "cfg-if", "cpufeatures", "digest", + "sha2-asm", +] + +[[package]] +name = "sha2-asm" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b845214d6175804686b2bd482bcffe96651bb2d1200742b712003504a2dac1ab" +dependencies = [ + "cc", ] [[package]] diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index 054d023d..1a4f32e6 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -580,7 +580,13 @@ pub struct GenerallyLockResponse { } /// Generated client implementations. 
pub mod node_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; use tonic::codegen::http::Uri; #[derive(Debug, Clone)] @@ -602,8 +608,8 @@ pub mod node_service_client { where T: tonic::client::GrpcService, T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -628,7 +634,7 @@ pub mod node_service_client { >, , - >>::Error: Into + Send + Sync, + >>::Error: Into + std::marker::Send + std::marker::Sync, { NodeServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -672,8 +678,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -697,8 +702,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -722,8 +726,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -747,8 +750,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -772,8 +774,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -797,8 +798,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - 
tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -822,8 +822,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -844,8 +843,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -869,8 +867,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -894,8 +891,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -916,8 +912,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -941,8 +936,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -967,8 +961,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -992,8 +985,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1017,8 +1009,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; 
@@ -1042,8 +1033,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1067,8 +1057,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1092,8 +1081,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1117,8 +1105,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1142,8 +1129,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1167,8 +1153,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1192,8 +1177,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1217,8 +1201,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1242,8 +1225,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1264,8 +1246,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - 
tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1289,8 +1270,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1314,8 +1294,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1339,8 +1318,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1364,8 +1342,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1389,8 +1366,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1414,8 +1390,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1439,8 +1414,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1464,8 +1438,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1489,8 +1462,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ 
-1514,8 +1486,7 @@ pub mod node_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -1532,11 +1503,17 @@ pub mod node_service_client { } /// Generated server implementations. pub mod node_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer. #[async_trait] - pub trait NodeService: Send + Sync + 'static { + pub trait NodeService: std::marker::Send + std::marker::Sync + 'static { /// -------------------------------meta service-------------------------- async fn ping( &self, @@ -1607,7 +1584,7 @@ pub mod node_service_server { type WriteStreamStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + Send + + std::marker::Send + 'static; async fn write_stream( &self, @@ -1620,7 +1597,7 @@ pub mod node_service_server { type ReadAtStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + Send + + std::marker::Send + 'static; /// rpc Append(AppendRequest) returns (AppendResponse) {}; async fn read_at( @@ -1774,14 +1751,14 @@ pub mod node_service_server { >; } #[derive(Debug)] - pub struct NodeServiceServer { + pub struct NodeServiceServer { inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - impl NodeServiceServer { + impl NodeServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -1835,8 +1812,8 @@ pub mod node_service_server { impl tonic::codegen::Service> for NodeServiceServer where T: NodeService, - B: Body + Send + 'static, - B::Error: Into + 
Send + 'static, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -3428,23 +3405,25 @@ pub mod node_service_server { } _ => { Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", tonic::Code::Unimplemented as i32) - .header( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ) - .body(empty_body()) - .unwrap(), - ) + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) }) } } } } - impl Clone for NodeServiceServer { + impl Clone for NodeServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -3456,7 +3435,9 @@ pub mod node_service_server { } } } - impl tonic::server::NamedService for NodeServiceServer { - const NAME: &'static str = "node_service.NodeService"; + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "node_service.NodeService"; + impl tonic::server::NamedService for NodeServiceServer { + const NAME: &'static str = SERVICE_NAME; } } diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index d3123277..7228d49d 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -23,7 +23,7 @@ tracing-error.workspace = true http.workspace = true url.workspace = true uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] } -reed-solomon-erasure = "6.0.0" +reed-solomon-erasure = { version = "6.0.0", features = [ "simd-accel" ] } transform-stream = "0.3.0" lazy_static.workspace = true lock.workspace = true @@ -37,7 +37,7 @@ s3s = "0.10.0" crc32fast = "1.4.2" siphasher = "1.0.1" base64-simd = "0.8.0" -sha2 = "0.10.8" +sha2 = { version = "0.10.8", features = ["asm"] } hex-simd = "0.8.0" path-clean 
= "1.0.1" tokio = { workspace = true, features = ["io-util", "sync"] } diff --git a/ecstore/src/bucket/encryption/mod.rs b/ecstore/src/bucket/encryption/mod.rs index 20e7d315..1aa3c32f 100644 --- a/ecstore/src/bucket/encryption/mod.rs +++ b/ecstore/src/bucket/encryption/mod.rs @@ -1,3 +1,5 @@ +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; // 定义Algorithm枚举类型 @@ -21,22 +23,37 @@ impl std::str::FromStr for Algorithm { } // 定义EncryptionAction结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct EncryptionAction { algorithm: Option, master_key_id: Option, } // 定义Rule结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Rule { default_encryption_action: EncryptionAction, } // 定义BucketSSEConfig结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketSSEConfig { xml_ns: String, xml_name: String, rules: Vec, } + +impl BucketSSEConfig { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: BucketSSEConfig = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/error.rs b/ecstore/src/bucket/error.rs index 781f7f11..a18a295a 100644 --- a/ecstore/src/bucket/error.rs +++ b/ecstore/src/bucket/error.rs @@ -2,4 +2,18 @@ pub enum BucketMetadataError { #[error("tagging not found")] TaggingNotFound, + #[error("bucket policy not found")] + BucketPolicyNotFound, + #[error("bucket object lock not found")] + BucketObjectLockConfigNotFound, + #[error("bucket lifecycle not found")] + BucketLifecycleNotFound, + #[error("bucket SSE config not found")] + BucketSSEConfigNotFound, + #[error("bucket quota config not found")] + 
BucketQuotaConfigNotFound, + #[error("bucket replication config not found")] + BucketReplicationConfigNotFound, + #[error("bucket remote target not found")] + BucketRemoteTargetNotFound, } diff --git a/ecstore/src/bucket/event/mod.rs b/ecstore/src/bucket/event/mod.rs index d1cb49fb..8f8a96bb 100644 --- a/ecstore/src/bucket/event/mod.rs +++ b/ecstore/src/bucket/event/mod.rs @@ -1,11 +1,12 @@ mod name; +use crate::error::Result; +use name::Name; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; -use name::Name; - // 定义common结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] struct Common { pub id: String, pub filter: S3Key, @@ -13,59 +14,74 @@ struct Common { } // 定义Queue结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] struct Queue { pub common: Common, pub arn: ARN, } // 定义ARN结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct ARN { pub target_id: TargetID, pub region: String, } // 定义TargetID结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct TargetID { pub id: String, pub name: String, } // 定义FilterRule结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct FilterRule { pub name: String, pub value: String, } // 定义FilterRuleList结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct FilterRuleList { pub rules: Vec, } // 定义S3Key结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct S3Key { pub rule_list: FilterRuleList, } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, 
Clone)] pub struct Lambda { arn: String, } // 定义Topic结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Topic { arn: String, } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Config { queue_list: Vec, lambda_list: Vec, topic_list: Vec, } + +impl Config { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Config = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/lifecycle/lifecycle.rs b/ecstore/src/bucket/lifecycle/lifecycle.rs index e4f2fa6f..109cad29 100644 --- a/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -1,9 +1,26 @@ use super::rule::Rule; +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Lifecycle { pub rules: Vec, pub expiry_updated_at: Option, } + +impl Lifecycle { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Lifecycle = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index 0097f23e..6b832dd6 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -188,9 +188,44 @@ impl BucketMetadata { } fn default_timestamps(&mut self) { + if self.policy_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.policy_config_updated_at = self.created + } + if self.encryption_config_updated_at == 
OffsetDateTime::UNIX_EPOCH { + self.encryption_config_updated_at = self.created + } + if self.tagging_config_updated_at == OffsetDateTime::UNIX_EPOCH { self.tagging_config_updated_at = self.created } + if self.object_lock_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.object_lock_config_updated_at = self.created + } + if self.quota_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.quota_config_updated_at = self.created + } + + if self.replication_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.replication_config_updated_at = self.created + } + + if self.versioning_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.versioning_config_updated_at = self.created + } + + if self.lifecycle_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.lifecycle_config_updated_at = self.created + } + if self.notification_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.notification_config_updated_at = self.created + } + + if self.bucket_targets_config_updated_at == OffsetDateTime::UNIX_EPOCH { + self.bucket_targets_config_updated_at = self.created + } + if self.bucket_targets_config_meta_updated_at == OffsetDateTime::UNIX_EPOCH { + self.bucket_targets_config_meta_updated_at = self.created + } } pub fn update_config(&mut self, config_file: &str, data: Vec) -> Result { @@ -262,9 +297,39 @@ impl BucketMetadata { } fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> { + if !self.policy_config_json.is_empty() { + self.policy_config = Some(BucketPolicy::unmarshal(&self.policy_config_json)?); + } + if !self.notification_config_xml.is_empty() { + self.notification_config = Some(event::Config::unmarshal(&self.notification_config_xml)?); + } + if !self.lifecycle_config_xml.is_empty() { + self.lifecycle_config = Some(Lifecycle::unmarshal(&self.lifecycle_config_xml)?); + } + + if !self.object_lock_config_xml.is_empty() { + self.object_lock_config = Some(objectlock::Config::unmarshal(&self.object_lock_config_xml)?); + } + if 
!self.versioning_config_xml.is_empty() { + self.versioning_config = Some(Versioning::unmarshal(&self.versioning_config_xml)?); + } + if !self.encryption_config_xml.is_empty() { + self.sse_config = Some(BucketSSEConfig::unmarshal(&self.encryption_config_xml)?); + } if !self.tagging_config_xml.is_empty() { self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?); } + if !self.quota_config_json.is_empty() { + self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?); + } + if !self.replication_config_xml.is_empty() { + self.replication_config = Some(replication::Config::unmarshal(&self.replication_config_xml)?); + } + if !self.bucket_targets_config_json.is_empty() { + self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?); + } else { + self.bucket_target_config = Some(BucketTargets::default()) + } Ok(()) } diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index af0316c7..90bc6cdc 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -16,8 +16,13 @@ use time::OffsetDateTime; use tokio::sync::RwLock; use tracing::{error, warn}; +use super::encryption::BucketSSEConfig; +use super::lifecycle::lifecycle::Lifecycle; use super::metadata::{load_bucket_metadata, BucketMetadata}; -use super::tags; +use super::policy::bucket_policy::BucketPolicy; +use super::quota::BucketQuota; +use super::target::BucketTargets; +use super::{event, objectlock, replication, tags, versioning}; lazy_static! 
{ static ref GLOBAL_BucketMetadataSys: Arc> = Arc::new(RwLock::new(BucketMetadataSys::new())); @@ -232,6 +237,42 @@ impl BucketMetadataSys { } } + pub async fn get_versioning_config(&self, bucket: &str) -> Result<(versioning::Versioning, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + warn!("get_versioning_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Ok((versioning::Versioning::default(), OffsetDateTime::UNIX_EPOCH)); + } else { + return Err(err); + } + } + }; + + Ok((bm.versioning_config.unwrap_or_default(), bm.versioning_config_updated_at)) + } + + pub async fn get_bucket_policy(&self, bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + warn!("get_bucket_policy err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketPolicyNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.policy_config { + Ok((config, bm.policy_config_updated_at)) + } else { + Err(Error::new(BucketMetadataError::BucketPolicyNotFound)) + } + } + pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> { let bm = match self.get_config(bucket).await { Ok((res, _)) => res, @@ -251,4 +292,163 @@ impl BucketMetadataSys { Err(Error::new(BucketMetadataError::TaggingNotFound)) } } + + pub async fn get_object_lock_config(&self, bucket: &str) -> Result<(objectlock::Config, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + warn!("get_object_lock_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.object_lock_config { + Ok((config, bm.object_lock_config_updated_at)) + } else { + 
Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound)) + } + } + + pub async fn get_lifecycle_config(&self, bucket: &str) -> Result<(Lifecycle, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + warn!("get_lifecycle_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketLifecycleNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.lifecycle_config { + if config.rules.is_empty() { + Err(Error::new(BucketMetadataError::BucketLifecycleNotFound)) + } else { + Ok((config, bm.lifecycle_config_updated_at)) + } + } else { + Err(Error::new(BucketMetadataError::BucketLifecycleNotFound)) + } + } + + pub async fn get_notification_config(&self, bucket: &str) -> Result> { + let bm = match self.get_config(bucket).await { + Ok((bm, _)) => bm.notification_config, + Err(err) => { + warn!("get_notification_config err {:?}", &err); + if config::error::is_not_found(&err) { + None + } else { + return Err(err); + } + } + }; + + Ok(bm) + } + + pub async fn get_sse_config(&self, bucket: &str) -> Result<(BucketSSEConfig, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { + Ok((res, _)) => res, + Err(err) => { + warn!("get_sse_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.sse_config { + Ok((config, bm.encryption_config_updated_at)) + } else { + Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound)) + } + } + + pub async fn created_at(&self, bucket: &str) -> Result { + let bm = match self.get_config(bucket).await { + Ok((bm, _)) => bm.created, + Err(err) => { + return Err(err); + } + }; + + Ok(bm) + } + + pub async fn get_quota_config(&self, bucket: &str) -> Result<(BucketQuota, OffsetDateTime)> { + let bm = match self.get_config(bucket).await { 
+ Ok((res, _)) => res, + Err(err) => { + warn!("get_quota_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.quota_config { + Ok((config, bm.quota_config_updated_at)) + } else { + Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound)) + } + } + + pub async fn get_replication_config(&self, bucket: &str) -> Result<(replication::Config, OffsetDateTime)> { + let (bm, reload) = match self.get_config(bucket).await { + Ok(res) => res, + Err(err) => { + warn!("get_replication_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.replication_config { + if reload { + // TODO: globalBucketTargetSys + } + + Ok((config, bm.replication_config_updated_at)) + } else { + Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound)) + } + } + + pub async fn get_bucket_targets_config(&self, bucket: &str) -> Result { + let (bm, reload) = match self.get_config(bucket).await { + Ok(res) => res, + Err(err) => { + warn!("get_bucket_targets_config err {:?}", &err); + if config::error::is_not_found(&err) { + return Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound)); + } else { + return Err(err); + } + } + }; + + if let Some(config) = bm.bucket_target_config { + if reload { + // TODO: globalBucketTargetSys + } + + Ok(config) + } else { + Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound)) + } + } } diff --git a/ecstore/src/bucket/mod.rs b/ecstore/src/bucket/mod.rs index a905fba1..59e35a47 100644 --- a/ecstore/src/bucket/mod.rs +++ b/ecstore/src/bucket/mod.rs @@ -11,6 +11,7 @@ mod replication; pub mod tags; mod target; pub mod utils; -mod versioning; +pub mod versioning; +pub mod versioning_sys; pub use
metadata_sys::{bucket_metadata_sys_set, get_bucket_metadata_sys, init_bucket_metadata_sys}; diff --git a/ecstore/src/bucket/objectlock/mod.rs b/ecstore/src/bucket/objectlock/mod.rs index 74fa0f3b..dcd27332 100644 --- a/ecstore/src/bucket/objectlock/mod.rs +++ b/ecstore/src/bucket/objectlock/mod.rs @@ -1,3 +1,5 @@ +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)] @@ -37,3 +39,18 @@ pub struct Config { pub object_lock_enabled: String, pub rule: Option, } + +impl Config { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Config = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/policy/bucket_policy.rs b/ecstore/src/bucket/policy/bucket_policy.rs index b769ec66..512bf8a8 100644 --- a/ecstore/src/bucket/policy/bucket_policy.rs +++ b/ecstore/src/bucket/policy/bucket_policy.rs @@ -1,3 +1,5 @@ +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -38,3 +40,18 @@ pub struct BucketPolicy { pub version: String, pub statements: Vec, } + +impl BucketPolicy { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: BucketPolicy = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/quota/mod.rs b/ecstore/src/bucket/quota/mod.rs index 12c50dc9..a7753d85 100644 --- a/ecstore/src/bucket/quota/mod.rs +++ b/ecstore/src/bucket/quota/mod.rs @@ -1,3 +1,5 @@ +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; // 定义QuotaType枚举类型 @@ -19,3 +21,18 @@ 
pub struct BucketQuota { quota_type: Option, } + +impl BucketQuota { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: BucketQuota = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/replication/mod.rs b/ecstore/src/bucket/replication/mod.rs index a50bf7c6..a54e8c5d 100644 --- a/ecstore/src/bucket/replication/mod.rs +++ b/ecstore/src/bucket/replication/mod.rs @@ -3,11 +3,28 @@ mod filter; mod rule; mod tag; +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use rule::Rule; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Config { rules: Vec, role_arn: String, } + +impl Config { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Config = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/target/mod.rs b/ecstore/src/bucket/target/mod.rs index e296423b..f830c522 100644 --- a/ecstore/src/bucket/target/mod.rs +++ b/ecstore/src/bucket/target/mod.rs @@ -1,8 +1,10 @@ +use crate::error::Result; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; use std::time::Duration; use time::OffsetDateTime; -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Credentials { access_key: String, secret_key: String, @@ -10,13 +12,13 @@ pub struct Credentials { expiration: Option, } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub enum ServiceType { #[default] Replication, } -#[derive(Debug, Deserialize, Serialize, 
Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct LatencyStat { curr: Duration, // 当前延迟 avg: Duration, // 平均延迟 @@ -24,7 +26,7 @@ pub struct LatencyStat { } // 定义BucketTarget结构体 -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketTarget { source_bucket: String, @@ -73,7 +75,22 @@ pub struct BucketTarget { edge: bool, } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct BucketTargets { pub targets: Vec, } + +impl BucketTargets { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: BucketTargets = rmp_serde::from_slice(buf)?; + Ok(t) + } +} diff --git a/ecstore/src/bucket/versioning/mod.rs b/ecstore/src/bucket/versioning/mod.rs index 37fdcdfa..04019b21 100644 --- a/ecstore/src/bucket/versioning/mod.rs +++ b/ecstore/src/bucket/versioning/mod.rs @@ -1,12 +1,23 @@ +use crate::{ + error::{Error, Result}, + utils, +}; +use rmp_serde::Serializer as rmpSerializer; use serde::{Deserialize, Serialize}; +#[derive(Debug, thiserror::Error)] +pub enum VersioningErr { + #[error("too many excluded prefixes")] + TooManyExcludedPrefixes, + #[error("excluded prefixes extension supported only when versioning is enabled")] + ExcludedPrefixNotSupported, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize, Serialize)] pub enum State { #[default] - Enabled, Suspended, - // 如果未来可能会使用到Disabled状态,可以在这里添加 - // Disabled, + Enabled, } // 实现Display trait用于打印 @@ -18,21 +29,112 @@ impl std::fmt::Display for State { match *self { State::Enabled => "Enabled", State::Suspended => "Suspended", - // 如果未来可能会使用到Disabled状态,可以在这里添加 - // State::Disabled => "Disabled", } ) } } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, 
Deserialize, Serialize, Default, Clone)] pub struct ExcludedPrefix { pub prefix: String, } -#[derive(Debug, Deserialize, Serialize, Default,Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Versioning { pub status: State, pub excluded_prefixes: Vec, pub exclude_folders: bool, } + +impl Versioning { + pub fn marshal_msg(&self) -> Result> { + let mut buf = Vec::new(); + + self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; + + Ok(buf) + } + + pub fn unmarshal(buf: &[u8]) -> Result { + let t: Versioning = rmp_serde::from_slice(buf)?; + Ok(t) + } + + pub fn validate(&self) -> Result<()> { + match self.status { + State::Suspended => { + if self.excluded_prefixes.len() > 0 { + return Err(Error::new(VersioningErr::ExcludedPrefixNotSupported)); + } + } + State::Enabled => { + if self.excluded_prefixes.len() > 10 { + return Err(Error::new(VersioningErr::TooManyExcludedPrefixes)); + } + } + } + + Ok(()) + } + + pub fn enabled(&self) -> bool { + self.status == State::Enabled + } + + pub fn versioned(&self, prefix: &str) -> bool { + self.prefix_enabled(prefix) || self.prefix_suspended(prefix) + } + + pub fn prefix_enabled(&self, prefix: &str) -> bool { + if self.status != State::Enabled { + return false; + } + + if prefix.is_empty() { + return true; + } + if self.exclude_folders && prefix.ends_with("/") { + return false; + } + + for sprefix in self.excluded_prefixes.iter() { + let full_prefix = format!("{}*", sprefix.prefix); + if utils::wildcard::match_simple(&full_prefix, prefix) { + return false; + } + } + return true; + } + + pub fn suspended(&self) -> bool { + self.status == State::Suspended + } + + pub fn prefix_suspended(&self, prefix: &str) -> bool { + if self.status == State::Suspended { + return true; + } + + if self.status == State::Enabled { + if prefix.is_empty() { + return false; + } + + if self.exclude_folders && prefix.starts_with("/") { + return true; + } + + for sprefix in self.excluded_prefixes.iter() { + let 
full_prefix = format!("{}*", sprefix.prefix); + if utils::wildcard::match_simple(&full_prefix, prefix) { + return true; + } + } + } + return false; + } + + pub fn prefixes_excluded(&self) -> bool { + self.excluded_prefixes.len() > 0 || self.exclude_folders + } +} diff --git a/ecstore/src/bucket/versioning_sys.rs b/ecstore/src/bucket/versioning_sys.rs new file mode 100644 index 00000000..52a78fc0 --- /dev/null +++ b/ecstore/src/bucket/versioning_sys.rs @@ -0,0 +1,62 @@ +use super::get_bucket_metadata_sys; +use super::versioning::Versioning; +use crate::disk::RUSTFS_META_BUCKET; +use crate::error::Result; +use tracing::warn; + +pub struct BucketVersioningSys {} + +impl BucketVersioningSys { + pub async fn enabled(bucket: &str) -> bool { + match Self::get(bucket).await { + Ok(res) => res.enabled(), + Err(err) => { + warn!("{:?}", err); + false + } + } + } + + pub async fn prefix_enabled(bucket: &str, prefix: &str) -> bool { + match Self::get(bucket).await { + Ok(res) => res.prefix_enabled(prefix), + Err(err) => { + warn!("{:?}", err); + false + } + } + } + + pub async fn suspended(bucket: &str) -> bool { + match Self::get(bucket).await { + Ok(res) => res.suspended(), + Err(err) => { + warn!("{:?}", err); + false + } + } + } + + pub async fn prefix_suspended(bucket: &str, prefix: &str) -> bool { + match Self::get(bucket).await { + Ok(res) => res.prefix_suspended(prefix), + Err(err) => { + warn!("{:?}", err); + false + } + } + } + + pub async fn get(bucket: &str) -> Result { + if bucket == RUSTFS_META_BUCKET || bucket.starts_with(RUSTFS_META_BUCKET) { + return Ok(Versioning::default()); + } + + let bucket_meta_sys_lock = get_bucket_metadata_sys().await; + let bucket_meta_sys = bucket_meta_sys_lock.write().await; + + let (cfg, _) = bucket_meta_sys.get_versioning_config(bucket).await?; + + Ok(cfg) + } +} diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs index 90e120d0..b5ca93bd 100644 --- a/ecstore/src/utils/mod.rs +++ b/ecstore/src/utils/mod.rs @@ -4,3 
// ecstore/src/utils/wildcard.rs — wildcard matching ('*' and '?'), ported
// from MinIO's wildcard package.
//
// NOTE(review): matching operates on bytes, so '?' consumes one byte, not one
// Unicode scalar — multi-byte UTF-8 input behaves differently from the Go
// rune-based original; confirm this is acceptable for object-key patterns.

/// Relaxed wildcard match: `*` matches any run of bytes and `?` matches one
/// byte *or* the empty tail (so `"a?"` matches `"a"`). An empty pattern
/// matches only an empty name.
pub fn match_simple(pattern: &str, name: &str) -> bool {
    if pattern.is_empty() {
        return name == pattern;
    }
    if pattern == "*" {
        return true;
    }
    deep_match(name.as_bytes(), pattern.as_bytes(), true)
}

/// Strict wildcard match: `*` matches any run of bytes, `?` exactly one byte.
pub fn match_pattern(pattern: &str, name: &str) -> bool {
    if pattern.is_empty() {
        return name == pattern;
    }
    if pattern == "*" {
        return true;
    }
    deep_match(name.as_bytes(), pattern.as_bytes(), false)
}

/// Recursive matcher; `simple` relaxes `?` to also match an empty tail.
fn deep_match(mut s: &[u8], mut pattern: &[u8], simple: bool) -> bool {
    while !pattern.is_empty() {
        match pattern[0] {
            b'*' => {
                // Trailing '*' matches everything; otherwise '*' matches
                // either zero bytes (drop the '*') or one more byte (consume
                // from the input and keep the '*').
                return pattern.len() == 1
                    || deep_match(s, &pattern[1..], simple)
                    || (!s.is_empty() && deep_match(&s[1..], pattern, simple));
            }
            b'?' => {
                if s.is_empty() {
                    return simple;
                }
            }
            c => {
                if s.is_empty() || s[0] != c {
                    return false;
                }
            }
        }
        s = &s[1..];
        pattern = &pattern[1..];
    }
    // Pattern exhausted: success iff the input is fully consumed too
    // (`pattern` is necessarily empty here, so checking it again was
    // redundant in the original).
    s.is_empty()
}

/// True when `text` could be a prefix of some string matching `pattern`.
pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool {
    let (p, t) = (pattern.as_bytes(), text.as_bytes());
    let mut i = 0;
    while i < t.len() && i < p.len() {
        match p[i] {
            b'*' => return true,
            // BUGFIX: '?' consumes exactly one position. The original did
            // `i += 1` inside this arm AND after the match, double-stepping
            // and silently skipping the comparison of the next byte.
            b'?' => {}
            c => {
                if c != t[i] {
                    return false;
                }
            }
        }
        i += 1;
    }
    // All of `text` matched; it is a viable prefix iff it is no longer than
    // the pattern.
    t.len() <= p.len()
}
} = req.input; + let layer = new_object_layer_fn(); + let lock = layer.read().await; + let store = match lock.as_ref() { + Some(s) => s, + None => return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("Not init",))), + }; + + if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await { + if DiskError::VolumeNotFound.is(&e) { + return Err(s3_error!(NoSuchBucket)); + } else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e))); + } + } + + let cfg = try_!(BucketVersioningSys::get(&bucket).await); + + let status = match cfg.status { + VersioningState::Enabled => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)), + VersioningState::Suspended => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::SUSPENDED)), + }; + + Ok(S3Response::new(GetBucketVersioningOutput { + mfa_delete: None, + status, + })) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn put_bucket_versioning( + &self, + req: S3Request, + ) -> S3Result> { + let PutBucketVersioningInput { + bucket, + versioning_configuration, + .. 
+ } = req.input; + + // TODO: check other sys + // check site replication enable + // check bucket object lock enable + // check replication suspended + + let mut cfg = match BucketVersioningSys::get(&bucket).await { + Ok(res) => res, + Err(err) => { + warn!("BucketVersioningSys::get err {:?}", err); + Versioning::default() + } + }; + + if let Some(verstatus) = versioning_configuration.status { + cfg.status = match verstatus.as_str() { + BucketVersioningStatus::ENABLED => VersioningState::Enabled, + BucketVersioningStatus::SUSPENDED => VersioningState::Suspended, + _ => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init")), + } + } + + let data = try_!(cfg.marshal_msg()); + + let bucket_meta_sys_lock = get_bucket_metadata_sys().await; + let mut bucket_meta_sys = bucket_meta_sys_lock.write().await; + + try_!(bucket_meta_sys.update(&bucket, BUCKET_VERSIONING_CONFIG, data).await); + + // TODO: globalSiteReplicationSys.BucketMetaHook + + Ok(S3Response::new(PutBucketVersioningOutput {})) + } } #[allow(dead_code)] diff --git a/scripts/run.sh b/scripts/run.sh index 1385973a..ba2d73bc 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -26,4 +26,5 @@ fi # --domain-name 127.0.0.1:9010 \ # "$DATA_DIR_ARG" -cargo run "$DATA_DIR_ARG" \ No newline at end of file +# cargo run "$DATA_DIR_ARG" +cargo run ./target/volume/test