mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix: resolve Send trait issue in ImportBucketMetadata async function
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
use std::io::{Cursor, Write as _};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::{Cursor, Read as _, Write as _},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
admin::router::Operation,
|
||||
auth::{check_key_valid, get_session_token},
|
||||
};
|
||||
use ecstore::bucket::utils::serialize;
|
||||
use ecstore::{
|
||||
StorageAPI,
|
||||
bucket::{
|
||||
@@ -19,9 +22,10 @@ use ecstore::{
|
||||
new_object_layer_fn,
|
||||
store_api::BucketOptions,
|
||||
};
|
||||
use ecstore::{bucket::utils::serialize, store_api::MakeBucketOptions};
|
||||
use http::{HeaderMap, StatusCode};
|
||||
use matchit::Params;
|
||||
use rustfs_utils::path::path_join_buf;
|
||||
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
|
||||
use s3s::{
|
||||
Body, S3Request, S3Response, S3Result,
|
||||
header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE},
|
||||
@@ -29,7 +33,8 @@ use s3s::{
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde_urlencoded::from_bytes;
|
||||
use zip::{ZipArchive, ZipWriter, result::ZipError, write::SimpleFileOptions};
|
||||
use tracing::warn;
|
||||
use zip::{ZipArchive, ZipWriter, write::SimpleFileOptions};
|
||||
|
||||
#[derive(Debug, Default, serde::Deserialize)]
|
||||
pub struct ExportBucketMetadataQuery {
|
||||
@@ -95,7 +100,7 @@ impl Operation for ExportBucketMetadata {
|
||||
let conf_path = path_join_buf(&[bucket.name.as_str(), conf]);
|
||||
match conf {
|
||||
BUCKET_POLICY_CONFIG => {
|
||||
let config = match metadata_sys::get_bucket_policy(&bucket.name).await {
|
||||
let config: policy::policy::BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await {
|
||||
Ok((res, _)) => res,
|
||||
Err(e) => {
|
||||
if e == StorageError::ConfigNotFound {
|
||||
@@ -314,6 +319,7 @@ impl Operation for ExportBucketMetadata {
|
||||
|
||||
#[derive(Debug, Default, Deserialize)]
|
||||
pub struct ImportBucketMetadataQuery {
|
||||
#[allow(dead_code)]
|
||||
pub bucket: String,
|
||||
}
|
||||
|
||||
@@ -339,6 +345,107 @@ impl Operation for ImportBucketMetadata {
|
||||
let (_cred, _owner) =
|
||||
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
|
||||
|
||||
let mut input = req.input;
|
||||
let body = match input.store_all_unlimited().await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
warn!("get body failed, e: {:?}", e);
|
||||
return Err(s3_error!(InvalidRequest, "get body failed"));
|
||||
}
|
||||
};
|
||||
|
||||
let mut zip_reader = ZipArchive::new(Cursor::new(body)).map_err(|e| s3_error!(InternalError, "get body failed: {e}"))?;
|
||||
|
||||
// First pass: read all file contents into memory
|
||||
let mut file_contents = Vec::new();
|
||||
for i in 0..zip_reader.len() {
|
||||
let mut file = zip_reader
|
||||
.by_index(i)
|
||||
.map_err(|e| s3_error!(InternalError, "get file failed: {e}"))?;
|
||||
let file_path = file.name().to_string();
|
||||
|
||||
let mut content = Vec::new();
|
||||
file.read_to_end(&mut content)
|
||||
.map_err(|e| s3_error!(InternalError, "read file failed: {e}"))?;
|
||||
|
||||
file_contents.push((file_path, content));
|
||||
}
|
||||
|
||||
// Extract bucket names
|
||||
let mut bucket_names = Vec::new();
|
||||
for (file_path, _) in &file_contents {
|
||||
let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>();
|
||||
|
||||
if file_path_split.len() < 2 {
|
||||
warn!("file path is invalid: {}", file_path);
|
||||
continue;
|
||||
}
|
||||
|
||||
let bucket_name = file_path_split[0].to_string();
|
||||
if !bucket_names.contains(&bucket_name) {
|
||||
bucket_names.push(bucket_name);
|
||||
}
|
||||
}
|
||||
|
||||
// Get existing bucket metadata
|
||||
let mut bucket_metadatas = HashMap::new();
|
||||
for bucket_name in bucket_names {
|
||||
match metadata_sys::get_config_from_disk(&bucket_name).await {
|
||||
Ok(res) => bucket_metadatas.insert(bucket_name, Arc::new(res)),
|
||||
Err(e) => {
|
||||
if e == StorageError::ConfigNotFound {
|
||||
warn!("bucket metadata not found: {e}");
|
||||
continue;
|
||||
}
|
||||
warn!("get bucket metadata failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(s3_error!(InvalidRequest, "object store not init"));
|
||||
};
|
||||
|
||||
// Second pass: process file contents
|
||||
for (file_path, content) in file_contents {
|
||||
let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>();
|
||||
|
||||
if file_path_split.len() < 2 {
|
||||
warn!("file path is invalid: {}", file_path);
|
||||
continue;
|
||||
}
|
||||
|
||||
let bucket_name = file_path_split[0];
|
||||
let conf_name = file_path_split[1];
|
||||
|
||||
// create bucket if not exists
|
||||
if !bucket_metadatas.contains_key(bucket_name) {
|
||||
if let Err(e) = store
|
||||
.make_bucket(
|
||||
bucket_name,
|
||||
&MakeBucketOptions {
|
||||
force_create: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("create bucket failed: {e}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let metadata = metadata_sys::get(bucket_name).await.unwrap_or_default();
|
||||
bucket_metadatas.insert(bucket_name.to_string(), metadata.clone());
|
||||
}
|
||||
|
||||
if conf_name == BUCKET_POLICY_CONFIG {
|
||||
let _config: policy::policy::BucketPolicy =
|
||||
serde_json::from_slice(&content).map_err(|e| s3_error!(InternalError, "deserialize config failed: {e}"))?;
|
||||
// TODO: Apply the configuration
|
||||
}
|
||||
}
|
||||
|
||||
let mut header = HeaderMap::new();
|
||||
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
|
||||
|
||||
Reference in New Issue
Block a user