Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 01:30:33 +00:00
feat: add ImportBucketMetadata handler
@@ -19,7 +19,7 @@ use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, warn};
use tracing::error;

use super::metadata::{BucketMetadata, load_bucket_metadata};
use super::quota::BucketQuota;
@@ -56,7 +56,7 @@ pub async fn set_bucket_metadata(bucket: String, bm: BucketMetadata) -> Result<(
    Ok(())
}

pub(crate) async fn get(bucket: &str) -> Result<Arc<BucketMetadata>> {
pub async fn get(bucket: &str) -> Result<Arc<BucketMetadata>> {
    let sys = get_bucket_metadata_sys()?;
    let lock = sys.read().await;
    lock.get(bucket).await
@@ -76,6 +76,27 @@ pub async fn delete(bucket: &str, config_file: &str) -> Result<OffsetDateTime> {
    bucket_meta_sys.delete(bucket, config_file).await
}

pub async fn get_bucket_policy(bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;

    bucket_meta_sys.get_bucket_policy(bucket).await
}

pub async fn get_quota_config(bucket: &str) -> Result<(BucketQuota, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;

    bucket_meta_sys.get_quota_config(bucket).await
}

pub async fn get_bucket_targets_config(bucket: &str) -> Result<BucketTargets> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;

    bucket_meta_sys.get_bucket_targets_config(bucket).await
}

pub async fn get_tagging_config(bucket: &str) -> Result<(Tagging, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
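Below, a hypothetical caller sketch (not part of this commit) of the new module-level helpers; it assumes the ecstore paths used elsewhere in this diff and that their Result error type is StorageError. The helpers hide the read lock on BucketMetadataSys, so a call site collapses to a single await:

async fn bucket_quota_updated_at(bucket: &str) -> Result<time::OffsetDateTime, ecstore::error::StorageError> {
    // Hypothetical usage: fetch the quota config through the new helper instead of
    // taking the BucketMetadataSys read lock by hand.
    let (_quota, updated_at) = ecstore::bucket::metadata_sys::get_quota_config(bucket).await?;
    Ok(updated_at)
}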
@@ -340,7 +361,6 @@ impl BucketMetadataSys {
    }

    pub async fn get_config_from_disk(&self, bucket: &str) -> Result<BucketMetadata> {
        println!("load data from disk");
        if is_meta_bucketname(bucket) {
            return Err(Error::other("errInvalidArgument"));
        }
@@ -381,7 +401,6 @@ impl BucketMetadataSys {
        let bm = match self.get_config(bucket).await {
            Ok((res, _)) => res,
            Err(err) => {
                warn!("get_versioning_config err {:?}", &err);
                return if err == Error::ConfigNotFound {
                    Ok((VersioningConfiguration::default(), OffsetDateTime::UNIX_EPOCH))
                } else {
@@ -445,7 +464,6 @@ impl BucketMetadataSys {
        let bm = match self.get_config(bucket).await {
            Ok((bm, _)) => bm.notification_config.clone(),
            Err(err) => {
                warn!("get_notification_config err {:?}", &err);
                if err == Error::ConfigNotFound {
                    None
                } else {
@@ -21,7 +21,7 @@ impl PolicySys {
    }
    pub async fn get(bucket: &str) -> Result<BucketPolicy> {
        let bucket_meta_sys_lock = get_bucket_metadata_sys()?;
        let bucket_meta_sys = bucket_meta_sys_lock.write().await;
        let bucket_meta_sys = bucket_meta_sys_lock.read().await;

        let (cfg, _) = bucket_meta_sys.get_bucket_policy(bucket).await?;
@@ -5,8 +5,6 @@ use tokio::{
    io,
};

pub const SLASH_SEPARATOR: &str = "/";

#[cfg(not(windows))]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;
@@ -522,9 +520,4 @@ mod tests {
        // Should be different files
        assert!(!same_file(&metadata1, &metadata2));
    }

    #[test]
    fn test_slash_separator() {
        assert_eq!(SLASH_SEPARATOR, "/");
    }
}
@@ -135,7 +135,7 @@ impl LocalDisk {
            Ok(path) => path,
            Err(e) => {
                if e.kind() == ErrorKind::NotFound {
                    return Err(DiskError::VolumeNotFound.into());
                    return Err(DiskError::VolumeNotFound);
                }
                return Err(to_file_error(e).into());
            }
@@ -162,7 +162,7 @@ impl LocalDisk {
        let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;

        if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx {
            return Err(Error::from(DiskError::InconsistentDisk));
            return Err(DiskError::InconsistentDisk);
        }

        id = Some(fm.erasure.this);
@@ -5,6 +5,7 @@ use std::{

use super::error::Result;
use crate::disk::error_conv::to_file_error;
use rustfs_utils::path::SLASH_SEPARATOR;
use tokio::fs;
use tracing::warn;
@@ -90,7 +91,7 @@ pub async fn read_dir(path: impl AsRef<Path>, count: i32) -> std::io::Result<Vec
        if file_type.is_file() {
            volumes.push(name);
        } else if file_type.is_dir() {
            volumes.push(format!("{}{}", name, super::fs::SLASH_SEPARATOR));
            volumes.push(format!("{}{}", name, SLASH_SEPARATOR));
        }
        count -= 1;
        if count == 0 {
@@ -56,6 +56,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
// use url::UrlQuery;

pub mod bucket_meta;
pub mod event;
pub mod group;
pub mod policys;
486 rustfs/src/admin/handlers/bucket_meta.rs Normal file
@@ -0,0 +1,486 @@
use std::{
    collections::HashMap,
    io::{Cursor, Read as _, Write as _},
    sync::Arc,
};

use crate::{
    admin::router::Operation,
    auth::{check_key_valid, get_session_token},
};

use ecstore::{
    StorageAPI,
    bucket::{
        metadata::{
            BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_QUOTA_CONFIG_FILE,
            BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG,
            OBJECT_LOCK_CONFIG,
        },
        metadata_sys,
    },
    error::StorageError,
    new_object_layer_fn,
    store_api::BucketOptions,
};
use ecstore::{
    bucket::utils::{deserialize, serialize},
    store_api::MakeBucketOptions,
};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_utils::path::{SLASH_SEPARATOR, path_join_buf};
use s3s::{
    Body, S3Request, S3Response, S3Result,
    header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE},
    s3_error,
};
use serde::Deserialize;
use serde_urlencoded::from_bytes;
use time::OffsetDateTime;
use tracing::warn;
use zip::{ZipArchive, ZipWriter, write::SimpleFileOptions};

#[derive(Debug, Default, serde::Deserialize)]
pub struct ExportBucketMetadataQuery {
    pub bucket: String,
}

pub struct ExportBucketMetadata {}

#[async_trait::async_trait]
impl Operation for ExportBucketMetadata {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let query = {
            if let Some(query) = req.uri.query() {
                let input: ExportBucketMetadataQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                ExportBucketMetadataQuery::default()
            }
        };

        let Some(input_cred) = req.credentials else {
            return Err(s3_error!(InvalidRequest, "get cred failed"));
        };

        let (_cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        let Some(store) = new_object_layer_fn() else {
            return Err(s3_error!(InvalidRequest, "object store not init"));
        };

        let buckets = if query.bucket.is_empty() {
            store
                .list_bucket(&BucketOptions::default())
                .await
                .map_err(|e| s3_error!(InternalError, "list buckets failed: {e}"))?
        } else {
            let bucket = store
                .get_bucket_info(&query.bucket, &BucketOptions::default())
                .await
                .map_err(|e| s3_error!(InternalError, "get bucket failed: {e}"))?;
            vec![bucket]
        };

        let mut zip_writer = ZipWriter::new(Cursor::new(Vec::new()));

        let confs = [
            BUCKET_POLICY_CONFIG,
            BUCKET_NOTIFICATION_CONFIG,
            BUCKET_LIFECYCLE_CONFIG,
            BUCKET_SSECONFIG,
            BUCKET_TAGGING_CONFIG,
            BUCKET_QUOTA_CONFIG_FILE,
            OBJECT_LOCK_CONFIG,
            BUCKET_VERSIONING_CONFIG,
            BUCKET_REPLICATION_CONFIG,
            BUCKET_TARGETS_FILE,
        ];

        for bucket in buckets {
            for &conf in confs.iter() {
                let conf_path = path_join_buf(&[bucket.name.as_str(), conf]);
                match conf {
                    BUCKET_POLICY_CONFIG => {
                        let config: policy::policy::BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_json =
                            serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;
                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_json)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_NOTIFICATION_CONFIG => {
                        let config: s3s::dto::NotificationConfiguration =
                            match metadata_sys::get_notification_config(&bucket.name).await {
                                Ok(Some(res)) => res,
                                Err(e) => {
                                    if e == StorageError::ConfigNotFound {
                                        continue;
                                    }
                                    return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                                }
                                Ok(None) => continue,
                            };

                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_LIFECYCLE_CONFIG => {
                        let config = match metadata_sys::get_lifecycle_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_TAGGING_CONFIG => {
                        let config = match metadata_sys::get_tagging_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_QUOTA_CONFIG_FILE => {
                        let config = match metadata_sys::get_quota_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_json =
                            serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_json)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    OBJECT_LOCK_CONFIG => {
                        let config = match metadata_sys::get_object_lock_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_SSECONFIG => {
                        let config = match metadata_sys::get_sse_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_VERSIONING_CONFIG => {
                        let config = match metadata_sys::get_versioning_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_REPLICATION_CONFIG => {
                        let config = match metadata_sys::get_replication_config(&bucket.name).await {
                            Ok((res, _)) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };
                        let config_xml =
                            serialize(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_xml)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    BUCKET_TARGETS_FILE => {
                        let config = match metadata_sys::get_bucket_targets_config(&bucket.name).await {
                            Ok(res) => res,
                            Err(e) => {
                                if e == StorageError::ConfigNotFound {
                                    continue;
                                }
                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
                            }
                        };

                        let config_json =
                            serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;

                        zip_writer
                            .start_file(conf_path, SimpleFileOptions::default())
                            .map_err(|e| s3_error!(InternalError, "start file failed: {e}"))?;
                        zip_writer
                            .write_all(&config_json)
                            .map_err(|e| s3_error!(InternalError, "write file failed: {e}"))?;
                    }
                    _ => {}
                }
            }
        }

        let zip_bytes = zip_writer
            .finish()
            .map_err(|e| s3_error!(InternalError, "finish zip failed: {e}"))?;
        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/zip".parse().unwrap());
        header.insert(CONTENT_DISPOSITION, "attachment; filename=bucket-meta.zip".parse().unwrap());
        header.insert(CONTENT_LENGTH, zip_bytes.get_ref().len().to_string().parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::from(zip_bytes.into_inner())), header))
    }
}

#[derive(Debug, Default, Deserialize)]
pub struct ImportBucketMetadataQuery {
    #[allow(dead_code)]
    pub bucket: String,
}

pub struct ImportBucketMetadata {}

#[async_trait::async_trait]
impl Operation for ImportBucketMetadata {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let _query = {
            if let Some(query) = req.uri.query() {
                let input: ImportBucketMetadataQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                ImportBucketMetadataQuery::default()
            }
        };

        let Some(input_cred) = req.credentials else {
            return Err(s3_error!(InvalidRequest, "get cred failed"));
        };

        let (_cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        let mut input = req.input;
        let body = match input.store_all_unlimited().await {
            Ok(b) => b,
            Err(e) => {
                warn!("get body failed, e: {:?}", e);
                return Err(s3_error!(InvalidRequest, "get body failed"));
            }
        };

        let mut zip_reader = ZipArchive::new(Cursor::new(body)).map_err(|e| s3_error!(InternalError, "get body failed: {e}"))?;

        // First pass: read all file contents into memory
        let mut file_contents = Vec::new();
        for i in 0..zip_reader.len() {
            let mut file = zip_reader
                .by_index(i)
                .map_err(|e| s3_error!(InternalError, "get file failed: {e}"))?;
            let file_path = file.name().to_string();

            let mut content = Vec::new();
            file.read_to_end(&mut content)
                .map_err(|e| s3_error!(InternalError, "read file failed: {e}"))?;

            file_contents.push((file_path, content));
        }

        // Extract bucket names
        let mut bucket_names = Vec::new();
        for (file_path, _) in &file_contents {
            let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>();

            if file_path_split.len() < 2 {
                warn!("file path is invalid: {}", file_path);
                continue;
            }

            let bucket_name = file_path_split[0].to_string();
            if !bucket_names.contains(&bucket_name) {
                bucket_names.push(bucket_name);
            }
        }

        // Get existing bucket metadata
        let mut bucket_metadatas = HashMap::new();
        for bucket_name in bucket_names {
            match metadata_sys::get_config_from_disk(&bucket_name).await {
                Ok(res) => bucket_metadatas.insert(bucket_name, Arc::new(res)),
                Err(e) => {
                    if e == StorageError::ConfigNotFound {
                        warn!("bucket metadata not found: {e}");
                        continue;
                    }
                    warn!("get bucket metadata failed: {e}");
                    continue;
                }
            };
        }

        let Some(store) = new_object_layer_fn() else {
            return Err(s3_error!(InvalidRequest, "object store not init"));
        };

        let update_at = OffsetDateTime::now_utc();

        // Second pass: process file contents
        for (file_path, content) in file_contents {
            let file_path_split = file_path.split(SLASH_SEPARATOR).collect::<Vec<&str>>();

            if file_path_split.len() < 2 {
                warn!("file path is invalid: {}", file_path);
                continue;
            }

            let bucket_name = file_path_split[0];
            let conf_name = file_path_split[1];

            // create bucket if not exists
            if !bucket_metadatas.contains_key(bucket_name) {
                if let Err(e) = store
                    .make_bucket(
                        bucket_name,
                        &MakeBucketOptions {
                            force_create: true,
                            ..Default::default()
                        },
                    )
                    .await
                {
                    warn!("create bucket failed: {e}");
                    continue;
                }

                let metadata = metadata_sys::get(bucket_name).await.unwrap_or_default();
                bucket_metadatas.insert(bucket_name.to_string(), metadata.clone());
            }

            match conf_name {
                BUCKET_POLICY_CONFIG => {
                    let config: policy::policy::BucketPolicy = match serde_json::from_slice(&content) {
                        Ok(config) => config,
                        Err(e) => {
                            warn!("deserialize config failed: {e}");
                            continue;
                        }
                    };

                    if config.version.is_empty() {
                        continue;
                    }

                    // let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone();
                    // metadata.policy_config_json = content;
                    // metadata.policy_config_updated_at = update_at;
                }
                BUCKET_NOTIFICATION_CONFIG => {
                    if let Err(e) = deserialize::<s3s::dto::NotificationConfiguration>(&content) {
                        warn!("deserialize config failed: {e}");
                        continue;
                    }

                    // let mut metadata = bucket_metadatas.get_mut(bucket_name).unwrap().clone();
                    // metadata.notification_config_xml = content;
                    // metadata.notification_config_updated_at = update_at;
                }
                _ => {}
            }
        }

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
    }
}
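For reference, a hypothetical offline sketch (not part of this commit) that inspects an archive produced by ExportBucketMetadata with the same zip crate. Entry names follow the "<bucket>/<config-file>" layout written above, which is exactly what ImportBucketMetadata later splits on SLASH_SEPARATOR; the file path is a placeholder.

use std::{fs::File, io::Read};
use zip::ZipArchive;

fn list_exported_configs(path: &str) -> zip::result::ZipResult<()> {
    let mut archive = ZipArchive::new(File::open(path)?)?;
    for i in 0..archive.len() {
        let mut entry = archive.by_index(i)?;
        let mut body = Vec::new();
        entry.read_to_end(&mut body)?;
        // Each entry is "<bucket>/<config-file>", mirroring the conf_path built by the export handler.
        if let Some((bucket, conf)) = entry.name().split_once('/') {
            println!("{bucket}: {conf} ({} bytes)", body.len());
        }
    }
    Ok(())
}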
@@ -5,8 +5,7 @@ pub mod utils;

// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
    event::{ListNotificationTargets, RemoveNotificationTarget, SetNotificationTarget},
    group, policys, pools, rebalance,
    bucket_meta, group, policys, pools, rebalance,
    service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
    sts, tier, user,
};
@@ -120,11 +119,85 @@ pub fn make_admin_route() -> std::io::Result<impl S3Route> {
        format!("{}{}", ADMIN_PREFIX, "/v3/background-heal/status").as_str(),
        AdminOperation(&handlers::BackgroundHealStatusHandler {}),
    )?;
    // }

    // ?
    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(),
        AdminOperation(&tier::ListTiers {}),
    )?;
    // ?
    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier-stats").as_str(),
        AdminOperation(&tier::GetTierInfo {}),
    )?;
    // ?force=xxx
    r.insert(
        Method::DELETE,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(),
        AdminOperation(&tier::RemoveTier {}),
    )?;
    // ?force=xxx
    // body: AddOrUpdateTierReq
    r.insert(
        Method::PUT,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier").as_str(),
        AdminOperation(&tier::AddTier {}),
    )?;
    // ?
    // body: AddOrUpdateTierReq
    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(),
        AdminOperation(&tier::EditTier {}),
    )?;
    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier/clear").as_str(),
        AdminOperation(&tier::ClearTier {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/export-bucket-metadata").as_str(),
        AdminOperation(&bucket_meta::ExportBucketMetadata {}),
    )?;

    r.insert(
        Method::PUT,
        format!("{}{}", ADMIN_PREFIX, "/import-bucket-metadata").as_str(),
        AdminOperation(&bucket_meta::ImportBucketMetadata {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
        AdminOperation(&ListRemoteTargetHandler {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
        AdminOperation(&GetReplicationMetricsHandler {}),
    )?;

    r.insert(
        Method::PUT,
        format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
        AdminOperation(&SetRemoteTargetHandler {}),
    )?;

    r.insert(
        Method::DELETE,
        format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
        AdminOperation(&RemoveRemoteTargetHandler {}),
    )?;

    Ok(r)
}

/// user router
fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
    // 1
    r.insert(
@@ -241,30 +314,6 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
        AdminOperation(&user::ImportIam {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
        AdminOperation(&ListRemoteTargetHandler {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
        AdminOperation(&GetReplicationMetricsHandler {}),
    )?;

    r.insert(
        Method::PUT,
        format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
        AdminOperation(&SetRemoteTargetHandler {}),
    )?;

    r.insert(
        Method::DELETE,
        format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
        AdminOperation(&RemoveRemoteTargetHandler {}),
    )?;

    // list-canned-policies?bucket=xxx
    r.insert(
        Method::GET,
@@ -338,23 +387,5 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
        AdminOperation(&tier::ClearTier {}),
    )?;

    r.insert(
        Method::GET,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-list").as_str(),
        AdminOperation(&ListNotificationTargets {}),
    )?;

    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-set").as_str(),
        AdminOperation(&SetNotificationTarget {}),
    )?;

    r.insert(
        Method::DELETE,
        format!("{}{}", ADMIN_PREFIX, "/v3/target-remove").as_str(),
        AdminOperation(&RemoveNotificationTarget {}),
    )?;

    Ok(())
}
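A hypothetical client-side sketch (not part of this commit) for exercising the two new admin routes as a round trip; reqwest, the base URL, and the bucket name are placeholders, and authentication is omitted here even though both handlers require valid admin credentials via check_key_valid.

async fn roundtrip_bucket_metadata(base: &str) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // GET {ADMIN_PREFIX}/export-bucket-metadata?bucket=<name> returns an application/zip body.
    let zip_bytes = client
        .get(format!("{base}/export-bucket-metadata?bucket=my-bucket"))
        .send()
        .await?
        .bytes()
        .await?;
    // PUT the same archive back through {ADMIN_PREFIX}/import-bucket-metadata.
    client
        .put(format!("{base}/import-bucket-metadata"))
        .body(zip_bytes)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}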