feat: improve ImportBucketMetadata handle (#26)

* feat: improve ImportBucketMetadata handle
This commit is contained in:
weisd
2025-07-02 16:31:16 +08:00
committed by GitHub
parent 2e14b32ccd
commit bf690ad353
3 changed files with 119 additions and 19 deletions

View File

@@ -54,7 +54,7 @@ pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase", default)]
pub struct BucketMetadata {
pub name: String,

View File

@@ -18,7 +18,7 @@ pub mod metadata;
pub mod metadata_sys;
pub mod object_lock;
pub mod policy_sys;
mod quota;
pub mod quota;
pub mod replication;
pub mod tagging;
pub mod target;

View File

@@ -15,7 +15,6 @@
use std::{
collections::HashMap,
io::{Cursor, Read as _, Write as _},
sync::Arc,
};
use crate::{
@@ -29,9 +28,11 @@ use ecstore::{
metadata::{
BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_QUOTA_CONFIG_FILE,
BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG,
OBJECT_LOCK_CONFIG,
BucketMetadata, OBJECT_LOCK_CONFIG,
},
metadata_sys,
quota::BucketQuota,
target::BucketTargets,
},
error::StorageError,
new_object_layer_fn,
@@ -43,9 +44,14 @@ use ecstore::{
};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use policy::policy::BucketPolicy;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
use s3s::{
Body, S3Request, S3Response, S3Result,
dto::{
BucketLifecycleConfiguration, ObjectLockConfiguration, ReplicationConfiguration, ServerSideEncryptionConfiguration,
Tagging, VersioningConfiguration,
},
header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE},
s3_error,
};
@@ -161,7 +167,7 @@ impl Operation for ExportBucketMetadata {
.map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
}
BUCKET_LIFECYCLE_CONFIG => {
let config = match metadata_sys::get_lifecycle_config(&bucket.name).await {
let config: BucketLifecycleConfiguration = match metadata_sys::get_lifecycle_config(&bucket.name).await {
Ok((res, _)) => res,
Err(e) => {
if e == StorageError::ConfigNotFound {
@@ -181,7 +187,7 @@ impl Operation for ExportBucketMetadata {
.map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
}
BUCKET_TAGGING_CONFIG => {
let config = match metadata_sys::get_tagging_config(&bucket.name).await {
let config: Tagging = match metadata_sys::get_tagging_config(&bucket.name).await {
Ok((res, _)) => res,
Err(e) => {
if e == StorageError::ConfigNotFound {
@@ -201,7 +207,7 @@ impl Operation for ExportBucketMetadata {
.map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
}
BUCKET_QUOTA_CONFIG_FILE => {
let config = match metadata_sys::get_quota_config(&bucket.name).await {
let config: BucketQuota = match metadata_sys::get_quota_config(&bucket.name).await {
Ok((res, _)) => res,
Err(e) => {
if e == StorageError::ConfigNotFound {
@@ -301,7 +307,7 @@ impl Operation for ExportBucketMetadata {
.map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
}
BUCKET_TARGETS_FILE => {
let config = match metadata_sys::get_bucket_targets_config(&bucket.name).await {
let config: BucketTargets = match metadata_sys::get_bucket_targets_config(&bucket.name).await {
Ok(res) => res,
Err(e) => {
if e == StorageError::ConfigNotFound {
@@ -408,10 +414,12 @@ impl Operation for ImportBucketMetadata {
}
// Get existing bucket metadata
let mut bucket_metadatas = HashMap::new();
let mut bucket_metadatas: HashMap<String, BucketMetadata> = HashMap::new();
for bucket_name in bucket_names {
match metadata_sys::get_config_from_disk(&bucket_name).await {
Ok(res) => bucket_metadatas.insert(bucket_name, Arc::new(res)),
Ok(res) => {
bucket_metadatas.insert(bucket_name, res);
}
Err(e) => {
if e == StorageError::ConfigNotFound {
warn!("bucket metadata not found: {e}");
@@ -427,7 +435,7 @@ impl Operation for ImportBucketMetadata {
return Err(s3_error!(InvalidRequest, "object store not init"));
};
let _update_at = OffsetDateTime::now_utc();
let update_at = OffsetDateTime::now_utc();
// Second pass: process file contents
for (file_path, content) in file_contents {
@@ -458,12 +466,13 @@ impl Operation for ImportBucketMetadata {
}
let metadata = metadata_sys::get(bucket_name).await.unwrap_or_default();
bucket_metadatas.insert(bucket_name.to_string(), metadata.clone());
bucket_metadatas.insert(bucket_name.to_string(), (*metadata).clone());
}
match conf_name {
BUCKET_POLICY_CONFIG => {
let config: policy::policy::BucketPolicy = match serde_json::from_slice(&content) {
let config: BucketPolicy = match serde_json::from_slice(&content) {
Ok(config) => config,
Err(e) => {
warn!("deserialize config failed: {e}");
@@ -475,9 +484,9 @@ impl Operation for ImportBucketMetadata {
continue;
}
// let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone();
// metadata.policy_config_json = content;
// metadata.policy_config_updated_at = update_at;
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.policy_config_json = content;
metadata.policy_config_updated_at = update_at;
}
BUCKET_NOTIFICATION_CONFIG => {
if let Err(e) = deserialize::<s3s::dto::NotificationConfiguration>(&content) {
@@ -485,14 +494,105 @@ impl Operation for ImportBucketMetadata {
continue;
}
// let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone();
// metadata.notification_config_xml = content;
// metadata.notification_config_updated_at = update_at;
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.notification_config_xml = content;
metadata.notification_config_updated_at = update_at;
}
BUCKET_LIFECYCLE_CONFIG => {
if let Err(e) = deserialize::<BucketLifecycleConfiguration>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.lifecycle_config_xml = content;
metadata.lifecycle_config_updated_at = update_at;
}
BUCKET_SSECONFIG => {
if let Err(e) = deserialize::<ServerSideEncryptionConfiguration>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.encryption_config_xml = content;
metadata.encryption_config_updated_at = update_at;
}
BUCKET_TAGGING_CONFIG => {
if let Err(e) = deserialize::<Tagging>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.tagging_config_xml = content;
metadata.tagging_config_updated_at = update_at;
}
BUCKET_QUOTA_CONFIG_FILE => {
if let Err(e) = serde_json::from_slice::<BucketQuota>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.quota_config_json = content;
metadata.quota_config_updated_at = update_at;
}
OBJECT_LOCK_CONFIG => {
if let Err(e) = deserialize::<ObjectLockConfiguration>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.object_lock_config_xml = content;
metadata.object_lock_config_updated_at = update_at;
}
BUCKET_VERSIONING_CONFIG => {
if let Err(e) = deserialize::<VersioningConfiguration>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.versioning_config_xml = content;
metadata.versioning_config_updated_at = update_at;
}
BUCKET_REPLICATION_CONFIG => {
if let Err(e) = deserialize::<ReplicationConfiguration>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.replication_config_xml = content;
metadata.replication_config_updated_at = update_at;
}
BUCKET_TARGETS_FILE => {
if let Err(e) = serde_json::from_slice::<BucketTargets>(&content) {
warn!("deserialize config failed: {e}");
continue;
}
let metadata = bucket_metadatas.get_mut(bucket_name).unwrap();
metadata.bucket_targets_config_json = content;
metadata.bucket_targets_config_updated_at = update_at;
}
_ => {}
}
}
// TODO: site replication notify
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))