From cc440f23a36d79f3f661d69edb8794a8bbe2bbc8 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Jul 2025 13:13:26 +0800 Subject: [PATCH 1/4] fix: clippy --- ecstore/src/disk/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 6aee14e6..2a04034c 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -135,7 +135,7 @@ impl LocalDisk { Ok(path) => path, Err(e) => { if e.kind() == ErrorKind::NotFound { - return Err(DiskError::VolumeNotFound.into()); + return Err(DiskError::VolumeNotFound); } return Err(to_file_error(e).into()); } @@ -162,7 +162,7 @@ impl LocalDisk { let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?; if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx { - return Err(Error::from(DiskError::InconsistentDisk)); + return Err(DiskError::InconsistentDisk); } id = Some(fm.erasure.this);
From 9d74f56f5741ae13bc9d8c1f96a4bc0a0412c0ac Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Jul 2025 15:23:53 +0800 Subject: [PATCH 2/4] feat: add ExportBucketMetadata handler --- ecstore/src/bucket/metadata_sys.rs | 26 +- ecstore/src/bucket/policy_sys.rs | 2 +- rustfs/src/admin/handlers.rs | 1 + rustfs/src/admin/handlers/bucket_meta.rs | 346 +++++++++++++++++++++++ rustfs/src/admin/mod.rs | 140 ++++----- 5 files changed, 446 insertions(+), 69 deletions(-) create mode 100644 rustfs/src/admin/handlers/bucket_meta.rs
diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 5e3cf190..333f5900 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -19,7 +19,7 @@ use std::{collections::HashMap, sync::Arc}; use time::OffsetDateTime; use tokio::sync::RwLock; use tokio::time::sleep; -use tracing::{error, warn}; +use tracing::error; use super::metadata::{BucketMetadata, load_bucket_metadata}; use super::quota::BucketQuota; @@ -76,6 +76,27 @@ pub async fn delete(bucket: &str, config_file: &str) -> Result { bucket_meta_sys.delete(bucket, config_file).await } +pub async fn get_bucket_policy(bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> { + let bucket_meta_sys_lock = get_bucket_metadata_sys()?; + let bucket_meta_sys = bucket_meta_sys_lock.read().await; + + bucket_meta_sys.get_bucket_policy(bucket).await +} + +pub async fn get_quota_config(bucket: &str) -> Result<(BucketQuota, OffsetDateTime)> { + let bucket_meta_sys_lock = get_bucket_metadata_sys()?; + let bucket_meta_sys = bucket_meta_sys_lock.read().await; + + bucket_meta_sys.get_quota_config(bucket).await +} + +pub async fn get_bucket_targets_config(bucket: &str) -> Result<BucketTargets> { + let bucket_meta_sys_lock = get_bucket_metadata_sys()?; + let bucket_meta_sys = bucket_meta_sys_lock.read().await; + + bucket_meta_sys.get_bucket_targets_config(bucket).await +} + pub async fn get_tagging_config(bucket: &str) -> Result<(Tagging, OffsetDateTime)> { let bucket_meta_sys_lock = get_bucket_metadata_sys()?; let bucket_meta_sys = bucket_meta_sys_lock.read().await; @@ -340,7 +361,6 @@ impl BucketMetadataSys { } pub async fn get_config_from_disk(&self, bucket: &str) -> Result<BucketMetadata> { - println!("load data from disk"); if is_meta_bucketname(bucket) { return Err(Error::other("errInvalidArgument")); } @@ -381,7 +401,6 @@ impl BucketMetadataSys { let bm = match self.get_config(bucket).await { Ok((res, _)) => res, Err(err) => { - warn!("get_versioning_config err {:?}", &err); return if err == Error::ConfigNotFound { Ok((VersioningConfiguration::default(),
OffsetDateTime::UNIX_EPOCH)) } else { @@ -445,7 +464,6 @@ impl BucketMetadataSys { let bm = match self.get_config(bucket).await { Ok((bm, _)) => bm.notification_config.clone(), Err(err) => { - warn!("get_notification_config err {:?}", &err); if err == Error::ConfigNotFound { None } else {
diff --git a/ecstore/src/bucket/policy_sys.rs b/ecstore/src/bucket/policy_sys.rs index 37c0c1a3..5f6e15ed 100644 --- a/ecstore/src/bucket/policy_sys.rs +++ b/ecstore/src/bucket/policy_sys.rs @@ -21,7 +21,7 @@ impl PolicySys { } pub async fn get(bucket: &str) -> Result<BucketPolicy> { let bucket_meta_sys_lock = get_bucket_metadata_sys()?; - let bucket_meta_sys = bucket_meta_sys_lock.write().await; + let bucket_meta_sys = bucket_meta_sys_lock.read().await; let (cfg, _) = bucket_meta_sys.get_bucket_policy(bucket).await?;
diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 5993438b..b0f9acca 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -56,6 +56,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; // use url::UrlQuery; +pub mod bucket_meta; pub mod event; pub mod group; pub mod policys;
diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs new file mode 100644 index 00000000..6fc0f4de --- /dev/null +++ b/rustfs/src/admin/handlers/bucket_meta.rs @@ -0,0 +1,346 @@ +use std::io::{Cursor, Write as _}; + +use crate::{ + admin::router::Operation, + auth::{check_key_valid, get_session_token}, +}; +use ecstore::bucket::utils::serialize; +use ecstore::{ + StorageAPI, + bucket::{ + metadata::{ + BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_QUOTA_CONFIG_FILE, + BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG, + OBJECT_LOCK_CONFIG, + }, + metadata_sys, + }, + error::StorageError, + new_object_layer_fn, + store_api::BucketOptions, +}; +use http::{HeaderMap, StatusCode}; +use matchit::Params; +use rustfs_utils::path::path_join_buf; +use s3s::{ + Body, S3Request, S3Response, S3Result, + header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, + s3_error, +}; +use serde::Deserialize; +use serde_urlencoded::from_bytes; +use zip::{ZipArchive, ZipWriter, result::ZipError, write::SimpleFileOptions}; + +#[derive(Debug, Default, serde::Deserialize)] +pub struct ExportBucketMetadataQuery { + pub bucket: String, +} + +pub struct ExportBucketMetadata {} + +#[async_trait::async_trait] +impl Operation for ExportBucketMetadata { + async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> { + let query = { + if let Some(query) = req.uri.query() { + let input: ExportBucketMetadataQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; + input + } else { + ExportBucketMetadataQuery::default() + } + }; + + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (_cred, _owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + let Some(store) = new_object_layer_fn() else { + return Err(s3_error!(InvalidRequest, "object store not init")); + }; + + let buckets = if query.bucket.is_empty() { + store + .list_bucket(&BucketOptions::default()) + .await + .map_err(|e| s3_error!(InternalError, "list buckets failed: {e}"))?
+ } else { + let bucket = store + .get_bucket_info(&query.bucket, &BucketOptions::default()) + .await + .map_err(|e| s3_error!(InternalError, "get bucket failed: {e}"))?; + vec![bucket] + }; + + let mut zip_writer = ZipWriter::new(Cursor::new(Vec::new())); + + let confs = [ + BUCKET_POLICY_CONFIG, + BUCKET_NOTIFICATION_CONFIG, + BUCKET_LIFECYCLE_CONFIG, + BUCKET_SSECONFIG, + BUCKET_TAGGING_CONFIG, + BUCKET_QUOTA_CONFIG_FILE, + OBJECT_LOCK_CONFIG, + BUCKET_VERSIONING_CONFIG, + BUCKET_REPLICATION_CONFIG, + BUCKET_TARGETS_FILE, + ]; + + for bucket in buckets { + for &conf in confs.iter() { + let conf_path = path_join_buf(&[bucket.name.as_str(), conf]); + match conf { + BUCKET_POLICY_CONFIG => { + let config = match metadata_sys::get_bucket_policy(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_json = + serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_json) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_NOTIFICATION_CONFIG => { + let config = match metadata_sys::get_notification_config(&bucket.name).await { + Ok(Some(res)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + Ok(None) => continue, + }; + + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_LIFECYCLE_CONFIG => { + let config = match metadata_sys::get_lifecycle_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_TAGGING_CONFIG => { + let config = match metadata_sys::get_tagging_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_QUOTA_CONFIG_FILE => { + let config = match metadata_sys::get_quota_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket 
metadata failed: {e}")); + } + }; + let config_json = + serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_json) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + OBJECT_LOCK_CONFIG => { + let config = match metadata_sys::get_object_lock_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_SSECONFIG => { + let config = match metadata_sys::get_sse_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_VERSIONING_CONFIG => { + let config = match metadata_sys::get_versioning_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_REPLICATION_CONFIG => { + let config = match metadata_sys::get_replication_config(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + let config_xml = + serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + zip_writer + .write_all(&config_xml) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + BUCKET_TARGETS_FILE => { + let config = match metadata_sys::get_bucket_targets_config(&bucket.name).await { + Ok(res) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; + + let config_json = + serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; + + zip_writer + .start_file(conf_path, SimpleFileOptions::default()) + .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?; + 
zip_writer + .write_all(&config_json) + .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?; + } + _ => {} + } + } + } + + let zip_bytes = zip_writer + .finish() + .map_err(|e| s3_error!(InternalError, "finish zip failed: {e}"))?; + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/zip".parse().unwrap()); + header.insert(CONTENT_DISPOSITION, "attachment; filename=bucket-meta.zip".parse().unwrap()); + header.insert(CONTENT_LENGTH, zip_bytes.get_ref().len().to_string().parse().unwrap()); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(zip_bytes.into_inner())), header)) + } +} + +#[derive(Debug, Default, Deserialize)] +pub struct ImportBucketMetadataQuery { + pub bucket: String, +} + +pub struct ImportBucketMetadata {} + +#[async_trait::async_trait] +impl Operation for ImportBucketMetadata { + async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> { + let _query = { + if let Some(query) = req.uri.query() { + let input: ImportBucketMetadataQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; + input + } else { + ImportBucketMetadataQuery::default() + } + }; + + let Some(input_cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "get cred failed")); + }; + + let (_cred, _owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +}
diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 38c3030a..17046522 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -5,7 +5,7 @@ pub mod utils; // use ecstore::global::{is_dist_erasure, is_erasure}; use handlers::{ - group, policys, pools, rebalance, + bucket_meta, group, policys, pools, rebalance, service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount}, sts, tier, user, }; @@ -119,11 +119,85 @@ pub fn make_admin_route() -> std::io::Result<S3Router<AdminOperation>> { format!("{}{}", ADMIN_PREFIX, "/v3/background-heal/status").as_str(), AdminOperation(&handlers::BackgroundHealStatusHandler {}), )?; - // } + + // ? + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(), + AdminOperation(&tier::ListTiers {}), + )?; + // ? + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/tier-stats").as_str(), + AdminOperation(&tier::GetTierInfo {}), + )?; + // ?force=xxx + r.insert( + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(), + AdminOperation(&tier::RemoveTier {}), + )?; + // ?force=xxx + // body: AddOrUpdateTierReq + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(), + AdminOperation(&tier::AddTier {}), + )?; + // ?
+ // body: AddOrUpdateTierReq + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(), + AdminOperation(&tier::EditTier {}), + )?; + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/tier/clear").as_str(), + AdminOperation(&tier::ClearTier {}), + )?; + + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/export-bucket-metadata").as_str(), + AdminOperation(&bucket_meta::ExportBucketMetadata {}), + )?; + + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/import-bucket-metadata").as_str(), + AdminOperation(&bucket_meta::ImportBucketMetadata {}), + )?; + + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), + AdminOperation(&ListRemoteTargetHandler {}), + )?; + + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(), + AdminOperation(&GetReplicationMetricsHandler {}), + )?; + + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(), + AdminOperation(&SetRemoteTargetHandler {}), + )?; + + r.insert( + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), + AdminOperation(&RemoveRemoteTargetHandler {}), + )?; Ok(r) } +/// user router fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> { // 1 r.insert( @@ -240,30 +314,6 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> AdminOperation(&user::ImportIam {}), )?; - r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), - AdminOperation(&ListRemoteTargetHandler {}), - )?; - - r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(), - AdminOperation(&GetReplicationMetricsHandler {}), - )?; - - r.insert( - Method::PUT, - format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(), - AdminOperation(&SetRemoteTargetHandler {}), - )?; - - r.insert( - Method::DELETE, - format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), - AdminOperation(&RemoveRemoteTargetHandler {}), - )?; - // list-canned-policies?bucket=xxx r.insert( Method::GET, @@ -299,43 +349,5 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> AdminOperation(&policys::SetPolicyForUserOrGroup {}), )?; - // ? - r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(), - AdminOperation(&tier::ListTiers {}), - )?; - // ? - r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/tier-stats").as_str(), - AdminOperation(&tier::GetTierInfo {}), - )?; - // ?force=xxx - r.insert( - Method::DELETE, - format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(), - AdminOperation(&tier::RemoveTier {}), - )?; - // ?force=xxx - // body: AddOrUpdateTierReq - r.insert( - Method::PUT, - format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(), - AdminOperation(&tier::AddTier {}), - )?; - // ? - // body: AddOrUpdateTierReq - r.insert( - Method::POST, - format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(), - AdminOperation(&tier::EditTier {}), - )?; - r.insert( - Method::POST, - format!("{}{}", ADMIN_PREFIX, "/v3/tier/clear").as_str(), - AdminOperation(&tier::ClearTier {}), - )?; - Ok(()) }
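A note on PATCH 2/4: the export handler writes one zip entry per bucket and per config file, with entry names built by path_join_buf(&[bucket, conf]). A minimal client-side sketch of reading such an archive back, as a hypothetical helper that is not part of this series (only the zip crate is assumed):

    use std::io::Cursor;
    use zip::ZipArchive;

    /// List the "{bucket}/{config-file}" entry names of an exported bucket-meta.zip.
    fn list_exported_configs(zip_bytes: Vec<u8>) -> zip::result::ZipResult<Vec<String>> {
        let mut archive = ZipArchive::new(Cursor::new(zip_bytes))?;
        let mut names = Vec::new();
        for i in 0..archive.len() {
            // Entry names follow the layout written by ExportBucketMetadata:
            // a bucket name followed by one of the BUCKET_*_CONFIG file names.
            names.push(archive.by_index(i)?.name().to_string());
        }
        Ok(names)
    }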
From c8868a47d7c9fa8161cbfb8bc399cb21d396be8d Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Jul 2025 16:41:24 +0800 Subject: [PATCH 3/4] fix: resolve Send trait issue in ImportBucketMetadata async function --- rustfs/src/admin/handlers/bucket_meta.rs | 117 ++++++++++++++++++++++- 1 file changed, 112 insertions(+), 5 deletions(-)
diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs index 6fc0f4de..1e9b3bb0 100644 --- a/rustfs/src/admin/handlers/bucket_meta.rs +++ b/rustfs/src/admin/handlers/bucket_meta.rs @@ -1,10 +1,13 @@ -use std::io::{Cursor, Write as _}; +use std::{ + collections::HashMap, + io::{Cursor, Read as _, Write as _}, + sync::Arc, +}; use crate::{ admin::router::Operation, auth::{check_key_valid, get_session_token}, }; -use ecstore::bucket::utils::serialize; use ecstore::{ StorageAPI, bucket::{ metadata::{ BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_QUOTA_CONFIG_FILE, BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG, OBJECT_LOCK_CONFIG, }, metadata_sys, }, error::StorageError, new_object_layer_fn, store_api::BucketOptions, }; +use ecstore::{bucket::utils::serialize, store_api::MakeBucketOptions}; use http::{HeaderMap, StatusCode}; use matchit::Params; -use rustfs_utils::path::path_join_buf; +use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf}; use s3s::{ Body, S3Request, S3Response, S3Result, header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, s3_error, }; use serde::Deserialize; use serde_urlencoded::from_bytes; -use zip::{ZipArchive, ZipWriter, result::ZipError, write::SimpleFileOptions}; +use tracing::warn; +use zip::{ZipArchive, ZipWriter, write::SimpleFileOptions}; #[derive(Debug, Default, serde::Deserialize)] pub struct ExportBucketMetadataQuery { @@ -95,7 +100,7 @@ impl Operation for ExportBucketMetadata { let conf_path = path_join_buf(&[bucket.name.as_str(), conf]); match conf { BUCKET_POLICY_CONFIG => { - let config = match metadata_sys::get_bucket_policy(&bucket.name).await { + let config: policy::policy::BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await { Ok((res, _)) => res, Err(e) => { if e == StorageError::ConfigNotFound { @@ -314,6 +319,7 @@ impl Operation for ExportBucketMetadata { #[derive(Debug, Default, Deserialize)] pub struct ImportBucketMetadataQuery { + #[allow(dead_code)] pub bucket: String, } @@ -339,6 +345,107 @@ impl Operation for ImportBucketMetadata { let (_cred, _owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + let mut input = req.input; + let body = match input.store_all_unlimited().await { + Ok(b) => b, + Err(e) => { + warn!("get body failed, e: {:?}", e); + return Err(s3_error!(InvalidRequest, "get body failed")); + } + }; + + let mut zip_reader = ZipArchive::new(Cursor::new(body)).map_err(|e| s3_error!(InternalError, "get body failed: {e}"))?; + + // First pass: read all file contents into memory + let mut file_contents = Vec::new(); + for i in 0..zip_reader.len() { + let mut file = zip_reader + .by_index(i) + .map_err(|e| s3_error!(InternalError, "get file failed: {e}"))?; + let file_path = file.name().to_string(); + + let mut content = Vec::new(); + file.read_to_end(&mut content) + .map_err(|e| s3_error!(InternalError, "read file failed: {e}"))?; + + file_contents.push((file_path, content)); + } + + // Extract
bucket names + let mut bucket_names = Vec::new(); + for (file_path, _) in &file_contents { + let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>(); + + if file_path_split.len() < 2 { + warn!("file path is invalid: {}", file_path); + continue; + } + + let bucket_name = file_path_split[0].to_string(); + if !bucket_names.contains(&bucket_name) { + bucket_names.push(bucket_name); + } + } + + // Get existing bucket metadata + let mut bucket_metadatas = HashMap::new(); + for bucket_name in bucket_names { + match metadata_sys::get_config_from_disk(&bucket_name).await { + Ok(res) => bucket_metadatas.insert(bucket_name, Arc::new(res)), + Err(e) => { + if e == StorageError::ConfigNotFound { + warn!("bucket metadata not found: {e}"); + continue; + } + warn!("get bucket metadata failed: {e}"); + continue; + } + }; + } + + let Some(store) = new_object_layer_fn() else { + return Err(s3_error!(InvalidRequest, "object store not init")); + }; + + // Second pass: process file contents + for (file_path, content) in file_contents { + let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>(); + + if file_path_split.len() < 2 { + warn!("file path is invalid: {}", file_path); + continue; + } + + let bucket_name = file_path_split[0]; + let conf_name = file_path_split[1]; + + // create bucket if not exists + if !bucket_metadatas.contains_key(bucket_name) { + if let Err(e) = store + .make_bucket( + bucket_name, + &MakeBucketOptions { + force_create: true, + ..Default::default() + }, + ) + .await + { + warn!("create bucket failed: {e}"); + continue; + } + + let metadata = metadata_sys::get(bucket_name).await.unwrap_or_default(); + bucket_metadatas.insert(bucket_name.to_string(), metadata.clone()); + } + + if conf_name == BUCKET_POLICY_CONFIG { + let _config: policy::policy::BucketPolicy = + serde_json::from_slice(&content).map_err(|e| s3_error!(InternalError, "deserialize config failed: {e}"))?; + // TODO: Apply the configuration + } + } + let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
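A note on PATCH 3/4: the two-pass structure is the whole of the Send fix. zip's ZipFile borrows the archive and, per the subject line, is not Send, so holding one across an .await made the handler's future non-Send; reading every entry into owned buffers first means only owned data lives across await points. The same pattern in a self-contained form, with apply() as a hypothetical async sink (only the zip crate is assumed):

    use std::io::{Cursor, Read as _};
    use zip::ZipArchive;

    // Hypothetical async sink standing in for the real metadata handling.
    async fn apply(_name: &str, _content: &[u8]) {}

    async fn import(zip_bytes: Vec<u8>) -> zip::result::ZipResult<()> {
        let mut archive = ZipArchive::new(Cursor::new(zip_bytes))?;

        // Pass 1: synchronous reads into owned (name, bytes) pairs; each
        // ZipFile borrow ends before the loop body does.
        let mut files = Vec::new();
        for i in 0..archive.len() {
            let mut entry = archive.by_index(i)?;
            let name = entry.name().to_string();
            let mut content = Vec::new();
            entry.read_to_end(&mut content)?;
            files.push((name, content));
        }

        // Pass 2: only owned data is held across the awaits, so the
        // returned future is Send.
        for (name, content) in files {
            apply(&name, &content).await;
        }
        Ok(())
    }

    // Compile-time check that the future can go to a work-stealing runtime.
    fn assert_send<T: Send>(_t: T) {}
    fn _check(bytes: Vec<u8>) {
        assert_send(import(bytes));
    }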
From cf7a930d3986bb592caee91d72994f67fd7f57e9 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Jul 2025 17:26:47 +0800 Subject: [PATCH 4/4] todo --- ecstore/src/bucket/metadata_sys.rs | 2 +- ecstore/src/disk/fs.rs | 7 --- ecstore/src/disk/os.rs | 3 +- rustfs/src/admin/handlers/bucket_meta.rs | 61 ++++++++++++++++++------ 4 files changed, 50 insertions(+), 23 deletions(-)
diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 333f5900..a9efd66d 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -56,7 +56,7 @@ pub async fn set_bucket_metadata(bucket: String, bm: BucketMetadata) -> Result<( Ok(()) } -pub(crate) async fn get(bucket: &str) -> Result<Arc<BucketMetadata>> { +pub async fn get(bucket: &str) -> Result<Arc<BucketMetadata>> { let sys = get_bucket_metadata_sys()?; let lock = sys.read().await; lock.get(bucket).await }
diff --git a/ecstore/src/disk/fs.rs b/ecstore/src/disk/fs.rs index 07475e07..f083e6d3 100644 --- a/ecstore/src/disk/fs.rs +++ b/ecstore/src/disk/fs.rs @@ -5,8 +5,6 @@ use tokio::{ io, }; -pub const SLASH_SEPARATOR: &str = "/"; - #[cfg(not(windows))] pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool { use std::os::unix::fs::MetadataExt; @@ -522,9 +520,4 @@ mod tests { // Should be different files assert!(!same_file(&metadata1, &metadata2)); } - - #[test] - fn test_slash_separator() { - assert_eq!(SLASH_SEPARATOR, "/"); - } }
diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index 6158c1d4..67df445b 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -5,6 +5,7 @@ use std::{ use super::error::Result; use crate::disk::error_conv::to_file_error; +use rustfs_utils::path::SLASH_SEPARATOR; use tokio::fs; use tracing::warn; @@ -90,7 +91,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> std::io::Result<Vec<String>> {
diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs - let config = match metadata_sys::get_notification_config(&bucket.name).await { - Ok(Some(res)) => res, - Err(e) => { - if e == StorageError::ConfigNotFound { - continue; + let config: s3s::dto::NotificationConfiguration = + match metadata_sys::get_notification_config(&bucket.name).await { + Ok(Some(res)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; + } + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); } - return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); - } - Ok(None) => continue, - }; + Ok(None) => continue, + }; let config_xml = serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; @@ -407,6 +413,8 @@ impl Operation for ImportBucketMetadata { return Err(s3_error!(InvalidRequest, "object store not init")); }; + let update_at = OffsetDateTime::now_utc(); + // Second pass: process file contents for (file_path, content) in file_contents { let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>(); @@ -439,10 +447,35 @@ impl Operation for ImportBucketMetadata { bucket_metadatas.insert(bucket_name.to_string(), metadata.clone()); } - if conf_name == BUCKET_POLICY_CONFIG { - let _config: policy::policy::BucketPolicy = - serde_json::from_slice(&content).map_err(|e| s3_error!(InternalError, "deserialize config failed: {e}"))?; - // TODO: Apply the configuration + match conf_name { + BUCKET_POLICY_CONFIG => { + let config: policy::policy::BucketPolicy = match serde_json::from_slice(&content) { + Ok(config) => config, + Err(e) => { + warn!("deserialize config failed: {e}"); + continue; + } + }; + + if config.version.is_empty() { + continue; + } + + // let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone(); + // metadata.policy_config_json = content; + // metadata.policy_config_updated_at = update_at; + } + BUCKET_NOTIFICATION_CONFIG => { + if let Err(e) = deserialize::<s3s::dto::NotificationConfiguration>(&content) { + warn!("deserialize config failed: {e}"); + continue; + } + + // let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone(); + // metadata.notification_config_xml = content; + // metadata.notification_config_updated_at = update_at; + } + _ => {} } }
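A note on PATCH 4/4: the commented-out assignments mark the step that is still TODO, writing each validated config back into the bucket's metadata. One possible shape for that step, sketched only from what the series itself shows. It assumes BucketMetadata implements Clone, that it exposes the policy_config_json and policy_config_updated_at fields named in the comments, and that the crate's Result alias is reachable as ecstore::error::Result; none of that is confirmed by the diffs:

    use std::sync::Arc;
    use ecstore::bucket::{metadata::BucketMetadata, metadata_sys};
    use time::OffsetDateTime;

    /// Sketch: persist one imported BUCKET_POLICY_CONFIG payload through the
    /// set_bucket_metadata entry point shown in the metadata_sys.rs hunk above.
    async fn apply_policy_config(
        bucket_name: &str,
        cached: &Arc<BucketMetadata>,
        content: Vec<u8>,
        update_at: OffsetDateTime,
    ) -> ecstore::error::Result<()> {
        // Assumed field names, taken from the commented-out lines in the
        // BUCKET_POLICY_CONFIG arm of ImportBucketMetadata.
        let mut metadata = (**cached).clone();
        metadata.policy_config_json = content;
        metadata.policy_config_updated_at = update_at;
        metadata_sys::set_bucket_metadata(bucket_name.to_string(), metadata).await
    }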