feat: decommission, rebalance

weisd
2025-04-15 10:03:01 +08:00
parent 38b7bbbc49
commit f624ec5ba6
14 changed files with 1971 additions and 368 deletions

View File

@@ -94,7 +94,7 @@ pub async fn delete_config<S: StorageAPI>(api: Arc<S>, file: &str) -> Result<()>
}
}
async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>, opts: &ObjectOptions) -> Result<()> {
pub async fn save_config_with_opts<S: StorageAPI>(api: Arc<S>, file: &str, data: Vec<u8>, opts: &ObjectOptions) -> Result<()> {
let size = data.len();
let _ = api
.put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::new(Box::new(Cursor::new(data)), size), opts)
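With the helper now public, callers outside config::com can persist small metadata blobs through the same path. A minimal hedged sketch of such a caller (the function name and file path below are illustrative, not taken from this commit; import paths follow the crate layout visible elsewhere in this diff):

use std::sync::Arc;

use crate::config::com::save_config_with_opts;
use crate::store_api::{ObjectOptions, StorageAPI};
use common::error::Result;

// Hypothetical caller: write an already-serialized metadata blob under the meta bucket.
async fn persist_meta<S: StorageAPI>(store: Arc<S>, payload: Vec<u8>) -> Result<()> {
    save_config_with_opts(store, "example-module/meta.bin", payload, &ObjectOptions::default()).await
}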

View File

@@ -21,6 +21,7 @@ pub mod peer;
pub mod peer_rest_client;
pub mod pools;
mod quorum;
pub mod rebalance;
pub mod set_disk;
mod sets;
pub mod store;

View File

@@ -1,7 +1,7 @@
use crate::endpoints::EndpointServerPools;
use crate::global::get_global_endpoints;
use crate::peer_rest_client::PeerRestClient;
use crate::StorageAPI;
use crate::{endpoints::EndpointServerPools, new_object_layer_fn};
use common::error::{Error, Result};
use futures::future::join_all;
use lazy_static::lazy_static;
@@ -132,6 +132,41 @@ impl NotificationSys {
}
}
}
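/// Ask every peer to reload its rebalance metadata (and, when `start` is true, begin rebalancing); per-peer failures are logged rather than propagated.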
pub async fn load_rebalance_meta(&self, start: bool) {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
futures.push(client.load_rebalance_meta(start));
}
let results = join_all(futures).await;
for result in results {
if let Err(err) = result {
error!("notification load_rebalance_meta err {:?}", err);
}
}
}
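/// Tell every peer to stop rebalancing, then stop the local rebalance via the object layer.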
pub async fn stop_rebalance(&self) {
let Some(store) = new_object_layer_fn() else {
error!("stop_rebalance: not init");
return;
};
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {
futures.push(client.stop_rebalance());
}
let results = join_all(futures).await;
for result in results {
if let Err(err) = result {
error!("notification stop_rebalance err {:?}", err);
}
}
let _ = store.stop_rebalance().await;
}
}
fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<madmin::Disk> {

View File

@@ -1,5 +1,5 @@
use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::cache_value::metacache_set::{list_path_raw, AgreedFn, ListPathRawOptions, PartialFn};
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
use crate::config::com::{read_config, save_config, CONFIG_PREFIX};
use crate::config::error::ConfigError;
use crate::disk::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
@@ -8,7 +8,9 @@ use crate::heal::heal_commands::HealOpts;
use crate::new_object_layer_fn;
use crate::notification_sys::get_global_notification_sys;
use crate::set_disk::SetDisks;
use crate::store_api::{BucketOptions, GetObjectReader, MakeBucketOptions, ObjectIO, ObjectOptions, StorageAPI};
use crate::store_api::{
BucketOptions, CompletePart, GetObjectReader, MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI,
};
use crate::store_err::{
is_err_bucket_exists, is_err_data_movement_overwrite, is_err_object_not_found, is_err_version_not_found, StorageError,
};
@@ -16,22 +18,20 @@ use crate::utils::path::{encode_dir_object, path_join, SLASH_SEPARATOR};
use crate::{sets::Sets, store::ECStore};
use ::workers::workers::Workers;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use common::defer;
use common::error::{Error, Result};
use futures::future::BoxFuture;
use http::HeaderMap;
use rmp_serde::{Deserializer, Serializer};
use serde::{de, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::io::{Cursor, Write};
use std::path::PathBuf;
use std::pin::Pin;
use std::rc;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tokio::sync::broadcast::Receiver as B_Receiver;
use tracing::{error, info, warn};
use uuid::Uuid;
use workers::workers;
pub const POOL_META_NAME: &str = "pool.bin";
pub const POOL_META_FORMAT: u16 = 1;
@@ -57,22 +57,37 @@ pub struct PoolMeta {
}
impl PoolMeta {
pub fn new(pools: Vec<Arc<Sets>>) -> Self {
let mut status = Vec::with_capacity(pools.len());
pub fn new(pools: &[Arc<Sets>], prev_meta: &PoolMeta) -> Self {
let mut new_meta = Self {
version: POOL_META_VERSION,
pools: Vec::new(),
..Default::default()
};
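// Carry over the previous status (including any decommission progress) for every pool whose
// command line is unchanged; pools that disappeared are dropped, and genuinely new pools get
// a fresh PoolStatus below.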
for (idx, pool) in pools.iter().enumerate() {
status.push(PoolStatus {
id: idx,
let mut skip = false;
for current_pool in prev_meta.pools.iter() {
if current_pool.cmd_line == pool.endpoints.cmd_line {
new_meta.pools.push(current_pool.clone());
skip = true;
break;
}
}
if skip {
continue;
}
new_meta.pools.push(PoolStatus {
cmd_line: pool.endpoints.cmd_line.clone(),
id: idx,
last_update: OffsetDateTime::now_utc(),
decommission: None,
});
}
Self {
version: POOL_META_VERSION,
pools: status,
dont_save: false,
}
new_meta
}
pub fn is_suspended(&self, idx: usize) -> bool {
@@ -83,10 +98,8 @@ impl PoolMeta {
self.pools[idx].decommission.is_some()
}
pub async fn load(&mut self) -> Result<()> {
let Some(store) = new_object_layer_fn() else { return Err(Error::msg("errServerNotInitialized")) };
let data = match read_config(store.clone(), POOL_META_NAME).await {
pub async fn load(&mut self, pool: Arc<Sets>, _pools: Vec<Arc<Sets>>) -> Result<()> {
let data = match read_config(pool, POOL_META_NAME).await {
Ok(data) => {
if data.is_empty() {
return Ok(());
@@ -215,7 +228,7 @@ impl PoolMeta {
if let Some(pool) = self.pools.get_mut(idx) {
if let Some(ref info) = pool.decommission {
if !info.complete && !info.failed && !info.canceled {
return Err(Error::msg("DecommissionAlreadyRunning"));
return Err(Error::new(StorageError::DecommissionAlreadyRunning));
}
}
@@ -289,7 +302,10 @@ impl PoolMeta {
}
pub fn track_current_bucket_object(&mut self, idx: usize, bucket: String, object: String) {
self.pools.get(idx).is_some_and(|v| v.decommission.is_some());
if !self.pools.get(idx).is_some_and(|v| v.decommission.is_some()) {
return;
}
if let Some(pool) = self.pools.get_mut(idx) {
if let Some(info) = pool.decommission.as_mut() {
info.object = object;
@@ -305,7 +321,7 @@ impl PoolMeta {
let now = OffsetDateTime::now_utc();
if now.unix_timestamp() - self.pools[idx].last_update.unix_timestamp() > duration.whole_seconds() as i64 {
if now.unix_timestamp() - self.pools[idx].last_update.unix_timestamp() > duration.whole_seconds() {
self.pools[idx].last_update = now;
self.save(pools).await?;
@@ -314,6 +330,96 @@ impl PoolMeta {
Ok(false)
}
#[allow(dead_code)]
pub fn validate(&self, pools: Vec<Arc<Sets>>) -> Result<bool> {
struct PoolInfo {
position: usize,
completed: bool,
decom_started: bool,
}
let mut remembered_pools = HashMap::new();
for (idx, pool) in self.pools.iter().enumerate() {
let mut complete = false;
let mut decom_started = false;
if let Some(decommission) = &pool.decommission {
if decommission.complete {
complete = true;
}
decom_started = true;
}
remembered_pools.insert(
pool.cmd_line.clone(),
PoolInfo {
position: idx,
completed: complete,
decom_started,
},
);
}
let mut specified_pools = HashMap::new();
for (idx, pool) in pools.iter().enumerate() {
specified_pools.insert(pool.endpoints.cmd_line.clone(), idx);
}
let mut update = false;
// Check whether any pool specified on the command line has already been decommissioned and should be removed from it.
for k in specified_pools.keys() {
if let Some(pi) = remembered_pools.get(k) {
if pi.completed {
error!(
"pool({}) = {} is decommissioned, please remove from server command line",
pi.position + 1,
k
);
// return Err(Error::msg(format!(
// "pool({}) = {} is decommissioned, please remove from server command line",
// pi.position + 1,
// k
// )));
}
} else {
// The previously remembered pool no longer exists; allow an update, since a new pool may have been added.
update = true;
}
}
if specified_pools.len() == remembered_pools.len() {
for (k, pi) in remembered_pools.iter() {
if let Some(pos) = specified_pools.get(k) {
if *pos != pi.position {
update = true; // The pool order has changed; allow an update.
}
}
}
}
if !update {
update = specified_pools.len() != remembered_pools.len();
}
Ok(update)
}
pub fn return_resumable_pools(&self) -> Vec<PoolStatus> {
let mut new_pools = Vec::new();
for pool in &self.pools {
if let Some(decommission) = &pool.decommission {
if decommission.complete || decommission.canceled {
// No resume is needed when:
// - the decommission has completed
// - the decommission was canceled
continue;
}
// Everything else needs to be resumed
new_pools.push(pool.clone());
}
}
new_pools
}
}
fn path2_bucket_object(name: &str) -> (String, String) {
@@ -543,7 +649,208 @@ impl ECStore {
Ok(())
}
async fn decommission_pool(&self, rx: B_Receiver<bool>, idx: usize, pool: Arc<Sets>, bi: DecomBucketInfo) -> Result<()> {
#[allow(unused_assignments)]
async fn decommission_entry(
self: &Arc<Self>,
idx: usize,
entry: MetaCacheEntry,
bucket: String,
set: Arc<SetDisks>,
wk: Arc<Workers>,
rcfg: Option<String>,
) {
wk.give().await;
if entry.is_dir() {
return;
}
let mut fivs = match entry.file_info_versions(&bucket) {
Ok(f) => f,
Err(err) => {
error!("decommission_pool: file_info_versions err {:?}", &err);
return;
}
};
fivs.versions.sort_by(|a, b| b.mod_time.cmp(&a.mod_time));
let mut decommissioned: usize = 0;
let expired: usize = 0;
for version in fivs.versions.iter() {
// TODO: filterLifecycle
let remaining_versions = fivs.versions.len() - expired;
if version.deleted && remaining_versions == 1 && rcfg.is_none() {
//
decommissioned += 1;
info!("decommission_pool: DELETE marked object with no other non-current versions will be skipped");
continue;
}
let version_id = version.version_id.map(|v| v.to_string());
let mut ignore = false;
let mut failure = false;
let mut error = None;
if version.deleted {
// TODO: other params
if let Err(err) = self
.delete_object(
bucket.as_str(),
&version.name,
ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: idx,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
},
)
.await
{
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
ignore = true;
continue;
}
failure = true;
error = Some(err)
}
{
self.pool_meta.write().await.count_item(idx, 0, failure);
}
if !failure {
decommissioned += 1;
}
info!(
"decommission_pool: DecomCopyDeleteMarker {} {} {:?} {:?}",
&bucket, &version.name, &version_id, error
);
continue;
}
for _i in 0..3 {
if version.is_remote() {
// TODO: DecomTieredObject
}
let bucket = bucket.clone();
let rd = match set
.get_object_reader(
bucket.as_str(),
&encode_dir_object(&version.name),
None,
HeaderMap::new(),
&ObjectOptions {
version_id: version_id.clone(),
no_lock: true,
..Default::default()
},
)
.await
{
Ok(rd) => rd,
Err(err) => {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
ignore = true;
break;
}
if !ignore {
//
if bucket == RUSTFS_META_BUCKET && version.name.contains(DATA_USAGE_CACHE_NAME) {
ignore = true;
error!("decommission_pool: ignore data usage cache {}", &version.name);
break;
}
}
failure = true;
error!("decommission_pool: get_object_reader err {:?}", &err);
continue;
}
};
if let Err(err) = self.clone().decommission_object(idx, bucket, rd).await {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) {
ignore = true;
break;
}
failure = true;
error!("decommission_pool: decommission_object err {:?}", &err);
continue;
}
failure = false;
break;
}
if ignore {
info!("decommission_pool: ignore {}", &version.name);
continue;
}
{
self.pool_meta.write().await.count_item(idx, decommissioned, failure);
}
if failure {
break;
}
decommissioned += 1;
}
if decommissioned == fivs.versions.len() {
if let Err(err) = set
.delete_object(
bucket.as_str(),
&encode_dir_object(&entry.name),
ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
},
)
.await
{
error!("decommission_pool: delete_object err {:?}", &err);
}
}
{
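// Record the object currently being processed and persist pool.bin at most once every
// 30 seconds (update_after only saves when that interval has elapsed).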
let mut pool_meta = self.pool_meta.write().await;
pool_meta.track_current_bucket_object(idx, bucket.clone(), entry.name.clone());
let ok = pool_meta
.update_after(idx, self.pools.clone(), Duration::seconds(30))
.await
.unwrap_or_default();
if ok {
// TODO: ReloadPoolMeta
}
}
}
async fn decommission_pool(
self: &Arc<Self>,
rx: B_Receiver<bool>,
idx: usize,
pool: Arc<Sets>,
bi: DecomBucketInfo,
) -> Result<()> {
let wk = Workers::new(pool.disk_set.len() * 2).map_err(|v| Error::from_string(v))?;
// let mut vc = None;
@@ -558,226 +865,42 @@ impl ECStore {
// TODO: ReplicationConfig
}
for (idx, set) in pool.disk_set.iter().enumerate() {
let set = set.clone();
let wk_clone = wk.clone();
let bucket = bi.name.clone();
let rcfg = rcfg.clone();
let decommission_entry = |entry: MetaCacheEntry| async move {
wk_clone.give().await;
if entry.is_dir() {
return;
}
let mut fivs = match entry.file_info_versions(&bucket) {
Ok(f) => f,
Err(err) => {
error!("decommission_pool: file_info_versions err {:?}", &err);
return;
}
};
fivs.versions.sort_by(|a, b| b.mod_time.cmp(&a.mod_time));
let mut decommissioned: usize = 0;
let mut expired: usize = 0;
for version in fivs.versions.iter() {
// TODO: filterLifecycle
let remaining_versions = fivs.versions.len() - expired;
if version.deleted && remaining_versions == 1 && rcfg.is_none() {
//
decommissioned += 1;
info!("decommission_pool: DELETE marked object with no other non-current versions will be skipped");
continue;
}
let version_id = version.version_id.map(|v| v.to_string());
let mut ignore = false;
let mut failure = false;
let mut error = None;
if version.deleted {
// TODO: other params
if let Err(err) = self
.delete_object(
bucket.as_str(),
&version.name,
ObjectOptions {
versioned: true,
version_id: version_id.clone(),
mod_time: version.mod_time,
src_pool_idx: idx,
data_movement: true,
delete_marker: true,
skip_decommissioned: true,
..Default::default()
},
)
.await
{
if is_err_object_not_found(&err)
|| is_err_version_not_found(&err)
|| is_err_data_movement_overwrite(&err)
{
//
ignore = true;
continue;
}
failure = true;
error = Some(err)
}
{
self.pool_meta.write().await.count_item(idx, 0, failure);
}
if !failure {
decommissioned += 1;
}
info!(
"decommission_pool: DecomCopyDeleteMarker {} {} {:?} {:?}",
&bucket, &version.name, &version_id, error
);
continue;
}
for _i in 0..3 {
if version.is_remote() {
// TODO: DecomTieredObject
}
let bucket = bucket.clone();
let rd = match set
.get_object_reader(
bucket.as_str(),
&encode_dir_object(&version.name),
None,
HeaderMap::new(),
&ObjectOptions {
version_id: version_id.clone(),
no_lock: true,
..Default::default()
},
)
.await
{
Ok(rd) => rd,
Err(err) => {
if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
ignore = true;
break;
}
if !ignore {
//
if bucket == RUSTFS_META_BUCKET && version.name.contains(DATA_USAGE_CACHE_NAME) {
ignore = true;
error!("decommission_pool: ignore data usage cache {}", &version.name);
break;
}
}
failure = true;
error!("decommission_pool: get_object_reader err {:?}", &err);
continue;
}
};
if let Err(err) = self.decommission_object(idx, bucket, rd).await {
if is_err_object_not_found(&err)
|| is_err_version_not_found(&err)
|| is_err_data_movement_overwrite(&err)
{
ignore = true;
break;
}
failure = true;
error!("decommission_pool: decommission_object err {:?}", &err);
continue;
}
failure = false;
break;
}
if ignore {
info!("decommission_pool: ignore {}", &version.name);
continue;
}
{
self.pool_meta.write().await.count_item(idx, decommissioned, failure);
}
if failure {
break;
}
decommissioned += 1;
}
if decommissioned == fivs.versions.len() {
if let Err(err) = set
.delete_object(
bucket.as_str(),
&encode_dir_object(&entry.name),
ObjectOptions {
delete_prefix: true,
delete_prefix_object: true,
..Default::default()
},
)
.await
{
error!("decommission_pool: delete_object err {:?}", &err);
}
}
{
let mut pool_meta = self.pool_meta.write().await;
pool_meta.track_current_bucket_object(idx, bucket.clone(), entry.name.clone());
let ok = pool_meta
.update_after(idx, self.pools.clone(), Duration::seconds(30))
.await
.unwrap_or_default();
if ok {
// TODO: ReloadPoolMeta
}
}
};
for (set_idx, set) in pool.disk_set.iter().enumerate() {
wk.clone().take().await;
// let set = set.clone();
// let mutrx = rx.resubscribe();
// tokio::spawn(async move {
// loop {
// if rx.try_recv().is_ok() {
// break;
// }
let decommission_entry: ListCallback = Arc::new({
let this = Arc::clone(self);
let bucket = bi.name.clone();
let wk = wk.clone();
let set = set.clone();
let rcfg = rcfg.clone();
move |entry: MetaCacheEntry| {
let this = this.clone();
let bucket = bucket.clone();
let wk = wk.clone();
let set = set.clone();
let rcfg = rcfg.clone();
Box::pin(async move { this.decommission_entry(idx, entry, bucket, set, wk, rcfg).await })
}
});
// let decommission_entry = Arc::new(move |entry: MetaCacheEntry| {
// let decommission_entry = decommission_entry.clone();
// Box::pin(async move {
// decommission_entry(entry).await;
// }) as Pin<Box<dyn Future<Output = ()> + Send>>
// });
// set.list_objects_to_decommission(rx, bi, decommission_entry).await.unwrap();
// }
// //
// });
let set = set.clone();
let mut rx = rx.resubscribe();
let bi = bi.clone();
let set_id = set_idx;
tokio::spawn(async move {
loop {
if rx.try_recv().is_ok() {
break;
}
if let Err(err) = set
.list_objects_to_decommission(rx.resubscribe(), bi.clone(), decommission_entry.clone())
.await
{
error!("decommission_pool: list_objects_to_decommission {} err {:?}", set_id, &err);
}
}
});
}
wk.wait().await;
@@ -785,7 +908,7 @@ impl ECStore {
Ok(())
}
async fn do_decommission_in_routine(&self, rx: B_Receiver<bool>, idx: usize) {
pub async fn do_decommission_in_routine(self: &Arc<Self>, rx: B_Receiver<bool>, idx: usize) {
if let Err(err) = self.decommission_in_background(rx, idx).await {
error!("decom err {:?}", &err);
if let Err(er) = self.decommission_failed(idx).await {
@@ -850,7 +973,7 @@ impl ECStore {
Ok(())
}
async fn decommission_in_background(&self, rx: B_Receiver<bool>, idx: usize) -> Result<()> {
async fn decommission_in_background(self: &Arc<Self>, rx: B_Receiver<bool>, idx: usize) -> Result<()> {
let pool = self.pools[idx].clone();
let pending = {
@@ -973,34 +1096,151 @@ impl ECStore {
Ok(ret)
}
async fn decommission_object(&self, id: usize, bucket: String, rd: GetObjectReader) -> Result<()> {
let object_info = rd.object_info;
let actual_size = object_info.get_actual_size()?;
async fn decommission_object(self: Arc<Self>, pool_idx: usize, bucket: String, rd: GetObjectReader) -> Result<()> {
let object_info = rd.object_info.clone();
if object_info.is_multipart() {}
// TODO: confirm whether `size` or `actual_size` should be used here
let _actual_size = object_info.get_actual_size()?;
unimplemented!()
if object_info.is_multipart() {
let res = match self
.new_multipart_upload(
&bucket,
&object_info.name,
&ObjectOptions {
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
user_defined: object_info.user_defined.clone(),
src_pool_idx: pool_idx,
data_movement: true,
..Default::default()
},
)
.await
{
Ok(res) => res,
Err(err) => {
error!("decommission_object: new_multipart_upload err {:?}", &err);
return Err(err);
}
};
// TODO: defer abort_multipart_upload
defer!(|| async {
if let Err(err) = self
.abort_multipart_upload(&bucket, &object_info.name, &res.upload_id, &ObjectOptions::default())
.await
{
error!("decommission_object: abort_multipart_upload err {:?}", &err);
}
});
let mut parts = vec![CompletePart::default(); object_info.parts.len()];
let mut reader = rd.stream;
for (i, part) in object_info.parts.iter().enumerate() {
// Read one part at a time from the reader and upload it
let mut data = PutObjReader::new(reader, part.size);
let pi = match self
.put_object_part(
&bucket,
&object_info.name,
&res.upload_id,
part.number,
&mut data,
&ObjectOptions {
preserve_etag: part.e_tag.clone(),
..Default::default()
},
)
.await
{
Ok(pi) => pi,
Err(err) => {
error!("decommission_object: put_object_part err {:?}", &err);
return Err(err);
}
};
parts[i] = CompletePart {
part_num: pi.part_num,
e_tag: pi.etag,
};
// Take back ownership of the reader so the next part can reuse it
reader = data.stream;
}
if let Err(err) = self
.complete_multipart_upload(
&bucket,
&object_info.name,
&res.upload_id,
parts,
&ObjectOptions {
data_movement: true,
mod_time: object_info.mod_time,
..Default::default()
},
)
.await
{
error!("decommission_object: complete_multipart_upload err {:?}", &err);
return Err(err);
}
return Ok(());
}
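// Non-multipart objects are moved with a single put_object call, preserving version id, mod time, user metadata, and etag.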
let mut data = PutObjReader::new(rd.stream, object_info.size);
if let Err(err) = self
.put_object(
&bucket,
&object_info.name,
&mut data,
&ObjectOptions {
src_pool_idx: pool_idx,
data_movement: true,
version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
mod_time: object_info.mod_time,
user_defined: object_info.user_defined.clone(),
preserve_etag: object_info.etag.clone(),
..Default::default()
},
)
.await
{
error!("decommission_object: put_object err {:?}", &err);
return Err(err);
}
Ok(())
}
}
// impl Fn(MetaCacheEntry) -> impl Future<Output = Result<(), Error>>
type Listcb = Arc<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type ListCallback = Arc<dyn Fn(MetaCacheEntry) -> BoxFuture<'static, ()> + Send + Sync + 'static>;
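For reference, a minimal self-contained sketch of how a callback of this shape is built and awaited; the Entry type and the tokio entry point are illustrative stand-ins, not items from this crate:

use futures::future::BoxFuture;
use std::sync::Arc;

struct Entry {
    name: String,
}

type Callback = Arc<dyn Fn(Entry) -> BoxFuture<'static, ()> + Send + Sync + 'static>;

fn make_callback(tag: String) -> Callback {
    // Clone captured state into the closure once, then clone again per call so each
    // returned future is 'static and Send, mirroring the wiring in decommission_pool.
    Arc::new(move |entry: Entry| {
        let tag = tag.clone();
        Box::pin(async move {
            println!("{tag}: visiting {}", entry.name);
        })
    })
}

#[tokio::main]
async fn main() {
    let cb = make_callback("decom".to_string());
    cb(Entry { name: "bucket/object".into() }).await;
}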
impl SetDisks {
//
async fn list_objects_to_decommission(
self: &Arc<Self>,
mut rx: B_Receiver<bool>,
rx: B_Receiver<bool>,
bucket_info: DecomBucketInfo,
func: Listcb,
func: ListCallback,
) -> Result<()> {
let (disks, _) = self.get_online_disks_with_healing(false).await;
if disks.is_empty() {
return Err(Error::msg("errNoDiskAvailable"));
}
let listing_quorum = (self.set_drive_count + 1) / 2;
let listing_quorum = self.set_drive_count.div_ceil(2);
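// div_ceil(2) yields the same majority quorum as the previous (set_drive_count + 1) / 2, i.e. the ceiling of set_drive_count / 2.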
let resolver = MetadataResolutionParams {
dir_quorum: listing_quorum,
@@ -1019,13 +1259,11 @@ impl SetDisks {
min_disks: listing_quorum,
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
let resolver = resolver.clone();
let func =
func.clone() as Arc<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
Box::pin(async move {
if let Ok(Some(entry)) = entries.resolve(resolver) {
func(entry);
}
})
if let Ok(Some(entry)) = entries.resolve(resolver) {
func(entry)
} else {
Box::pin(async {})
}
})),
..Default::default()
},
@@ -1061,33 +1299,3 @@ fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::Storage
}
capacity
}
#[test]
fn test_pool_meta() -> Result<()> {
let meta = PoolMeta::new(vec![]);
let mut data = Vec::new();
data.write_u16::<LittleEndian>(POOL_META_FORMAT).unwrap();
data.write_u16::<LittleEndian>(POOL_META_VERSION).unwrap();
let mut buf = Vec::new();
meta.serialize(&mut Serializer::new(&mut buf))?;
data.write_all(&buf)?;
let format = LittleEndian::read_u16(&data[0..2]);
if format != POOL_META_FORMAT {
return Err(Error::msg(format!("PoolMeta: unknown format: {}", format)));
}
let version = LittleEndian::read_u16(&data[2..4]);
if version != POOL_META_VERSION {
return Err(Error::msg(format!("PoolMeta: unknown version: {}", version)));
}
let mut buf = Deserializer::new(Cursor::new(&data[4..]));
let de_meta: PoolMeta = Deserialize::deserialize(&mut buf).unwrap();
if de_meta.version != POOL_META_VERSION {
return Err(Error::msg(format!("unexpected PoolMeta version: {}", de_meta.version)));
}
println!("meta: {:?}", de_meta);
Ok(())
}

ecstore/src/rebalance.rs (new file, 1038 lines)

File diff suppressed because it is too large

View File

@@ -1,15 +1,15 @@
#![allow(clippy::map_entry)]
use crate::bucket::metadata_sys::{self, init_bucket_metadata_sys, set_bucket_metadata};
use crate::bucket::metadata_sys::{self, set_bucket_metadata};
use crate::bucket::utils::{check_valid_bucket_name, check_valid_bucket_name_strict, is_meta_bucketname};
use crate::config::storageclass;
use crate::config::GLOBAL_StorageClass;
use crate::config::{self, storageclass, GLOBAL_ConfigSys};
use crate::disk::endpoint::{Endpoint, EndpointType};
use crate::disk::{DiskAPI, DiskInfo, DiskInfoOptions, MetaCacheEntry};
use crate::error::clone_err;
use crate::global::{
is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION,
DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
get_global_endpoints, is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE,
DISK_FILL_FRACTION, DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
};
use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
@@ -18,10 +18,11 @@ use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::new_object_layer_fn;
use crate::notification_sys::get_global_notification_sys;
use crate::pools::PoolMeta;
use crate::rebalance::RebalanceMeta;
use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO};
use crate::store_err::{
is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found,
to_object_err, StorageError,
is_err_bucket_exists, is_err_decommission_already_running, is_err_invalid_upload_id, is_err_object_not_found,
is_err_read_quorum, is_err_version_not_found, to_object_err, StorageError,
};
use crate::store_init::ec_drives_no_config;
use crate::utils::crypto::base64_decode;
@@ -40,6 +41,7 @@ use crate::{
},
store_init, utils,
};
use common::error::{Error, Result};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
use futures::future::join_all;
@@ -59,6 +61,7 @@ use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::{interval, sleep};
use tracing::error;
use tracing::{debug, info};
use uuid::Uuid;
@@ -73,6 +76,7 @@ pub struct ECStore {
pub peer_sys: S3PeerSys,
// pub local_disks: Vec<DiskStore>,
pub pool_meta: RwLock<PoolMeta>,
pub rebalance_meta: RwLock<Option<RebalanceMeta>>,
pub decommission_cancelers: Vec<Option<usize>>,
}
@@ -209,7 +213,7 @@ impl ECStore {
}
let peer_sys = S3PeerSys::new(&endpoint_pools);
let mut pool_meta = PoolMeta::new(pools.clone());
let mut pool_meta = PoolMeta::new(&pools, &PoolMeta::default());
pool_meta.dont_save = true;
let decommission_cancelers = vec![None; pools.len()];
@@ -219,35 +223,96 @@ impl ECStore {
pools,
peer_sys,
pool_meta: pool_meta.into(),
rebalance_meta: RwLock::new(None),
decommission_cancelers,
});
set_object_layer(ec.clone()).await;
if let Some(dep_id) = deployment_id {
set_global_deployment_id(dep_id);
}
let wait_sec = 5;
loop {
if let Err(err) = ec.init().await {
error!("init err: {}", err);
error!("retry after {} second", wait_sec);
sleep(Duration::from_secs(wait_sec)).await;
continue;
}
break;
}
set_object_layer(ec.clone()).await;
Ok(ec)
}
pub async fn init(api: Arc<ECStore>) -> Result<()> {
config::init();
GLOBAL_ConfigSys.init(api.clone()).await?;
pub async fn init(self: &Arc<Self>) -> Result<()> {
if self.load_rebalance_meta().await.is_ok() {
self.start_rebalance().await;
}
let buckets_list = api
.list_bucket(&BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.map_err(|err| Error::from_string(err.to_string()))?;
let mut meta = PoolMeta::default();
meta.load(self.pools[0].clone(), self.pools.clone()).await?;
let update = meta.validate(self.pools.clone())?;
let buckets = buckets_list.iter().map(|v| v.name.clone()).collect();
if update {
let new_meta = PoolMeta::new(&self.pools, &meta);
new_meta.save(self.pools.clone()).await?;
{
let mut pool_meta = self.pool_meta.write().await;
*pool_meta = new_meta;
}
} else {
{
let mut pool_meta = self.pool_meta.write().await;
*pool_meta = meta.clone();
}
}
// FIXME:
let pools = meta.return_resumable_pools();
let mut pool_indeces = Vec::with_capacity(pools.len());
init_bucket_metadata_sys(api.clone(), buckets).await;
let endpoints = get_global_endpoints();
for p in pools.iter() {
if let Some(idx) = endpoints.get_pool_idx(&p.cmd_line) {
pool_indeces.push(idx);
} else {
return Err(Error::msg(format!(
"unexpected state present for decommission status pool({}) not found",
p.cmd_line
)));
}
}
if !pool_indeces.is_empty() {
let idx = pool_indeces[0];
if endpoints.as_ref()[idx].endpoints.as_ref()[0].is_local {
let (_tx, rx) = broadcast::channel(1);
let store = self.clone();
tokio::spawn(async move {
// wait 3 minutes for cluster init
tokio::time::sleep(Duration::from_secs(60 * 3)).await;
if let Err(err) = store.decommission(rx.resubscribe(), pool_indeces.clone()).await {
if is_err_decommission_already_running(&err) {
for i in pool_indeces.iter() {
store.do_decommission_in_routine(rx.resubscribe(), *i).await;
}
return;
}
error!("store init decommission err: {}", err);
// TODO: check config err
}
});
}
}
Ok(())
}
@@ -942,7 +1007,7 @@ impl ECStore {
pub async fn reload_pool_meta(&self) -> Result<()> {
let mut meta = PoolMeta::default();
meta.load().await?;
meta.load(self.pools[0].clone(), self.pools.clone()).await?;
let mut pool_meta = self.pool_meta.write().await;
*pool_meta = meta;
@@ -1909,7 +1974,7 @@ impl StorageAPI for ECStore {
async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()> {
check_abort_multipart_args(bucket, object, upload_id)?;
// TODO: defer
// TODO: defer DeleteUploadID
if self.single_pool() {
return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await;

View File

@@ -287,7 +287,7 @@ pub struct ObjectPartInfo {
pub size: usize,
pub actual_size: usize, // size of the source data
pub mod_time: Option<OffsetDateTime>,
// pub index: Option<Vec<u8>>,
// pub index: Option<Vec<u8>>, // TODO: ???
// pub checksums: Option<std::collections::HashMap<String, String>>,
}
@@ -635,7 +635,7 @@ pub struct MultipartUploadResult {
pub upload_id: String,
}
#[derive(Debug)]
#[derive(Debug, Default, Clone)]
pub struct PartInfo {
pub part_num: usize,
pub last_mod: Option<OffsetDateTime>,
@@ -643,7 +643,7 @@ pub struct PartInfo {
pub etag: Option<String>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct CompletePart {
pub part_num: usize,
pub e_tag: Option<String>,

View File

@@ -80,6 +80,8 @@ pub enum StorageError {
#[error("Decommission not started")]
DecommissionNotStarted,
#[error("Decommission already running")]
DecommissionAlreadyRunning,
#[error("DoneForNow")]
DoneForNow,
@@ -115,6 +117,7 @@ impl StorageError {
StorageError::InvalidPart(_, _, _) => 0x19,
StorageError::VolumeNotFound(_) => 0x20,
StorageError::DoneForNow => 0x21,
StorageError::DecommissionAlreadyRunning => 0x22,
}
}
@@ -151,6 +154,7 @@ impl StorageError {
0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())),
0x20 => Some(StorageError::VolumeNotFound(Default::default())),
0x21 => Some(StorageError::DoneForNow),
0x22 => Some(StorageError::DecommissionAlreadyRunning),
_ => None,
}
}
@@ -241,6 +245,14 @@ pub fn to_object_err(err: Error, params: Vec<&str>) -> Error {
err
}
pub fn is_err_decommission_already_running(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<StorageError>() {
matches!(e, StorageError::DecommissionAlreadyRunning)
} else {
false
}
}
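A small hedged test sketch of the intended round trip, assuming common::error::Error::new wraps the StorageError so that downcast_ref recovers it, which is how pools.rs raises the error and ECStore::init detects it:

#[cfg(test)]
mod decommission_err_tests {
    use super::{is_err_decommission_already_running, StorageError};
    use common::error::Error;

    #[test]
    fn detects_decommission_already_running() {
        let err = Error::new(StorageError::DecommissionAlreadyRunning);
        assert!(is_err_decommission_already_running(&err));
    }
}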
pub fn is_err_data_movement_overwrite(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<StorageError>() {
matches!(e, StorageError::DataMovementOverwriteErr(_, _, _))

View File

@@ -50,6 +50,7 @@ use tracing::{error, info, warn};
pub mod group;
pub mod policys;
pub mod pools;
pub mod rebalance;
pub mod service_account;
pub mod sts;
pub mod trace;
@@ -735,40 +736,6 @@ impl Operation for BackgroundHealStatusHandler {
}
}
pub struct RebalanceStart {}
#[async_trait::async_trait]
impl Operation for RebalanceStart {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStart");
return Err(s3_error!(NotImplemented));
}
}
// RebalanceStatus
pub struct RebalanceStatus {}
#[async_trait::async_trait]
impl Operation for RebalanceStatus {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStatus");
return Err(s3_error!(NotImplemented));
}
}
// RebalanceStop
pub struct RebalanceStop {}
#[async_trait::async_trait]
impl Operation for RebalanceStop {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStop");
return Err(s3_error!(NotImplemented));
}
}
#[cfg(test)]
mod test {
use ecstore::heal::heal_commands::HealOpts;

View File

@@ -0,0 +1,238 @@
use ecstore::{
new_object_layer_fn,
notification_sys::get_global_notification_sys,
rebalance::{DiskStat, RebalSaveOpt},
store_api::BucketOptions,
StorageAPI,
};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Request, S3Response, S3Result};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
use tracing::warn;
use crate::admin::router::Operation;
use ecstore::rebalance::RebalanceMeta;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RebalanceResp {
pub id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RebalPoolProgress {
#[serde(rename = "objects")]
pub num_objects: u64,
#[serde(rename = "versions")]
pub num_versions: u64,
#[serde(rename = "bytes")]
pub bytes: u64,
#[serde(rename = "bucket")]
pub bucket: String,
#[serde(rename = "object")]
pub object: String,
#[serde(rename = "elapsed")]
pub elapsed: Duration,
#[serde(rename = "eta")]
pub eta: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RebalancePoolStatus {
#[serde(rename = "id")]
pub id: usize, // Pool index (zero-based)
#[serde(rename = "status")]
pub status: String, // Active if rebalance is running, empty otherwise
#[serde(rename = "used")]
pub used: f64, // Percentage used space
#[serde(rename = "progress")]
pub progress: Option<RebalPoolProgress>, // None when rebalance is not running
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RebalanceAdminStatus {
pub id: String, // Identifies the ongoing rebalance operation by a UUID
#[serde(rename = "pools")]
pub pools: Vec<RebalancePoolStatus>, // Contains all pools, including inactive
#[serde(rename = "stoppedAt")]
pub stopped_at: Option<SystemTime>, // Optional timestamp when rebalance was stopped
}
pub struct RebalanceStart {}
#[async_trait::async_trait]
impl Operation for RebalanceStart {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStart");
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "Not init"));
};
if store.pools.len() == 1 {
return Err(s3_error!(NotImplemented));
}
if store.is_decommission_running().await {
return Err(s3_error!(
InternalError,
"Rebalance cannot be started, decommission is already in progress"
));
}
if store.is_rebalance_started().await {
return Err(s3_error!(InternalError, "Rebalance already in progress"));
}
let bucket_infos = store
.list_bucket(&BucketOptions::default())
.await
.map_err(|e| s3_error!(InternalError, "Failed to list buckets: {}", e))?;
let buckets: Vec<String> = bucket_infos.into_iter().map(|bucket| bucket.name).collect();
let id = match store.init_rebalance_meta(buckets).await {
Ok(id) => id,
Err(e) => {
return Err(s3_error!(InternalError, "Failed to init rebalance meta: {}", e));
}
};
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.load_rebalance_meta(true).await;
}
let resp = RebalanceResp { id };
let data = serde_json::to_string(&resp).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
// RebalanceStatus
pub struct RebalanceStatus {}
#[async_trait::async_trait]
impl Operation for RebalanceStatus {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStatus");
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "Not init"));
};
let mut meta = RebalanceMeta::new();
meta.load(store.pools[0].clone())
.await
.map_err(|e| s3_error!(InternalError, "Failed to load rebalance meta: {}", e))?;
// Compute disk usage percentage
let si = store.storage_info().await;
let mut disk_stats = vec![DiskStat::default(); store.pools.len()];
for disk in si.disks.iter() {
if disk.pool_index < 0 || disk_stats.len() <= disk.pool_index as usize {
continue;
}
disk_stats[disk.pool_index as usize].available_space += disk.available_space;
disk_stats[disk.pool_index as usize].total_space += disk.total_space;
}
let stop_time = meta.stopped_at;
let mut admin_status = RebalanceAdminStatus {
id: meta.id.clone(),
stopped_at: meta.stopped_at,
pools: vec![RebalancePoolStatus::default(); meta.pool_stats.len()],
};
for (i, ps) in meta.pool_stats.iter().enumerate() {
admin_status.pools[i] = RebalancePoolStatus {
id: i,
status: ps.info.status.to_string(),
used: (disk_stats[i].total_space - disk_stats[i].available_space) as f64 / disk_stats[i].total_space as f64,
progress: None,
};
if !ps.participating {
continue;
}
// Calculate total bytes to be rebalanced
let total_bytes_to_rebal = ps.init_capacity as f64 * meta.percent_free_goal - ps.init_free_space as f64;
let elapsed = if let Some(start_time) = ps.info.start_time {
SystemTime::now()
.duration_since(start_time)
.map_err(|e| s3_error!(InternalError, "Failed to calculate elapsed time: {}", e))?
} else {
return Err(s3_error!(InternalError, "Start time is not available"));
};
let eta = if ps.bytes > 0 {
Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / ps.bytes as f64)
} else {
Duration::ZERO
};
let stop_time = ps.info.end_time.unwrap_or(stop_time.unwrap_or(SystemTime::now()));
let elapsed = if ps.info.end_time.is_some() || meta.stopped_at.is_some() {
stop_time
.duration_since(ps.info.start_time.unwrap_or(stop_time))
.unwrap_or_default()
} else {
elapsed
};
admin_status.pools[i].progress = Some(RebalPoolProgress {
num_objects: ps.num_objects,
num_versions: ps.num_versions,
bytes: ps.bytes,
bucket: ps.bucket.clone(),
object: ps.object.clone(),
elapsed,
eta,
});
}
let data =
serde_json::to_string(&admin_status).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
}
}
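The progress math above works as follows: the pool still has to shed init_capacity * percent_free_goal - init_free_space bytes, and the projected duration scales elapsed time by that total over the bytes moved so far. A small numeric sketch with made-up values (the eta helper is an extraction for illustration, not a function in this crate):

use std::time::Duration;

// Projected total rebalance duration, extrapolated from progress so far.
fn eta(total_bytes_to_rebal: f64, moved_bytes: u64, elapsed: Duration) -> Duration {
    if moved_bytes > 0 {
        Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / moved_bytes as f64)
    } else {
        Duration::ZERO
    }
}

fn main() {
    // 10 TiB pool, 2 TiB free at start, 40% free-space goal:
    // 10 TiB * 0.4 - 2 TiB = 2 TiB still has to move off this pool.
    let total = 2.0 * 1024f64.powi(4);
    // 512 GiB moved in 30 minutes extrapolates to roughly 2 hours in total.
    let estimate = eta(total, 512 * 1024u64.pow(3), Duration::from_secs(30 * 60));
    println!("projected duration: {estimate:?}");
}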
// RebalanceStop
pub struct RebalanceStop {}
#[async_trait::async_trait]
impl Operation for RebalanceStop {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle RebalanceStop");
let Some(store) = new_object_layer_fn() else {
return Err(s3_error!(InternalError, "Not init"));
};
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.stop_rebalance().await;
}
store
.save_rebalance_stats(0, RebalSaveOpt::StoppedAt)
.await
.map_err(|e| s3_error!(InternalError, "Failed to stop rebalance: {}", e))?;
if let Some(notification_sys) = get_global_notification_sys() {
notification_sys.load_rebalance_meta(true).await;
}
return Err(s3_error!(NotImplemented));
}
}

View File

@@ -6,7 +6,7 @@ pub mod utils;
use common::error::Result;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::{
group, policys, pools,
group, policys, pools, rebalance,
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
sts, user,
};
@@ -94,17 +94,17 @@ pub fn make_admin_route() -> Result<impl S3Route> {
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/start").as_str(),
AdminOperation(&handlers::RebalanceStart {}),
AdminOperation(&rebalance::RebalanceStart {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/status").as_str(),
AdminOperation(&handlers::RebalanceStatus {}),
AdminOperation(&rebalance::RebalanceStatus {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/stop").as_str(),
AdminOperation(&handlers::RebalanceStop {}),
AdminOperation(&rebalance::RebalanceStop {}),
)?;
// Some APIs are only available in EC mode

View File

@@ -2360,23 +2360,46 @@ impl Node for NodeService {
}
async fn stop_rebalance(&self, _request: Request<StopRebalanceRequest>) -> Result<Response<StopRebalanceResponse>, Status> {
let Some(_store) = new_object_layer_fn() else {
let Some(store) = new_object_layer_fn() else {
return Ok(tonic::Response::new(StopRebalanceResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
// todo
// store.stop_rebalance().await;
todo!()
let _ = store.stop_rebalance().await;
Ok(tonic::Response::new(StopRebalanceResponse {
success: true,
error_info: None,
}))
}
async fn load_rebalance_meta(
&self,
_request: Request<LoadRebalanceMetaRequest>,
request: Request<LoadRebalanceMetaRequest>,
) -> Result<Response<LoadRebalanceMetaResponse>, Status> {
todo!()
let Some(store) = new_object_layer_fn() else {
return Ok(tonic::Response::new(LoadRebalanceMetaResponse {
success: false,
error_info: Some("errServerNotInitialized".to_string()),
}));
};
let LoadRebalanceMetaRequest { start_rebalance } = request.into_inner();
store.load_rebalance_meta().await.map_err(|err| {
error!("load_rebalance_meta err {:?}", err);
Status::internal(err.to_string())
})?;
if start_rebalance {
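// Drive the rebalance loop in the background; the RPC returns without waiting for it.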
let store = store.clone();
tokio::spawn(async move {
store.start_rebalance().await;
});
}
Ok(tonic::Response::new(LoadRebalanceMetaResponse {
success: true,
error_info: None,
}))
}
async fn load_transition_tier_config(

View File

@@ -21,8 +21,13 @@ use common::{
globals::set_global_addr,
};
use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::store_api::BucketOptions;
use ecstore::utils::net::{self, get_available_port};
use ecstore::StorageAPI;
use ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
@@ -379,11 +384,20 @@ async fn run(opt: config::Opt) -> Result<()> {
Error::from_string(err.to_string())
})?;
ECStore::init(store.clone()).await.map_err(|err| {
error!("ECStore init failed {:?}", &err);
Error::from_string(err.to_string())
})?;
debug!("init store success!");
ecconfig::init();
GLOBAL_ConfigSys.init(store.clone()).await?;
let buckets_list = store
.list_bucket(&BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.map_err(|err| Error::from_string(err.to_string()))?;
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
init_bucket_metadata_sys(store.clone(), buckets).await;
init_iam_sys(store.clone()).await?;

View File

@@ -51,6 +51,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
object,
version_id
),
// extended
StorageError::ObjectExistsAsDirectory(bucket, object) => {
s3_error!(InvalidArgument, "Object exists on :{} as directory {}", bucket, object)
@@ -62,6 +63,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
s3_error!(SlowDown, "Storage resources are insufficient for the write operation")
}
StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"),
StorageError::DecommissionAlreadyRunning => s3_error!(InternalError, "Decommission already running"),
StorageError::VolumeNotFound(bucket) => {
s3_error!(NoSuchBucket, "bucket not found {}", bucket)