From c8868a47d7c9fa8161cbfb8bc399cb21d396be8d Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 1 Jul 2025 16:41:24 +0800 Subject: [PATCH] fix: resolve Send trait issue in ImportBucketMetadata async function --- rustfs/src/admin/handlers/bucket_meta.rs | 117 ++++++++++++++++++++++- 1 file changed, 112 insertions(+), 5 deletions(-) diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs index 6fc0f4de..1e9b3bb0 100644 --- a/rustfs/src/admin/handlers/bucket_meta.rs +++ b/rustfs/src/admin/handlers/bucket_meta.rs @@ -1,10 +1,13 @@ -use std::io::{Cursor, Write as _}; +use std::{ + collections::HashMap, + io::{Cursor, Read as _, Write as _}, + sync::Arc, +}; use crate::{ admin::router::Operation, auth::{check_key_valid, get_session_token}, }; -use ecstore::bucket::utils::serialize; use ecstore::{ StorageAPI, bucket::{ @@ -19,9 +22,10 @@ use ecstore::{ new_object_layer_fn, store_api::BucketOptions, }; +use ecstore::{bucket::utils::serialize, store_api::MakeBucketOptions}; use http::{HeaderMap, StatusCode}; use matchit::Params; -use rustfs_utils::path::path_join_buf; +use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf}; use s3s::{ Body, S3Request, S3Response, S3Result, header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE}, @@ -29,7 +33,8 @@ use s3s::{ }; use serde::Deserialize; use serde_urlencoded::from_bytes; -use zip::{ZipArchive, ZipWriter, result::ZipError, write::SimpleFileOptions}; +use tracing::warn; +use zip::{ZipArchive, ZipWriter, write::SimpleFileOptions}; #[derive(Debug, Default, serde::Deserialize)] pub struct ExportBucketMetadataQuery { @@ -95,7 +100,7 @@ impl Operation for ExportBucketMetadata { let conf_path = path_join_buf(&[bucket.name.as_str(), conf]); match conf { BUCKET_POLICY_CONFIG => { - let config = match metadata_sys::get_bucket_policy(&bucket.name).await { + let config: policy::policy::BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await { Ok((res, _)) => res, Err(e) => { if e == StorageError::ConfigNotFound { @@ -314,6 +319,7 @@ impl Operation for ExportBucketMetadata { #[derive(Debug, Default, Deserialize)] pub struct ImportBucketMetadataQuery { + #[allow(dead_code)] pub bucket: String, } @@ -339,6 +345,107 @@ impl Operation for ImportBucketMetadata { let (_cred, _owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + let mut input = req.input; + let body = match input.store_all_unlimited().await { + Ok(b) => b, + Err(e) => { + warn!("get body failed, e: {:?}", e); + return Err(s3_error!(InvalidRequest, "get body failed")); + } + }; + + let mut zip_reader = ZipArchive::new(Cursor::new(body)).map_err(|e| s3_error!(InternalError, "get body failed: {e}"))?; + + // First pass: read all file contents into memory + let mut file_contents = Vec::new(); + for i in 0..zip_reader.len() { + let mut file = zip_reader + .by_index(i) + .map_err(|e| s3_error!(InternalError, "get file failed: {e}"))?; + let file_path = file.name().to_string(); + + let mut content = Vec::new(); + file.read_to_end(&mut content) + .map_err(|e| s3_error!(InternalError, "read file failed: {e}"))?; + + file_contents.push((file_path, content)); + } + + // Extract bucket names + let mut bucket_names = Vec::new(); + for (file_path, _) in &file_contents { + let file_path_split = file_path.split(SLASH_SEPARATOR).collect::>(); + + if file_path_split.len() < 2 { + warn!("file path is invalid: {}", file_path); + continue; + } + + let bucket_name = file_path_split[0].to_string(); + if !bucket_names.contains(&bucket_name) { + bucket_names.push(bucket_name); + } + } + + // Get existing bucket metadata + let mut bucket_metadatas = HashMap::new(); + for bucket_name in bucket_names { + match metadata_sys::get_config_from_disk(&bucket_name).await { + Ok(res) => bucket_metadatas.insert(bucket_name, Arc::new(res)), + Err(e) => { + if e == StorageError::ConfigNotFound { + warn!("bucket metadata not found: {e}"); + continue; + } + warn!("get bucket metadata failed: {e}"); + continue; + } + }; + } + + let Some(store) = new_object_layer_fn() else { + return Err(s3_error!(InvalidRequest, "object store not init")); + }; + + // Second pass: process file contents + for (file_path, content) in file_contents { + let file_path_split = file_path.split(SLASH_SEPARATOR).collect::>(); + + if file_path_split.len() < 2 { + warn!("file path is invalid: {}", file_path); + continue; + } + + let bucket_name = file_path_split[0]; + let conf_name = file_path_split[1]; + + // create bucket if not exists + if !bucket_metadatas.contains_key(bucket_name) { + if let Err(e) = store + .make_bucket( + bucket_name, + &MakeBucketOptions { + force_create: true, + ..Default::default() + }, + ) + .await + { + warn!("create bucket failed: {e}"); + continue; + } + + let metadata = metadata_sys::get(bucket_name).await.unwrap_or_default(); + bucket_metadatas.insert(bucket_name.to_string(), metadata.clone()); + } + + if conf_name == BUCKET_POLICY_CONFIG { + let _config: policy::policy::BucketPolicy = + serde_json::from_slice(&content).map_err(|e| s3_error!(InternalError, "deserialize config failed: {e}"))?; + // TODO: Apply the configuration + } + } + let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))