diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index d086f942..383359ea 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -94,7 +94,7 @@ pub async fn delete_config(api: Arc, file: &str) -> Result<()> } } -async fn save_config_with_opts(api: Arc, file: &str, data: Vec, opts: &ObjectOptions) -> Result<()> { +pub async fn save_config_with_opts(api: Arc, file: &str, data: Vec, opts: &ObjectOptions) -> Result<()> { let size = data.len(); let _ = api .put_object(RUSTFS_META_BUCKET, file, &mut PutObjReader::new(Box::new(Cursor::new(data)), size), opts) diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 14ea3dd0..55ecf320 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -21,6 +21,7 @@ pub mod peer; pub mod peer_rest_client; pub mod pools; mod quorum; +pub mod rebalance; pub mod set_disk; mod sets; pub mod store; diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 017e346c..9bb73bc4 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -1,7 +1,7 @@ -use crate::endpoints::EndpointServerPools; use crate::global::get_global_endpoints; use crate::peer_rest_client::PeerRestClient; use crate::StorageAPI; +use crate::{endpoints::EndpointServerPools, new_object_layer_fn}; use common::error::{Error, Result}; use futures::future::join_all; use lazy_static::lazy_static; @@ -132,6 +132,41 @@ impl NotificationSys { } } } + + pub async fn load_rebalance_meta(&self, start: bool) { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().flatten() { + futures.push(client.load_rebalance_meta(start)); + } + + let results = join_all(futures).await; + for result in results { + if let Err(err) = result { + error!("notification load_rebalance_meta err {:?}", err); + } + } + } + + pub async fn stop_rebalance(&self) { + let Some(store) = new_object_layer_fn() else { + error!("stop_rebalance: not init"); + return; + }; + + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().flatten() { + futures.push(client.stop_rebalance()); + } + + let results = join_all(futures).await; + for result in results { + if let Err(err) = result { + error!("notification stop_rebalance err {:?}", err); + } + } + + let _ = store.stop_rebalance().await; + } } fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index 0f977d35..c3e2eed7 100644 --- a/ecstore/src/pools.rs +++ b/ecstore/src/pools.rs @@ -1,5 +1,5 @@ use crate::bucket::versioning_sys::BucketVersioningSys; -use crate::cache_value::metacache_set::{list_path_raw, AgreedFn, ListPathRawOptions, PartialFn}; +use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions}; use crate::config::com::{read_config, save_config, CONFIG_PREFIX}; use crate::config::error::ConfigError; use crate::disk::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; @@ -8,7 +8,9 @@ use crate::heal::heal_commands::HealOpts; use crate::new_object_layer_fn; use crate::notification_sys::get_global_notification_sys; use crate::set_disk::SetDisks; -use crate::store_api::{BucketOptions, GetObjectReader, MakeBucketOptions, ObjectIO, ObjectOptions, StorageAPI}; +use crate::store_api::{ + BucketOptions, CompletePart, GetObjectReader, MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI, +}; use crate::store_err::{ is_err_bucket_exists, 
is_err_data_movement_overwrite, is_err_object_not_found, is_err_version_not_found, StorageError, }; @@ -16,22 +18,20 @@ use crate::utils::path::{encode_dir_object, path_join, SLASH_SEPARATOR}; use crate::{sets::Sets, store::ECStore}; use ::workers::workers::Workers; use byteorder::{ByteOrder, LittleEndian, WriteBytesExt}; +use common::defer; use common::error::{Error, Result}; +use futures::future::BoxFuture; use http::HeaderMap; use rmp_serde::{Deserializer, Serializer}; -use serde::{de, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fmt::Display; -use std::future::Future; use std::io::{Cursor, Write}; use std::path::PathBuf; -use std::pin::Pin; -use std::rc; use std::sync::Arc; use time::{Duration, OffsetDateTime}; use tokio::sync::broadcast::Receiver as B_Receiver; use tracing::{error, info, warn}; -use uuid::Uuid; -use workers::workers; pub const POOL_META_NAME: &str = "pool.bin"; pub const POOL_META_FORMAT: u16 = 1; @@ -57,22 +57,37 @@ pub struct PoolMeta { } impl PoolMeta { - pub fn new(pools: Vec>) -> Self { - let mut status = Vec::with_capacity(pools.len()); + pub fn new(pools: &[Arc], prev_meta: &PoolMeta) -> Self { + let mut new_meta = Self { + version: POOL_META_VERSION, + pools: Vec::new(), + ..Default::default() + }; + for (idx, pool) in pools.iter().enumerate() { - status.push(PoolStatus { - id: idx, + let mut skip = false; + + for current_pool in prev_meta.pools.iter() { + if current_pool.cmd_line == pool.endpoints.cmd_line { + new_meta.pools.push(current_pool.clone()); + skip = true; + break; + } + } + + if skip { + continue; + } + + new_meta.pools.push(PoolStatus { cmd_line: pool.endpoints.cmd_line.clone(), + id: idx, last_update: OffsetDateTime::now_utc(), decommission: None, }); } - Self { - version: POOL_META_VERSION, - pools: status, - dont_save: false, - } + new_meta } pub fn is_suspended(&self, idx: usize) -> bool { @@ -83,10 +98,8 @@ impl PoolMeta { self.pools[idx].decommission.is_some() } - pub async fn load(&mut self) -> Result<()> { - let Some(store) = new_object_layer_fn() else { return Err(Error::msg("errServerNotInitialized")) }; - - let data = match read_config(store.clone(), POOL_META_NAME).await { + pub async fn load(&mut self, pool: Arc, _pools: Vec>) -> Result<()> { + let data = match read_config(pool, POOL_META_NAME).await { Ok(data) => { if data.is_empty() { return Ok(()); @@ -215,7 +228,7 @@ impl PoolMeta { if let Some(pool) = self.pools.get_mut(idx) { if let Some(ref info) = pool.decommission { if !info.complete && !info.failed && !info.canceled { - return Err(Error::msg("DecommissionAlreadyRunning")); + return Err(Error::new(StorageError::DecommissionAlreadyRunning)); } } @@ -289,7 +302,10 @@ impl PoolMeta { } pub fn track_current_bucket_object(&mut self, idx: usize, bucket: String, object: String) { - self.pools.get(idx).is_some_and(|v| v.decommission.is_some()); + if !self.pools.get(idx).is_some_and(|v| v.decommission.is_some()) { + return; + } + if let Some(pool) = self.pools.get_mut(idx) { if let Some(info) = pool.decommission.as_mut() { info.object = object; @@ -305,7 +321,7 @@ impl PoolMeta { let now = OffsetDateTime::now_utc(); - if now.unix_timestamp() - self.pools[idx].last_update.unix_timestamp() > duration.whole_seconds() as i64 { + if now.unix_timestamp() - self.pools[idx].last_update.unix_timestamp() > duration.whole_seconds() { self.pools[idx].last_update = now; self.save(pools).await?; @@ -314,6 +330,96 @@ impl PoolMeta { Ok(false) } + + #[allow(dead_code)] + pub fn 
validate(&self, pools: Vec>) -> Result {
+        struct PoolInfo {
+            position: usize,
+            completed: bool,
+            decom_started: bool,
+        }
+
+        let mut remembered_pools = HashMap::new();
+        for (idx, pool) in self.pools.iter().enumerate() {
+            let mut complete = false;
+            let mut decom_started = false;
+            if let Some(decommission) = &pool.decommission {
+                if decommission.complete {
+                    complete = true;
+                }
+                decom_started = true;
+            }
+            remembered_pools.insert(
+                pool.cmd_line.clone(),
+                PoolInfo {
+                    position: idx,
+                    completed: complete,
+                    decom_started,
+                },
+            );
+        }
+
+        let mut specified_pools = HashMap::new();
+        for (idx, pool) in pools.iter().enumerate() {
+            specified_pools.insert(pool.endpoints.cmd_line.clone(), idx);
+        }
+
+        let mut update = false;
+
+        // Check whether any pool specified on the command line has already been decommissioned and must be removed.
+        for k in specified_pools.keys() {
+            if let Some(pi) = remembered_pools.get(k) {
+                if pi.completed {
+                    error!(
+                        "pool({}) = {} is decommissioned, please remove from server command line",
+                        pi.position + 1,
+                        k
+                    );
+                    // return Err(Error::msg(format!(
+                    //     "pool({}) = {} is decommissioned, please remove from server command line",
+                    //     pi.position + 1,
+                    //     k
+                    // )));
+                }
+            } else {
+                // A previously remembered pool no longer exists, so allow the update; a new pool may have been added.
+                update = true;
+            }
+        }
+
+        if specified_pools.len() == remembered_pools.len() {
+            for (k, pi) in remembered_pools.iter() {
+                if let Some(pos) = specified_pools.get(k) {
+                    if *pos != pi.position {
+                        update = true; // The pool ordering has changed; allow the update.
+                    }
+                }
+            }
+        }
+
+        if !update {
+            update = specified_pools.len() != remembered_pools.len();
+        }
+
+        Ok(update)
+    }
+
+    pub fn return_resumable_pools(&self) -> Vec {
+        let mut new_pools = Vec::new();
+        for pool in &self.pools {
+            if let Some(decommission) = &pool.decommission {
+                if decommission.complete || decommission.canceled {
+                    // No resume is needed when:
+                    // - the decommission has completed
+                    // - the decommission has been canceled
+                    continue;
+                }
+                // Everything else still needs to be resumed.
+                new_pools.push(pool.clone());
+            }
+        }
+        new_pools
+    }
 }
 
 fn path2_bucket_object(name: &str) -> (String, String) {
@@ -543,7 +649,208 @@ impl ECStore {
         Ok(())
     }
 
-    async fn decommission_pool(&self, rx: B_Receiver, idx: usize, pool: Arc, bi: DecomBucketInfo) -> Result<()> {
+    #[allow(unused_assignments)]
+    async fn decommission_entry(
+        self: &Arc,
+        idx: usize,
+        entry: MetaCacheEntry,
+        bucket: String,
+        set: Arc,
+        wk: Arc,
+        rcfg: Option,
+    ) {
+        wk.give().await;
+        if entry.is_dir() {
+            return;
+        }
+
+        let mut fivs = match entry.file_info_versions(&bucket) {
+            Ok(f) => f,
+            Err(err) => {
+                error!("decommission_pool: file_info_versions err {:?}", &err);
+                return;
+            }
+        };
+
+        fivs.versions.sort_by(|a, b| b.mod_time.cmp(&a.mod_time));
+
+        let mut decommissioned: usize = 0;
+        let expired: usize = 0;
+
+        for version in fivs.versions.iter() {
+            // TODO: filterLifecycle
+            let remaining_versions = fivs.versions.len() - expired;
+            if version.deleted && remaining_versions == 1 && rcfg.is_none() {
+                //
+                decommissioned += 1;
+                info!("decommission_pool: DELETE marked object with no other non-current versions will be skipped");
+                continue;
+            }
+
+            let version_id = version.version_id.map(|v| v.to_string());
+
+            let mut ignore = false;
+            let mut failure = false;
+            let mut error = None;
+            if version.deleted {
+                // TODO: other params
+                if let Err(err) = self
+                    .delete_object(
+                        bucket.as_str(),
+                        &version.name,
+                        ObjectOptions {
+                            versioned: true,
+                            version_id: version_id.clone(),
+                            mod_time: version.mod_time,
+                            src_pool_idx: idx,
+                            data_movement: true,
+                            delete_marker: true,
+                            skip_decommissioned: true,
+                            ..Default::default()
+                        },
+                    )
+                    .await
+                {
+                    if is_err_object_not_found(&err) ||
is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { + ignore = true; + continue; + } + + failure = true; + + error = Some(err) + } + + { + self.pool_meta.write().await.count_item(idx, 0, failure); + } + + if !failure { + decommissioned += 1; + } + + info!( + "decommission_pool: DecomCopyDeleteMarker {} {} {:?} {:?}", + &bucket, &version.name, &version_id, error + ); + continue; + } + + for _i in 0..3 { + if version.is_remote() { + // TODO: DecomTieredObject + } + + let bucket = bucket.clone(); + + let rd = match set + .get_object_reader( + bucket.as_str(), + &encode_dir_object(&version.name), + None, + HeaderMap::new(), + &ObjectOptions { + version_id: version_id.clone(), + no_lock: true, + ..Default::default() + }, + ) + .await + { + Ok(rd) => rd, + Err(err) => { + if is_err_object_not_found(&err) || is_err_version_not_found(&err) { + ignore = true; + break; + } + + if !ignore { + // + if bucket == RUSTFS_META_BUCKET && version.name.contains(DATA_USAGE_CACHE_NAME) { + ignore = true; + error!("decommission_pool: ignore data usage cache {}", &version.name); + break; + } + } + + failure = true; + error!("decommission_pool: get_object_reader err {:?}", &err); + continue; + } + }; + + if let Err(err) = self.clone().decommission_object(idx, bucket, rd).await { + if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { + ignore = true; + break; + } + + failure = true; + + error!("decommission_pool: decommission_object err {:?}", &err); + continue; + } + + failure = false; + break; + } + + if ignore { + info!("decommission_pool: ignore {}", &version.name); + continue; + } + + { + self.pool_meta.write().await.count_item(idx, decommissioned, failure); + } + + if failure { + break; + } + + decommissioned += 1; + } + + if decommissioned == fivs.versions.len() { + if let Err(err) = set + .delete_object( + bucket.as_str(), + &encode_dir_object(&entry.name), + ObjectOptions { + delete_prefix: true, + delete_prefix_object: true, + + ..Default::default() + }, + ) + .await + { + error!("decommission_pool: delete_object err {:?}", &err); + } + } + + { + let mut pool_meta = self.pool_meta.write().await; + + pool_meta.track_current_bucket_object(idx, bucket.clone(), entry.name.clone()); + + let ok = pool_meta + .update_after(idx, self.pools.clone(), Duration::seconds(30)) + .await + .unwrap_or_default(); + if ok { + // TODO: ReloadPoolMeta + } + } + } + + async fn decommission_pool( + self: &Arc, + rx: B_Receiver, + idx: usize, + pool: Arc, + bi: DecomBucketInfo, + ) -> Result<()> { let wk = Workers::new(pool.disk_set.len() * 2).map_err(|v| Error::from_string(v))?; // let mut vc = None; @@ -558,226 +865,42 @@ impl ECStore { // TODO: ReplicationConfig } - for (idx, set) in pool.disk_set.iter().enumerate() { - let set = set.clone(); - let wk_clone = wk.clone(); - let bucket = bi.name.clone(); - let rcfg = rcfg.clone(); - - let decommission_entry = |entry: MetaCacheEntry| async move { - wk_clone.give().await; - if entry.is_dir() { - return; - } - - let mut fivs = match entry.file_info_versions(&bucket) { - Ok(f) => f, - Err(err) => { - error!("decommission_pool: file_info_versions err {:?}", &err); - return; - } - }; - - fivs.versions.sort_by(|a, b| b.mod_time.cmp(&a.mod_time)); - - let mut decommissioned: usize = 0; - let mut expired: usize = 0; - - for version in fivs.versions.iter() { - // TODO: filterLifecycle - let remaining_versions = fivs.versions.len() - expired; - if version.deleted && remaining_versions == 1 && rcfg.is_none() { - 
// - decommissioned += 1; - info!("decommission_pool: DELETE marked object with no other non-current versions will be skipped"); - continue; - } - - let version_id = version.version_id.map(|v| v.to_string()); - - let mut ignore = false; - let mut failure = false; - let mut error = None; - if version.deleted { - // TODO: other params - if let Err(err) = self - .delete_object( - bucket.as_str(), - &version.name, - ObjectOptions { - versioned: true, - version_id: version_id.clone(), - mod_time: version.mod_time, - src_pool_idx: idx, - data_movement: true, - delete_marker: true, - skip_decommissioned: true, - ..Default::default() - }, - ) - .await - { - if is_err_object_not_found(&err) - || is_err_version_not_found(&err) - || is_err_data_movement_overwrite(&err) - { - // - ignore = true; - continue; - } - - failure = true; - - error = Some(err) - } - - { - self.pool_meta.write().await.count_item(idx, 0, failure); - } - - if !failure { - decommissioned += 1; - } - - info!( - "decommission_pool: DecomCopyDeleteMarker {} {} {:?} {:?}", - &bucket, &version.name, &version_id, error - ); - continue; - } - - for _i in 0..3 { - if version.is_remote() { - // TODO: DecomTieredObject - } - - let bucket = bucket.clone(); - - let rd = match set - .get_object_reader( - bucket.as_str(), - &encode_dir_object(&version.name), - None, - HeaderMap::new(), - &ObjectOptions { - version_id: version_id.clone(), - no_lock: true, - ..Default::default() - }, - ) - .await - { - Ok(rd) => rd, - Err(err) => { - if is_err_object_not_found(&err) || is_err_version_not_found(&err) { - ignore = true; - break; - } - - if !ignore { - // - if bucket == RUSTFS_META_BUCKET && version.name.contains(DATA_USAGE_CACHE_NAME) { - ignore = true; - error!("decommission_pool: ignore data usage cache {}", &version.name); - break; - } - } - - failure = true; - error!("decommission_pool: get_object_reader err {:?}", &err); - continue; - } - }; - - if let Err(err) = self.decommission_object(idx, bucket, rd).await { - if is_err_object_not_found(&err) - || is_err_version_not_found(&err) - || is_err_data_movement_overwrite(&err) - { - ignore = true; - break; - } - - failure = true; - - error!("decommission_pool: decommission_object err {:?}", &err); - continue; - } - - failure = false; - break; - } - - if ignore { - info!("decommission_pool: ignore {}", &version.name); - continue; - } - - { - self.pool_meta.write().await.count_item(idx, decommissioned, failure); - } - - if failure { - break; - } - - decommissioned += 1; - } - - if decommissioned == fivs.versions.len() { - if let Err(err) = set - .delete_object( - bucket.as_str(), - &encode_dir_object(&entry.name), - ObjectOptions { - delete_prefix: true, - delete_prefix_object: true, - - ..Default::default() - }, - ) - .await - { - error!("decommission_pool: delete_object err {:?}", &err); - } - } - - { - let mut pool_meta = self.pool_meta.write().await; - - pool_meta.track_current_bucket_object(idx, bucket.clone(), entry.name.clone()); - - let ok = pool_meta - .update_after(idx, self.pools.clone(), Duration::seconds(30)) - .await - .unwrap_or_default(); - if ok { - // TODO: ReloadPoolMeta - } - } - }; - + for (set_idx, set) in pool.disk_set.iter().enumerate() { wk.clone().take().await; - // let set = set.clone(); - // let mutrx = rx.resubscribe(); - // tokio::spawn(async move { - // loop { - // if rx.try_recv().is_ok() { - // break; - // } + let decommission_entry: ListCallback = Arc::new({ + let this = Arc::clone(self); + let bucket = bi.name.clone(); + let wk = wk.clone(); + let set = 
set.clone(); + let rcfg = rcfg.clone(); + move |entry: MetaCacheEntry| { + let this = this.clone(); + let bucket = bucket.clone(); + let wk = wk.clone(); + let set = set.clone(); + let rcfg = rcfg.clone(); + Box::pin(async move { this.decommission_entry(idx, entry, bucket, set, wk, rcfg).await }) + } + }); - // let decommission_entry = Arc::new(move |entry: MetaCacheEntry| { - // let decommission_entry = decommission_entry.clone(); - // Box::pin(async move { - // decommission_entry(entry).await; - // }) as Pin + Send>> - // }); - // set.list_objects_to_decommission(rx, bi, decommission_entry).await.unwrap(); - // } - - // // - // }); + let set = set.clone(); + let mut rx = rx.resubscribe(); + let bi = bi.clone(); + let set_id = set_idx; + tokio::spawn(async move { + loop { + if rx.try_recv().is_ok() { + break; + } + if let Err(err) = set + .list_objects_to_decommission(rx.resubscribe(), bi.clone(), decommission_entry.clone()) + .await + { + error!("decommission_pool: list_objects_to_decommission {} err {:?}", set_id, &err); + } + } + }); } wk.wait().await; @@ -785,7 +908,7 @@ impl ECStore { Ok(()) } - async fn do_decommission_in_routine(&self, rx: B_Receiver, idx: usize) { + pub async fn do_decommission_in_routine(self: &Arc, rx: B_Receiver, idx: usize) { if let Err(err) = self.decommission_in_background(rx, idx).await { error!("decom err {:?}", &err); if let Err(er) = self.decommission_failed(idx).await { @@ -850,7 +973,7 @@ impl ECStore { Ok(()) } - async fn decommission_in_background(&self, rx: B_Receiver, idx: usize) -> Result<()> { + async fn decommission_in_background(self: &Arc, rx: B_Receiver, idx: usize) -> Result<()> { let pool = self.pools[idx].clone(); let pending = { @@ -973,34 +1096,151 @@ impl ECStore { Ok(ret) } - async fn decommission_object(&self, id: usize, bucket: String, rd: GetObjectReader) -> Result<()> { - let object_info = rd.object_info; - let actual_size = object_info.get_actual_size()?; + async fn decommission_object(self: Arc, pool_idx: usize, bucket: String, rd: GetObjectReader) -> Result<()> { + let object_info = rd.object_info.clone(); - if object_info.is_multipart() {} + // TODO: check : use size or actual_size ? 
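+        // Note: `actual_size` appears to be the original (pre-transform) object size, while the copy
+        // below streams the stored sizes: `part.size` for each multipart part and `object_info.size`
+        // for simple objects.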
+        let _actual_size = object_info.get_actual_size()?;
-        unimplemented!()
+        if object_info.is_multipart() {
+            let res = match self
+                .new_multipart_upload(
+                    &bucket,
+                    &object_info.name,
+                    &ObjectOptions {
+                        version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
+                        user_defined: object_info.user_defined.clone(),
+                        src_pool_idx: pool_idx,
+                        data_movement: true,
+                        ..Default::default()
+                    },
+                )
+                .await
+            {
+                Ok(res) => res,
+                Err(err) => {
+                    error!("decommission_object: new_multipart_upload err {:?}", &err);
+                    return Err(err);
+                }
+            };
+
+            // TODO: defer abort_multipart_upload
+
+            defer!(|| async {
+                if let Err(err) = self
+                    .abort_multipart_upload(&bucket, &object_info.name, &res.upload_id, &ObjectOptions::default())
+                    .await
+                {
+                    error!("decommission_object: abort_multipart_upload err {:?}", &err);
+                }
+            });
+
+            let mut parts = vec![CompletePart::default(); object_info.parts.len()];
+
+            let mut reader = rd.stream;
+
+            for (i, part) in object_info.parts.iter().enumerate() {
+                // Read one part at a time from the reader and upload it.
+
+                let mut data = PutObjReader::new(reader, part.size);
+
+                let pi = match self
+                    .put_object_part(
+                        &bucket,
+                        &object_info.name,
+                        &res.upload_id,
+                        part.number,
+                        &mut data,
+                        &ObjectOptions {
+                            preserve_etag: part.e_tag.clone(),
+                            ..Default::default()
+                        },
+                    )
+                    .await
+                {
+                    Ok(pi) => pi,
+                    Err(err) => {
+                        error!("decommission_object: put_object_part err {:?}", &err);
+                        return Err(err);
+                    }
+                };
+
+                parts[i] = CompletePart {
+                    part_num: pi.part_num,
+                    e_tag: pi.etag,
+                };
+
+                // Take ownership of the reader back so the next part can reuse it.
+                reader = data.stream;
+            }
+
+            if let Err(err) = self
+                .complete_multipart_upload(
+                    &bucket,
+                    &object_info.name,
+                    &res.upload_id,
+                    parts,
+                    &ObjectOptions {
+                        data_movement: true,
+                        mod_time: object_info.mod_time,
+                        ..Default::default()
+                    },
+                )
+                .await
+            {
+                error!("decommission_object: complete_multipart_upload err {:?}", &err);
+                return Err(err);
+            }
+
+            return Ok(());
+        }
+
+        let mut data = PutObjReader::new(rd.stream, object_info.size);
+
+        if let Err(err) = self
+            .put_object(
+                &bucket,
+                &object_info.name,
+                &mut data,
+                &ObjectOptions {
+                    src_pool_idx: pool_idx,
+                    data_movement: true,
+                    version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
+                    mod_time: object_info.mod_time,
+                    user_defined: object_info.user_defined.clone(),
+                    preserve_etag: object_info.etag.clone(),
+
+                    ..Default::default()
+                },
+            )
+            .await
+        {
+            error!("decommission_object: put_object err {:?}", &err);
+            return Err(err);
+        }
+
+        Ok(())
+    }
 }
 
 // impl Fn(MetaCacheEntry) -> impl Future>
-type Listcb = Arc Pin + Send>> + Send + Sync + 'static>;
+pub type ListCallback = Arc BoxFuture<'static, ()> + Send + Sync + 'static>;
 
 impl SetDisks {
     //
     async fn list_objects_to_decommission(
         self: &Arc,
-        mut rx: B_Receiver,
+        rx: B_Receiver,
         bucket_info: DecomBucketInfo,
-        func: Listcb,
+        func: ListCallback,
     ) -> Result<()> {
         let (disks, _) = self.get_online_disks_with_healing(false).await;
         if disks.is_empty() {
             return Err(Error::msg("errNoDiskAvailable"));
         }
 
-        let listing_quorum = (self.set_drive_count + 1) / 2;
+        let listing_quorum = self.set_drive_count.div_ceil(2);
 
         let resolver = MetadataResolutionParams {
             dir_quorum: listing_quorum,
@@ -1019,13 +1259,11 @@ impl SetDisks {
                 min_disks: listing_quorum,
                 partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| {
                     let resolver = resolver.clone();
-                    let func =
-                        func.clone() as Arc Pin + Send>> + Send + Sync>;
-                    Box::pin(async move {
-                        if let Ok(Some(entry)) = entries.resolve(resolver) {
-                            func(entry);
-                        }
-                    })
+                    if let Ok(Some(entry)) =
entries.resolve(resolver) { + func(entry) + } else { + Box::pin(async {}) + } })), ..Default::default() }, @@ -1061,33 +1299,3 @@ fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::Storage } capacity } - -#[test] -fn test_pool_meta() -> Result<()> { - let meta = PoolMeta::new(vec![]); - let mut data = Vec::new(); - data.write_u16::(POOL_META_FORMAT).unwrap(); - data.write_u16::(POOL_META_VERSION).unwrap(); - let mut buf = Vec::new(); - meta.serialize(&mut Serializer::new(&mut buf))?; - data.write_all(&buf)?; - - let format = LittleEndian::read_u16(&data[0..2]); - if format != POOL_META_FORMAT { - return Err(Error::msg(format!("PoolMeta: unknown format: {}", format))); - } - let version = LittleEndian::read_u16(&data[2..4]); - if version != POOL_META_VERSION { - return Err(Error::msg(format!("PoolMeta: unknown version: {}", version))); - } - - let mut buf = Deserializer::new(Cursor::new(&data[4..])); - let de_meta: PoolMeta = Deserialize::deserialize(&mut buf).unwrap(); - - if de_meta.version != POOL_META_VERSION { - return Err(Error::msg(format!("unexpected PoolMeta version: {}", de_meta.version))); - } - - println!("meta: {:?}", de_meta); - Ok(()) -} diff --git a/ecstore/src/rebalance.rs b/ecstore/src/rebalance.rs new file mode 100644 index 00000000..74cf21c4 --- /dev/null +++ b/ecstore/src/rebalance.rs @@ -0,0 +1,1038 @@ +use std::io::Cursor; +use std::sync::Arc; +use std::time::SystemTime; + +use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions}; +use crate::config::com::{read_config_with_metadata, save_config_with_opts}; +use crate::config::error::is_err_config_not_found; +use crate::disk::{MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams}; +use crate::global::get_global_endpoints; +use crate::pools::ListCallback; +use crate::set_disk::SetDisks; +use crate::store::ECStore; +use crate::store_api::{CompletePart, FileInfo, GetObjectReader, ObjectIO, ObjectOptions, PutObjReader}; +use crate::store_err::{is_err_data_movement_overwrite, is_err_object_not_found, is_err_version_not_found}; +use crate::utils::path::encode_dir_object; +use crate::StorageAPI; +use common::defer; +use common::error::{Error, Result}; +use http::HeaderMap; +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast::{self, Receiver as B_Receiver}; +use tokio::time::{Duration, Instant}; +use tracing::{error, info}; +use uuid::Uuid; +use workers::workers::Workers; + +const REBAL_META_FMT: u16 = 1; // Replace with actual format value +const REBAL_META_VER: u16 = 1; // Replace with actual version value +const REBAL_META_NAME: &str = "rebalance_meta"; + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RebalanceStats { + #[serde(rename = "ifs")] + pub init_free_space: u64, // Pool free space at the start of rebalance + #[serde(rename = "ic")] + pub init_capacity: u64, // Pool capacity at the start of rebalance + #[serde(rename = "bus")] + pub buckets: Vec, // Buckets being rebalanced or to be rebalanced + #[serde(rename = "rbs")] + pub rebalanced_buckets: Vec, // Buckets rebalanced + #[serde(rename = "bu")] + pub bucket: String, // Last rebalanced bucket + #[serde(rename = "ob")] + pub object: String, // Last rebalanced object + #[serde(rename = "no")] + pub num_objects: u64, // Number of objects rebalanced + #[serde(rename = "nv")] + pub num_versions: u64, // Number of versions rebalanced + #[serde(rename = "bs")] + pub bytes: u64, // Number of bytes rebalanced + #[serde(rename = "par")] + pub participating: bool, // Whether the pool is 
participating in rebalance + #[serde(rename = "inf")] + pub info: RebalanceInfo, // Rebalance operation info +} + +impl RebalanceStats { + pub fn update(&mut self, bucket: String, fi: &FileInfo) { + if fi.is_latest { + self.num_objects += 1; + } + + self.num_versions += 1; + let on_disk_size = if !fi.deleted { + fi.size as i64 * (fi.erasure.data_blocks + fi.erasure.parity_blocks) as i64 / fi.erasure.data_blocks as i64 + } else { + 0 + }; + self.bytes += on_disk_size as u64; + self.bucket = bucket; + self.object = fi.name.clone(); + } +} + +pub type RStats = Vec>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +pub enum RebalStatus { + #[default] + None, + Started, + Completed, + Stopped, + Failed, +} + +use std::fmt; + +impl fmt::Display for RebalStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let status = match self { + RebalStatus::None => "None", + RebalStatus::Started => "Started", + RebalStatus::Completed => "Completed", + RebalStatus::Stopped => "Stopped", + RebalStatus::Failed => "Failed", + }; + write!(f, "{}", status) + } +} + +impl From for RebalStatus { + fn from(value: u8) -> Self { + match value { + 1 => RebalStatus::Started, + 2 => RebalStatus::Completed, + 3 => RebalStatus::Stopped, + 4 => RebalStatus::Failed, + _ => RebalStatus::None, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +pub enum RebalSaveOpt { + #[default] + Stats, + StoppedAt, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RebalanceInfo { + #[serde(rename = "startTs")] + pub start_time: Option, // Time at which rebalance-start was issued + #[serde(rename = "stopTs")] + pub end_time: Option, // Time at which rebalance operation completed or rebalance-stop was called + #[serde(rename = "status")] + pub status: RebalStatus, // Current state of rebalance operation +} + +#[allow(dead_code)] +#[derive(Debug, Clone, Default)] +pub struct DiskStat { + pub total_space: u64, + pub available_space: u64, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct RebalanceMeta { + #[serde(skip)] + pub cancel: Option>, // To be invoked on rebalance-stop + #[serde(skip)] + pub last_refreshed_at: Option, + #[serde(rename = "stopTs")] + pub stopped_at: Option, // Time when rebalance-stop was issued + #[serde(rename = "id")] + pub id: String, // ID of the ongoing rebalance operation + #[serde(rename = "pf")] + pub percent_free_goal: f64, // Computed from total free space and capacity at the start of rebalance + #[serde(rename = "rss")] + pub pool_stats: Vec, // Per-pool rebalance stats keyed by pool index +} + +impl RebalanceMeta { + pub fn new() -> Self { + Self::default() + } + pub async fn load(&mut self, store: Arc) -> Result<()> { + self.load_with_opts(store, ObjectOptions::default()).await + } + + pub async fn load_with_opts(&mut self, store: Arc, opts: ObjectOptions) -> Result<()> { + let (data, _) = read_config_with_metadata(store, REBAL_META_NAME, &opts).await?; + if data.is_empty() { + return Ok(()); + } + if data.len() <= 4 { + return Err(Error::msg("rebalanceMeta: no data")); + } + + // Read header + match u16::from_le_bytes([data[0], data[1]]) { + REBAL_META_FMT => {} + fmt => return Err(Error::msg(format!("rebalanceMeta: unknown format: {}", fmt))), + } + match u16::from_le_bytes([data[2], data[3]]) { + REBAL_META_VER => {} + ver => return Err(Error::msg(format!("rebalanceMeta: unknown version: {}", ver))), + } + + let meta: Self = 
rmp_serde::from_read(Cursor::new(&data[4..]))?; + *self = meta; + + self.last_refreshed_at = Some(SystemTime::now()); + + Ok(()) + } + + pub async fn save(&self, store: Arc) -> Result<()> { + self.save_with_opts(store, ObjectOptions::default()).await + } + + pub async fn save_with_opts(&self, store: Arc, opts: ObjectOptions) -> Result<()> { + if self.pool_stats.is_empty() { + return Ok(()); + } + + let mut data = Vec::new(); + + // Initialize the header + data.extend(&REBAL_META_FMT.to_le_bytes()); + data.extend(&REBAL_META_VER.to_le_bytes()); + + let msg = rmp_serde::to_vec(self)?; + data.extend(msg); + + save_config_with_opts(store, REBAL_META_NAME, data, &opts).await?; + + Ok(()) + } +} + +impl ECStore { + pub async fn load_rebalance_meta(&self) -> Result<()> { + let mut meta = RebalanceMeta::new(); + match meta.load(self.pools[0].clone()).await { + Ok(_) => { + let mut rebalance_meta = self.rebalance_meta.write().await; + + *rebalance_meta = Some(meta); + + if let Err(err) = self.update_rebalance_stats().await { + error!("Failed to update rebalance stats: {}", err); + } + } + Err(err) => { + if !is_err_config_not_found(&err) { + return Err(err); + } + } + } + + Ok(()) + } + + pub async fn update_rebalance_stats(&self) -> Result<()> { + let mut ok = false; + let mut rebalance_meta = self.rebalance_meta.write().await; + + for i in 0..self.pools.len() { + if self.find_index(i).await.is_none() { + if let Some(meta) = rebalance_meta.as_mut() { + meta.pool_stats.push(RebalanceStats::default()); + } + ok = true; + } + } + + if ok { + if let Some(meta) = rebalance_meta.as_mut() { + meta.save(self.pools[0].clone()).await?; + } + } + + Ok(()) + } + + async fn find_index(&self, index: usize) -> Option { + if let Some(meta) = self.rebalance_meta.read().await.as_ref() { + return meta.pool_stats.get(index).map(|_v| index); + } + + None + } + + pub async fn init_rebalance_meta(&self, bucktes: Vec) -> Result { + let si = self.storage_info().await; + + let mut disk_stats = vec![DiskStat::default(); self.pools.len()]; + + let mut total_cap = 0; + let mut total_free = 0; + for disk in si.disks.iter() { + if disk.pool_index < 0 || disk_stats.len() <= disk.pool_index as usize { + continue; + } + + total_cap += disk.total_space; + total_free += disk.available_space; + + disk_stats[disk.pool_index as usize].total_space += disk.total_space; + disk_stats[disk.pool_index as usize].available_space += disk.available_space; + } + + let percent_free_goal = total_free as f64 / total_cap as f64; + + let mut pool_stats = Vec::with_capacity(self.pools.len()); + + let now = SystemTime::now(); + + for disk_stat in disk_stats.iter() { + let mut pool_stat = RebalanceStats { + init_free_space: disk_stat.available_space, + init_capacity: disk_stat.total_space, + buckets: bucktes.clone(), + rebalanced_buckets: Vec::with_capacity(bucktes.len()), + ..Default::default() + }; + + if (disk_stat.available_space as f64 / disk_stat.total_space as f64) < percent_free_goal { + pool_stat.participating = true; + pool_stat.info = RebalanceInfo { + start_time: Some(now), + status: RebalStatus::Started, + ..Default::default() + }; + } + + pool_stats.push(pool_stat); + } + + let meta = RebalanceMeta { + id: Uuid::new_v4().to_string(), + percent_free_goal, + pool_stats, + ..Default::default() + }; + + meta.save(self.pools[0].clone()).await?; + + let id = meta.id.clone(); + + let mut rebalance_meta = self.rebalance_meta.write().await; + *rebalance_meta = Some(meta); + + Ok(id) + } + + pub async fn update_pool_stats(&self, pool_index: usize, 
bucket: String, fi: &FileInfo) -> Result<()> { + let mut rebalance_meta = self.rebalance_meta.write().await; + if let Some(meta) = rebalance_meta.as_mut() { + if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) { + pool_stat.update(bucket, fi); + } + } + + Ok(()) + } + + pub async fn next_rebal_bucket(&self, pool_index: usize) -> Result> { + let rebalance_meta = self.rebalance_meta.read().await; + if let Some(meta) = rebalance_meta.as_ref() { + if let Some(pool_stat) = meta.pool_stats.get(pool_index) { + if pool_stat.info.status != RebalStatus::Completed || !pool_stat.participating { + return Ok(None); + } + + if pool_stat.buckets.is_empty() { + return Ok(None); + } + return Ok(Some(pool_stat.buckets[0].clone())); + } + } + + Ok(None) + } + + pub async fn bucket_rebalance_done(&self, pool_index: usize, bucket: String) -> Result<()> { + let mut rebalance_meta = self.rebalance_meta.write().await; + if let Some(meta) = rebalance_meta.as_mut() { + if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) { + if let Some(idx) = pool_stat.buckets.iter().position(|b| *b == bucket) { + pool_stat.buckets.remove(idx); + pool_stat.rebalanced_buckets.push(bucket); + return Ok(()); + } + } + } + + Ok(()) + } + + pub async fn is_rebalance_started(&self) -> bool { + let rebalance_meta = self.rebalance_meta.read().await; + if let Some(ref meta) = *rebalance_meta { + if meta.stopped_at.is_some() { + return false; + } + + if meta + .pool_stats + .iter() + .any(|v| v.participating && v.info.status != RebalStatus::Completed) + { + return true; + } + } + + false + } + + pub async fn is_pool_rebalancing(&self, pool_index: usize) -> bool { + let rebalance_meta = self.rebalance_meta.read().await; + if let Some(ref meta) = *rebalance_meta { + if meta.stopped_at.is_some() { + return false; + } + + if let Some(pool_stat) = meta.pool_stats.get(pool_index) { + return pool_stat.participating && pool_stat.info.status == RebalStatus::Started; + } + } + + false + } + + pub async fn stop_rebalance(self: &Arc) -> Result<()> { + let rebalance_meta = self.rebalance_meta.read().await; + if let Some(meta) = rebalance_meta.as_ref() { + if let Some(tx) = meta.cancel.as_ref() { + let _ = tx.send(true); + } + } + + Ok(()) + } + + pub async fn start_rebalance(self: &Arc) { + // let rebalance_meta = self.rebalance_meta.read().await; + + let (tx, rx) = broadcast::channel::(1); + + { + let mut rebalance_meta = self.rebalance_meta.write().await; + if let Some(meta) = rebalance_meta.as_mut() { + meta.cancel = Some(tx) + } else { + return; + } + } + + let participants = { + if let Some(ref meta) = *self.rebalance_meta.read().await { + if meta.stopped_at.is_some() { + return; + } + + let mut participants = vec![false; meta.pool_stats.len()]; + for (i, pool_stat) in meta.pool_stats.iter().enumerate() { + if pool_stat.info.status == RebalStatus::Started { + participants[i] = pool_stat.participating; + } + } + participants + } else { + Vec::new() + } + }; + + for (idx, participating) in participants.iter().enumerate() { + if !*participating { + continue; + } + + if get_global_endpoints() + .as_ref() + .get(idx) + .map_or(true, |v| v.endpoints.as_ref().first().map_or(true, |e| e.is_local)) + { + continue; + } + + let pool_idx = idx; + let store = self.clone(); + let rx = rx.resubscribe(); + tokio::spawn(async move { + if let Err(err) = store.rebalance_buckets(rx, pool_idx).await { + error!("Rebalance failed for pool {}: {}", pool_idx, err); + } else { + info!("Rebalance completed for pool {}", pool_idx); + } + }); + } + } + + async 
fn rebalance_buckets(self: &Arc, rx: B_Receiver, pool_index: usize) -> Result<()> { + let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::>(1); + + // Save rebalance metadata periodically + let store = self.clone(); + let save_task = tokio::spawn(async move { + let mut timer = tokio::time::interval_at(Instant::now() + Duration::from_secs(10), Duration::from_secs(10)); + let mut msg: String; + let mut quit = false; + + loop { + tokio::select! { + // TODO: cancel rebalance + Some(result) = done_rx.recv() => { + quit = true; + let now = SystemTime::now(); + + + let state = match result { + Ok(_) => { + msg = format!("Rebalance completed at {:?}", now); + RebalStatus::Completed}, + Err(err) => { + // TODO: check stop + if err.to_string().contains("canceled") { + msg = format!("Rebalance stopped at {:?}", now); + RebalStatus::Stopped + } else { + msg = format!("Rebalance stopped at {:?} with err {:?}", now, err); + RebalStatus::Failed + } + } + }; + + { + let mut rebalance_meta = store.rebalance_meta.write().await; + + if let Some(rbm) = rebalance_meta.as_mut() { + rbm.pool_stats[pool_index].info.status = state; + rbm.pool_stats[pool_index].info.end_time = Some(now); + } + } + + + } + _ = timer.tick() => { + let now = SystemTime::now(); + msg = format!("Saving rebalance metadata at {:?}", now); + } + } + + if let Err(err) = store.save_rebalance_stats(pool_index, RebalSaveOpt::Stats).await { + error!("{} err: {:?}", msg, err); + } else { + info!(msg); + } + + if quit { + return; + } + + timer.reset(); + } + }); + + tracing::info!("Pool {} rebalancing is started", pool_index + 1); + + while let Some(bucket) = self.next_rebal_bucket(pool_index).await? { + tracing::info!("Rebalancing bucket: {}", bucket); + + if let Err(err) = self.rebalance_bucket(rx.resubscribe(), bucket.clone(), pool_index).await { + if err.to_string().contains("not initialized") { + continue; + } + tracing::error!("Error rebalancing bucket {}: {:?}", bucket, err); + done_tx.send(Err(err)).await.ok(); + break; + } + + self.bucket_rebalance_done(pool_index, bucket).await?; + } + + tracing::info!("Pool {} rebalancing is done", pool_index + 1); + + done_tx.send(Ok(())).await.ok(); + save_task.await.ok(); + + Ok(()) + } + + async fn check_if_rebalance_done(&self, pool_index: usize) -> bool { + let mut rebalance_meta = self.rebalance_meta.write().await; + + if let Some(meta) = rebalance_meta.as_mut() { + if let Some(pool_stat) = meta.pool_stats.get_mut(pool_index) { + // Check if the pool's rebalance status is already completed + if pool_stat.info.status == RebalStatus::Completed { + return true; + } + + // Calculate the percentage of free space improvement + let pfi = (pool_stat.init_free_space + pool_stat.bytes) as f64 / pool_stat.init_capacity as f64; + + // Mark pool rebalance as done if within 5% of the PercentFreeGoal + if (pfi - meta.percent_free_goal).abs() <= 0.05 { + pool_stat.info.status = RebalStatus::Completed; + pool_stat.info.end_time = Some(SystemTime::now()); + return true; + } + } + } + + false + } + + #[allow(unused_assignments)] + async fn rebalance_entry( + &self, + bucket: String, + pool_index: usize, + entry: MetaCacheEntry, + set: Arc, + wk: Arc, + ) { + defer!(|| async { + wk.give().await; + }); + + if entry.is_dir() { + return; + } + + if self.check_if_rebalance_done(pool_index).await { + return; + } + + let mut fivs = match entry.file_info_versions(&bucket) { + Ok(fivs) => fivs, + Err(err) => { + error!("rebalance_entry Error getting file info versions: {}", err); + return; + } + }; + + 
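+        // Versions are processed newest-first below; `rebalanced` counts the versions moved
+        // successfully so the source entry is only deleted once every remaining version has
+        // been rebalanced off this pool.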
fivs.versions.sort_by(|a, b| b.mod_time.cmp(&a.mod_time)); + + let mut rebalanced: usize = 0; + let expired: usize = 0; + for version in fivs.versions.iter() { + if version.is_remote() { + info!("rebalance_entry Entry {} is remote, skipping", version.name); + continue; + } + // TODO: filterLifecycle + + let remaining_versions = fivs.versions.len() - expired; + if version.deleted && remaining_versions == 1 { + rebalanced += 1; + info!("rebalance_entry Entry {} is deleted and last version, skipping", version.name); + continue; + } + let version_id = version.version_id.map(|v| v.to_string()); + + let mut ignore = false; + let mut failure = false; + let mut error = None; + if version.deleted { + if let Err(err) = set + .delete_object( + &bucket, + &version.name, + ObjectOptions { + versioned: true, + version_id: version_id.clone(), + mod_time: version.mod_time, + src_pool_idx: pool_index, + data_movement: true, + delete_marker: true, + skip_decommissioned: true, + ..Default::default() + }, + ) + .await + { + if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { + ignore = true; + info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); + continue; + } + error = Some(err); + failure = true; + } + + if !failure { + error!("rebalance_entry {} Entry {} deleted successfully", &bucket, &version.name); + let _ = self.update_pool_stats(pool_index, bucket.clone(), version).await; + + rebalanced += 1; + } else { + error!( + "rebalance_entry {} Error deleting entry {}/{:?}: {:?}", + &bucket, &version.name, &version.version_id, error + ); + } + + continue; + } + + for _i in 0..3 { + let rd = match set + .get_object_reader( + bucket.as_str(), + &encode_dir_object(&version.name), + None, + HeaderMap::new(), + &ObjectOptions { + version_id: version_id.clone(), + no_lock: true, // NoDecryption + ..Default::default() + }, + ) + .await + { + Ok(rd) => rd, + Err(err) => { + if is_err_object_not_found(&err) || is_err_version_not_found(&err) { + ignore = true; + break; + } + + failure = true; + error!("rebalance_entry: get_object_reader err {:?}", &err); + continue; + } + }; + + if let Err(err) = self.rebalance_object(pool_index, bucket.clone(), rd).await { + if is_err_object_not_found(&err) || is_err_version_not_found(&err) || is_err_data_movement_overwrite(&err) { + ignore = true; + info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); + break; + } + + failure = true; + error!("rebalance_entry: rebalance_object err {:?}", &err); + continue; + } + + failure = false; + info!("rebalance_entry {} Entry {} rebalanced successfully", &bucket, &version.name); + break; + } + + if ignore { + info!("rebalance_entry {} Entry {} is already deleted, skipping", &bucket, version.name); + continue; + } + + if failure { + error!( + "rebalance_entry {} Error rebalancing entry {}/{:?}: {:?}", + &bucket, &version.name, &version.version_id, error + ); + break; + } + + let _ = self.update_pool_stats(pool_index, bucket.clone(), version).await; + rebalanced += 1; + } + + if rebalanced == fivs.versions.len() { + if let Err(err) = set + .delete_object( + bucket.as_str(), + &encode_dir_object(&entry.name), + ObjectOptions { + delete_prefix: true, + delete_prefix_object: true, + + ..Default::default() + }, + ) + .await + { + error!("rebalance_entry: delete_object err {:?}", &err); + } else { + info!("rebalance_entry {} Entry {} deleted successfully", &bucket, &entry.name); + } + } + } + async fn rebalance_object(&self, 
pool_idx: usize, bucket: String, rd: GetObjectReader) -> Result<()> {
+        let object_info = rd.object_info.clone();
+
+        // TODO: check : use size or actual_size ?
+        let _actual_size = object_info.get_actual_size()?;
+
+        if object_info.is_multipart() {
+            let res = match self
+                .new_multipart_upload(
+                    &bucket,
+                    &object_info.name,
+                    &ObjectOptions {
+                        version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
+                        user_defined: object_info.user_defined.clone(),
+                        src_pool_idx: pool_idx,
+                        data_movement: true,
+                        ..Default::default()
+                    },
+                )
+                .await
+            {
+                Ok(res) => res,
+                Err(err) => {
+                    error!("rebalance_object: new_multipart_upload err {:?}", &err);
+                    return Err(err);
+                }
+            };
+
+            // TODO: defer abort_multipart_upload
+
+            defer!(|| async {
+                if let Err(err) = self
+                    .abort_multipart_upload(&bucket, &object_info.name, &res.upload_id, &ObjectOptions::default())
+                    .await
+                {
+                    error!("rebalance_object: abort_multipart_upload err {:?}", &err);
+                }
+            });
+
+            let mut parts = vec![CompletePart::default(); object_info.parts.len()];
+
+            let mut reader = rd.stream;
+
+            for (i, part) in object_info.parts.iter().enumerate() {
+                // Read one part at a time from the reader and upload it.
+
+                let mut data = PutObjReader::new(reader, part.size);
+
+                let pi = match self
+                    .put_object_part(
+                        &bucket,
+                        &object_info.name,
+                        &res.upload_id,
+                        part.number,
+                        &mut data,
+                        &ObjectOptions {
+                            preserve_etag: part.e_tag.clone(),
+                            ..Default::default()
+                        },
+                    )
+                    .await
+                {
+                    Ok(pi) => pi,
+                    Err(err) => {
+                        error!("rebalance_object: put_object_part err {:?}", &err);
+                        return Err(err);
+                    }
+                };
+
+                parts[i] = CompletePart {
+                    part_num: pi.part_num,
+                    e_tag: pi.etag,
+                };
+
+                // Take ownership of the reader back so the next part can reuse it.
+                reader = data.stream;
+            }
+
+            if let Err(err) = self
+                .complete_multipart_upload(
+                    &bucket,
+                    &object_info.name,
+                    &res.upload_id,
+                    parts,
+                    &ObjectOptions {
+                        data_movement: true,
+                        mod_time: object_info.mod_time,
+                        ..Default::default()
+                    },
+                )
+                .await
+            {
+                error!("rebalance_object: complete_multipart_upload err {:?}", &err);
+                return Err(err);
+            }
+
+            return Ok(());
+        }
+
+        let mut data = PutObjReader::new(rd.stream, object_info.size);
+
+        if let Err(err) = self
+            .put_object(
+                &bucket,
+                &object_info.name,
+                &mut data,
+                &ObjectOptions {
+                    src_pool_idx: pool_idx,
+                    data_movement: true,
+                    version_id: object_info.version_id.as_ref().map(|v| v.to_string()),
+                    mod_time: object_info.mod_time,
+                    user_defined: object_info.user_defined.clone(),
+                    preserve_etag: object_info.etag.clone(),
+
+                    ..Default::default()
+                },
+            )
+            .await
+        {
+            error!("rebalance_object: put_object err {:?}", &err);
+            return Err(err);
+        }
+
+        Ok(())
+    }
+
+    async fn rebalance_bucket(self: &Arc, rx: B_Receiver, bucket: String, pool_index: usize) -> Result<()> {
+        // Placeholder for actual bucket rebalance logic
+        tracing::info!("Rebalancing bucket {} in pool {}", bucket, pool_index);
+
+        // TODO: other config
+        // if bucket != RUSTFS_META_BUCKET{
+
+        // }
+
+        let pool = self.pools[pool_index].clone();
+
+        let wk = Workers::new(pool.disk_set.len() * 2).map_err(|v| Error::from_string(v))?;
+
+        for (set_idx, set) in pool.disk_set.iter().enumerate() {
+            wk.clone().take().await;
+
+            let rebalance_entry: ListCallback = Arc::new({
+                let this = Arc::clone(self);
+                let bucket = bucket.clone();
+                let wk = wk.clone();
+                let set = set.clone();
+                move |entry: MetaCacheEntry| {
+                    let this = this.clone();
+                    let bucket = bucket.clone();
+                    let wk = wk.clone();
+                    let set = set.clone();
+                    Box::pin(async move {
+                        wk.take().await;
+                        tokio::spawn(async move {
+                            this.rebalance_entry(bucket, pool_index,
entry, set, wk).await; + }); + }) + } + }); + + let set = set.clone(); + let rx = rx.resubscribe(); + let bucket = bucket.clone(); + let wk = wk.clone(); + tokio::spawn(async move { + defer!(|| async { + wk.clone().give().await; + }); + if let Err(err) = set.list_objects_to_rebalance(rx, bucket, rebalance_entry).await { + error!("Rebalance worker {} error: {}", set_idx, err); + } else { + info!("Rebalance worker {} done", set_idx); + } + }); + } + + wk.wait().await; + Ok(()) + } + + pub async fn save_rebalance_stats(&self, pool_idx: usize, opt: RebalSaveOpt) -> Result<()> { + // TODO: NSLOOK + + let mut meta = RebalanceMeta::new(); + meta.load_with_opts( + self.pools[0].clone(), + ObjectOptions { + no_lock: true, + ..Default::default() + }, + ) + .await?; + + if opt == RebalSaveOpt::StoppedAt { + meta.stopped_at = Some(SystemTime::now()); + } + + let mut rebalance_meta = self.rebalance_meta.write().await; + + if let Some(rb) = rebalance_meta.as_mut() { + if opt == RebalSaveOpt::Stats { + meta.pool_stats[pool_idx] = rb.pool_stats[pool_idx].clone(); + } + + *rb = meta; + } else { + *rebalance_meta = Some(meta); + } + + if let Some(meta) = rebalance_meta.as_mut() { + meta.save_with_opts( + self.pools[0].clone(), + ObjectOptions { + no_lock: true, + ..Default::default() + }, + ) + .await?; + } + + Ok(()) + } +} + +impl SetDisks { + pub async fn list_objects_to_rebalance( + self: &Arc, + rx: B_Receiver, + bucket: String, + cb: ListCallback, + ) -> Result<()> { + // Placeholder for actual object listing logic + let (disks, _) = self.get_online_disks_with_healing(false).await; + if disks.is_empty() { + return Err(Error::msg("errNoDiskAvailable")); + } + + let listing_quorum = self.set_drive_count.div_ceil(2); + + let resolver = MetadataResolutionParams { + dir_quorum: listing_quorum, + obj_quorum: listing_quorum, + bucket: bucket.clone(), + ..Default::default() + }; + + let cb1 = cb.clone(); + list_path_raw( + rx, + ListPathRawOptions { + disks: disks.iter().cloned().map(Some).collect(), + bucket: bucket.clone(), + recursice: true, + min_disks: listing_quorum, + agreed: Some(Box::new(move |entry: MetaCacheEntry| Box::pin(cb1(entry.clone())))), + partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { + // let cb = cb.clone(); + let resolver = resolver.clone(); + if let Ok(Some(entry)) = entries.resolve(resolver) { + cb(entry) + } else { + Box::pin(async {}) + } + })), + ..Default::default() + }, + ) + .await?; + + Ok(()) + } +} diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 793e31a2..037f82c9 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,15 +1,15 @@ #![allow(clippy::map_entry)] -use crate::bucket::metadata_sys::{self, init_bucket_metadata_sys, set_bucket_metadata}; +use crate::bucket::metadata_sys::{self, set_bucket_metadata}; use crate::bucket::utils::{check_valid_bucket_name, check_valid_bucket_name_strict, is_meta_bucketname}; +use crate::config::storageclass; use crate::config::GLOBAL_StorageClass; -use crate::config::{self, storageclass, GLOBAL_ConfigSys}; use crate::disk::endpoint::{Endpoint, EndpointType}; use crate::disk::{DiskAPI, DiskInfo, DiskInfoOptions, MetaCacheEntry}; use crate::error::clone_err; use crate::global::{ - is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION, - DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, + get_global_endpoints, is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, 
DISK_ASSUME_UNKNOWN_SIZE, + DISK_FILL_FRACTION, DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, }; use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT}; use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; @@ -18,10 +18,11 @@ use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::new_object_layer_fn; use crate::notification_sys::get_global_notification_sys; use crate::pools::PoolMeta; +use crate::rebalance::RebalanceMeta; use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO}; use crate::store_err::{ - is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found, - to_object_err, StorageError, + is_err_bucket_exists, is_err_decommission_already_running, is_err_invalid_upload_id, is_err_object_not_found, + is_err_read_quorum, is_err_version_not_found, to_object_err, StorageError, }; use crate::store_init::ec_drives_no_config; use crate::utils::crypto::base64_decode; @@ -40,6 +41,7 @@ use crate::{ }, store_init, utils, }; + use common::error::{Error, Result}; use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port}; use futures::future::join_all; @@ -59,6 +61,7 @@ use tokio::select; use tokio::sync::mpsc::Sender; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio::time::{interval, sleep}; +use tracing::error; use tracing::{debug, info}; use uuid::Uuid; @@ -73,6 +76,7 @@ pub struct ECStore { pub peer_sys: S3PeerSys, // pub local_disks: Vec, pub pool_meta: RwLock, + pub rebalance_meta: RwLock>, pub decommission_cancelers: Vec>, } @@ -209,7 +213,7 @@ impl ECStore { } let peer_sys = S3PeerSys::new(&endpoint_pools); - let mut pool_meta = PoolMeta::new(pools.clone()); + let mut pool_meta = PoolMeta::new(&pools, &PoolMeta::default()); pool_meta.dont_save = true; let decommission_cancelers = vec![None; pools.len()]; @@ -219,35 +223,96 @@ impl ECStore { pools, peer_sys, pool_meta: pool_meta.into(), + rebalance_meta: RwLock::new(None), decommission_cancelers, }); - set_object_layer(ec.clone()).await; - if let Some(dep_id) = deployment_id { set_global_deployment_id(dep_id); } + let wait_sec = 5; + loop { + if let Err(err) = ec.init().await { + error!("init err: {}", err); + error!("retry after {} second", wait_sec); + sleep(Duration::from_secs(wait_sec)).await; + continue; + } + + break; + } + + set_object_layer(ec.clone()).await; + Ok(ec) } - pub async fn init(api: Arc) -> Result<()> { - config::init(); - GLOBAL_ConfigSys.init(api.clone()).await?; + pub async fn init(self: &Arc) -> Result<()> { + if self.load_rebalance_meta().await.is_ok() { + self.start_rebalance().await; + } - let buckets_list = api - .list_bucket(&BucketOptions { - no_metadata: true, - ..Default::default() - }) - .await - .map_err(|err| Error::from_string(err.to_string()))?; + let mut meta = PoolMeta::default(); + meta.load(self.pools[0].clone(), self.pools.clone()).await?; + let update = meta.validate(self.pools.clone())?; - let buckets = buckets_list.iter().map(|v| v.name.clone()).collect(); + if update { + { + let mut pool_meta = self.pool_meta.write().await; + *pool_meta = meta.clone(); + } + } else { + let new_meta = PoolMeta::new(&self.pools, &meta); + new_meta.save(self.pools.clone()).await?; + { + let mut pool_meta = self.pool_meta.write().await; + *pool_meta = new_meta; + } + } - // FIXME: + let pools = meta.return_resumable_pools(); + let mut pool_indeces = Vec::with_capacity(pools.len()); - 
init_bucket_metadata_sys(api.clone(), buckets).await; + let endpoints = get_global_endpoints(); + + for p in pools.iter() { + if let Some(idx) = endpoints.get_pool_idx(&p.cmd_line) { + pool_indeces.push(idx); + } else { + return Err(Error::msg(format!( + "unexpected state present for decommission status pool({}) not found", + p.cmd_line + ))); + } + } + + if !pool_indeces.is_empty() { + let idx = pool_indeces[0]; + if endpoints.as_ref()[idx].endpoints.as_ref()[0].is_local { + let (_tx, rx) = broadcast::channel(1); + + let store = self.clone(); + + tokio::spawn(async move { + // wait 3 minutes for cluster init + tokio::time::sleep(Duration::from_secs(60 * 3)).await; + + if let Err(err) = store.decommission(rx.resubscribe(), pool_indeces.clone()).await { + if is_err_decommission_already_running(&err) { + for i in pool_indeces.iter() { + store.do_decommission_in_routine(rx.resubscribe(), *i).await; + } + return; + } + + error!("store init decommission err: {}", err); + + // TODO: check config err + } + }); + } + } Ok(()) } @@ -942,7 +1007,7 @@ impl ECStore { pub async fn reload_pool_meta(&self) -> Result<()> { let mut meta = PoolMeta::default(); - meta.load().await?; + meta.load(self.pools[0].clone(), self.pools.clone()).await?; let mut pool_meta = self.pool_meta.write().await; *pool_meta = meta; @@ -1909,7 +1974,7 @@ impl StorageAPI for ECStore { async fn abort_multipart_upload(&self, bucket: &str, object: &str, upload_id: &str, opts: &ObjectOptions) -> Result<()> { check_abort_multipart_args(bucket, object, upload_id)?; - // TODO: defer + // TODO: defer DeleteUploadID if self.single_pool() { return self.pools[0].abort_multipart_upload(bucket, object, upload_id, opts).await; diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index b91a9ab2..54661697 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -287,7 +287,7 @@ pub struct ObjectPartInfo { pub size: usize, pub actual_size: usize, // 源数据大小 pub mod_time: Option, - // pub index: Option>, + // pub index: Option>, // TODO: ??? 
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index b91a9ab2..54661697 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -287,7 +287,7 @@ pub struct ObjectPartInfo {
     pub size: usize,
     pub actual_size: usize, // source data size
     pub mod_time: Option,
-    // pub index: Option>,
+    // pub index: Option>, // TODO: ???
     // pub checksums: Option>,
 }
 
@@ -635,7 +635,7 @@ pub struct MultipartUploadResult {
     pub upload_id: String,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default, Clone)]
 pub struct PartInfo {
     pub part_num: usize,
     pub last_mod: Option,
@@ -643,7 +643,7 @@ pub struct PartInfo {
     pub etag: Option,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct CompletePart {
     pub part_num: usize,
     pub e_tag: Option,
diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs
index be945e63..e45349b3 100644
--- a/ecstore/src/store_err.rs
+++ b/ecstore/src/store_err.rs
@@ -80,6 +80,8 @@ pub enum StorageError {
     #[error("Decommission not started")]
     DecommissionNotStarted,
+    #[error("Decommission already running")]
+    DecommissionAlreadyRunning,
 
     #[error("DoneForNow")]
     DoneForNow,
@@ -115,6 +117,7 @@ impl StorageError {
             StorageError::InvalidPart(_, _, _) => 0x19,
             StorageError::VolumeNotFound(_) => 0x20,
             StorageError::DoneForNow => 0x21,
+            StorageError::DecommissionAlreadyRunning => 0x22,
         }
     }
 
@@ -151,6 +154,7 @@ impl StorageError {
             0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())),
             0x20 => Some(StorageError::VolumeNotFound(Default::default())),
             0x21 => Some(StorageError::DoneForNow),
+            0x22 => Some(StorageError::DecommissionAlreadyRunning),
             _ => None,
         }
     }
@@ -241,6 +245,14 @@ pub fn to_object_err(err: Error, params: Vec<&str>) -> Error {
     err
 }
 
+pub fn is_err_decommission_already_running(err: &Error) -> bool {
+    if let Some(e) = err.downcast_ref::() {
+        matches!(e, StorageError::DecommissionAlreadyRunning)
+    } else {
+        false
+    }
+}
+
 pub fn is_err_data_movement_overwrite(err: &Error) -> bool {
     if let Some(e) = err.downcast_ref::() {
         matches!(e, StorageError::DataMovementOverwriteErr(_, _, _))
diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs
index 1152f2ee..376c5743 100644
--- a/rustfs/src/admin/handlers.rs
+++ b/rustfs/src/admin/handlers.rs
@@ -50,6 +50,7 @@ use tracing::{error, info, warn};
 pub mod group;
 pub mod policys;
 pub mod pools;
+pub mod rebalance;
 pub mod service_account;
 pub mod sts;
 pub mod trace;
@@ -735,40 +736,6 @@ impl Operation for BackgroundHealStatusHandler {
     }
 }
 
-pub struct RebalanceStart {}
-
-#[async_trait::async_trait]
-impl Operation for RebalanceStart {
-    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
-        warn!("handle RebalanceStart");
-
-        return Err(s3_error!(NotImplemented));
-    }
-}
-
-// RebalanceStatus
-pub struct RebalanceStatus {}
-
-#[async_trait::async_trait]
-impl Operation for RebalanceStatus {
-    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
-        warn!("handle RebalanceStatus");
-
-        return Err(s3_error!(NotImplemented));
-    }
-}
-// RebalanceStop
-pub struct RebalanceStop {}
-
-#[async_trait::async_trait]
-impl Operation for RebalanceStop {
-    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
-        warn!("handle RebalanceStop");
-
-        return Err(s3_error!(NotImplemented));
-    }
-}
-
 #[cfg(test)]
 mod test {
     use ecstore::heal::heal_commands::HealOpts;
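The new `DecommissionAlreadyRunning` variant and its `is_err_decommission_already_running` helper follow the same downcast-and-match pattern as the existing `is_err_*` predicates. A rough caller-side sketch (assuming the `common` and `ecstore` crates are on the dependency path and `store_err` is a public module; the handler function itself is made up):

```rust
use common::error::Error;
use ecstore::store_err::is_err_decommission_already_running;

fn handle_decommission_result(res: Result<(), Error>) {
    if let Err(err) = res {
        if is_err_decommission_already_running(&err) {
            // Another decommission is already in flight; treat this as a no-op rather than a failure.
            return;
        }
        eprintln!("decommission failed: {err}");
    }
}
```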
diff --git a/rustfs/src/admin/handlers/rebalance.rs b/rustfs/src/admin/handlers/rebalance.rs
new file mode 100644
index 00000000..06b71e67
--- /dev/null
+++ b/rustfs/src/admin/handlers/rebalance.rs
@@ -0,0 +1,238 @@
+use ecstore::{
+    new_object_layer_fn,
+    notification_sys::get_global_notification_sys,
+    rebalance::{DiskStat, RebalSaveOpt},
+    store_api::BucketOptions,
+    StorageAPI,
+};
+use http::{HeaderMap, StatusCode};
+use matchit::Params;
+use s3s::{header::CONTENT_TYPE, s3_error, Body, S3Request, S3Response, S3Result};
+use serde::{Deserialize, Serialize};
+use std::time::{Duration, SystemTime};
+use tracing::warn;
+
+use crate::admin::router::Operation;
+use ecstore::rebalance::RebalanceMeta;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct RebalanceResp {
+    pub id: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct RebalPoolProgress {
+    #[serde(rename = "objects")]
+    pub num_objects: u64,
+    #[serde(rename = "versions")]
+    pub num_versions: u64,
+    #[serde(rename = "bytes")]
+    pub bytes: u64,
+    #[serde(rename = "bucket")]
+    pub bucket: String,
+    #[serde(rename = "object")]
+    pub object: String,
+    #[serde(rename = "elapsed")]
+    pub elapsed: Duration,
+    #[serde(rename = "eta")]
+    pub eta: Duration,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct RebalancePoolStatus {
+    #[serde(rename = "id")]
+    pub id: usize, // Pool index (zero-based)
+    #[serde(rename = "status")]
+    pub status: String, // Active if rebalance is running, empty otherwise
+    #[serde(rename = "used")]
+    pub used: f64, // Percentage used space
+    #[serde(rename = "progress")]
+    pub progress: Option, // None when rebalance is not running
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct RebalanceAdminStatus {
+    pub id: String, // Identifies the ongoing rebalance operation by a UUID
+    #[serde(rename = "pools")]
+    pub pools: Vec, // Contains all pools, including inactive
+    #[serde(rename = "stoppedAt")]
+    pub stopped_at: Option, // Optional timestamp when rebalance was stopped
+}
+
+pub struct RebalanceStart {}
+
+#[async_trait::async_trait]
+impl Operation for RebalanceStart {
+    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
+        warn!("handle RebalanceStart");
+
+        let Some(store) = new_object_layer_fn() else {
+            return Err(s3_error!(InternalError, "Not init"));
+        };
+
+        if store.pools.len() == 1 {
+            return Err(s3_error!(NotImplemented));
+        }
+
+        if store.is_decommission_running().await {
+            return Err(s3_error!(
+                InternalError,
+                "Rebalance cannot be started, decommission is already in progress"
+            ));
+        }
+
+        if store.is_rebalance_started().await {
+            return Err(s3_error!(InternalError, "Rebalance already in progress"));
+        }
+
+        let bucket_infos = store
+            .list_bucket(&BucketOptions::default())
+            .await
+            .map_err(|e| s3_error!(InternalError, "Failed to list buckets: {}", e))?;
+
+        let buckets: Vec = bucket_infos.into_iter().map(|bucket| bucket.name).collect();
+
+        let id = match store.init_rebalance_meta(buckets).await {
+            Ok(id) => id,
+            Err(e) => {
+                return Err(s3_error!(InternalError, "Failed to init rebalance meta: {}", e));
+            }
+        };
+
+        if let Some(notification_sys) = get_global_notification_sys() {
+            notification_sys.load_rebalance_meta(true).await;
+        }
+
+        let resp = RebalanceResp { id };
+        let data = serde_json::to_string(&resp).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?;
+
+        let mut header = HeaderMap::new();
+        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
+
+        Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
+    }
+}
+
+// RebalanceStatus
+pub struct RebalanceStatus {}
+
+#[async_trait::async_trait]
+impl Operation for RebalanceStatus {
+    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
+        warn!("handle RebalanceStatus");
+
+        let Some(store) = new_object_layer_fn() else {
+            return Err(s3_error!(InternalError, "Not init"));
+        };
+
+        let mut meta = RebalanceMeta::new();
+        meta.load(store.pools[0].clone())
+            .await
+            .map_err(|e| s3_error!(InternalError, "Failed to load rebalance meta: {}", e))?;
+
+        // Compute disk usage percentage
+        let si = store.storage_info().await;
+        let mut disk_stats = vec![DiskStat::default(); store.pools.len()];
+
+        for disk in si.disks.iter() {
+            if disk.pool_index < 0 || disk_stats.len() <= disk.pool_index as usize {
+                continue;
+            }
+            disk_stats[disk.pool_index as usize].available_space += disk.available_space;
+            disk_stats[disk.pool_index as usize].total_space += disk.total_space;
+        }
+
+        let stop_time = meta.stopped_at;
+        let mut admin_status = RebalanceAdminStatus {
+            id: meta.id.clone(),
+            stopped_at: meta.stopped_at,
+            pools: vec![RebalancePoolStatus::default(); meta.pool_stats.len()],
+        };
+
+        for (i, ps) in meta.pool_stats.iter().enumerate() {
+            admin_status.pools[i] = RebalancePoolStatus {
+                id: i,
+                status: ps.info.status.to_string(),
+                used: (disk_stats[i].total_space - disk_stats[i].available_space) as f64 / disk_stats[i].total_space as f64,
+                progress: None,
+            };
+
+            if !ps.participating {
+                continue;
+            }
+
+            // Calculate total bytes to be rebalanced
+            let total_bytes_to_rebal = ps.init_capacity as f64 * meta.percent_free_goal - ps.init_free_space as f64;
+
+            let elapsed = if let Some(start_time) = ps.info.start_time {
+                SystemTime::now()
+                    .duration_since(start_time)
+                    .map_err(|e| s3_error!(InternalError, "Failed to calculate elapsed time: {}", e))?
+            } else {
+                return Err(s3_error!(InternalError, "Start time is not available"));
+            };
+
+            let eta = if ps.bytes > 0 {
+                Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / ps.bytes as f64)
+            } else {
+                Duration::ZERO
+            };
+
+            let stop_time = ps.info.end_time.unwrap_or(stop_time.unwrap_or(SystemTime::now()));
+
+            let elapsed = if ps.info.end_time.is_some() || meta.stopped_at.is_some() {
+                stop_time
+                    .duration_since(ps.info.start_time.unwrap_or(stop_time))
+                    .unwrap_or_default()
+            } else {
+                elapsed
+            };
+
+            admin_status.pools[i].progress = Some(RebalPoolProgress {
+                num_objects: ps.num_objects,
+                num_versions: ps.num_versions,
+                bytes: ps.bytes,
+                bucket: ps.bucket.clone(),
+                object: ps.object.clone(),
+                elapsed,
+                eta,
+            });
+        }
+
+        let data =
+            serde_json::to_string(&admin_status).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?;
+        let mut header = HeaderMap::new();
+        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
+
+        Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
+    }
+}
+
+// RebalanceStop
+pub struct RebalanceStop {}
+
+#[async_trait::async_trait]
+impl Operation for RebalanceStop {
+    async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> {
+        warn!("handle RebalanceStop");
+
+        let Some(store) = new_object_layer_fn() else {
+            return Err(s3_error!(InternalError, "Not init"));
+        };
+
+        if let Some(notification_sys) = get_global_notification_sys() {
+            notification_sys.stop_rebalance().await;
+        }
+
+        store
+            .save_rebalance_stats(0, RebalSaveOpt::StoppedAt)
+            .await
+            .map_err(|e| s3_error!(InternalError, "Failed to stop rebalance: {}", e))?;
+
+        if let Some(notification_sys) = get_global_notification_sys() {
+            notification_sys.load_rebalance_meta(true).await;
+        }
+
+        // The stop has been broadcast and the stopped-at stats persisted; reply with an empty 200 OK.
+        Ok(S3Response::with_headers((StatusCode::OK, Body::from(String::new())), HeaderMap::new()))
+    }
+}
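For reference, the ETA reported by `RebalanceStatus` scales the elapsed time by the ratio of the total bytes the pool is expected to shed to the bytes moved so far. A stand-alone sketch of that arithmetic with made-up numbers (illustrative only, not project code):

```rust
use std::time::Duration;

/// eta = total_bytes_to_rebal * elapsed / moved_bytes, as computed in RebalanceStatus above.
fn rebalance_eta(total_bytes_to_rebal: f64, moved_bytes: u64, elapsed: Duration) -> Duration {
    if moved_bytes == 0 {
        return Duration::ZERO;
    }
    Duration::from_secs_f64(total_bytes_to_rebal * elapsed.as_secs_f64() / moved_bytes as f64)
}

fn main() {
    // 100 GiB to move in total, 25 GiB moved after 10 minutes => estimate of ~40 minutes.
    let gib = (1u64 << 30) as f64;
    let eta = rebalance_eta(100.0 * gib, 25 * (1u64 << 30), Duration::from_secs(600));
    println!("estimated total time: {eta:?}");
}
```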
diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs
index 0289cce2..57293d4e 100644
--- a/rustfs/src/admin/mod.rs
+++ b/rustfs/src/admin/mod.rs
@@ -6,7 +6,7 @@ pub mod utils;
 use common::error::Result;
 // use ecstore::global::{is_dist_erasure, is_erasure};
 use handlers::{
-    group, policys, pools,
+    group, policys, pools, rebalance,
     service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
     sts, user,
 };
@@ -94,17 +94,17 @@
     r.insert(
         Method::POST,
         format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/start").as_str(),
-        AdminOperation(&handlers::RebalanceStart {}),
+        AdminOperation(&rebalance::RebalanceStart {}),
     )?;
     r.insert(
         Method::GET,
         format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/status").as_str(),
-        AdminOperation(&handlers::RebalanceStatus {}),
+        AdminOperation(&rebalance::RebalanceStatus {}),
     )?;
     r.insert(
         Method::POST,
         format!("{}{}", ADMIN_PREFIX, "/v3/rebalance/stop").as_str(),
-        AdminOperation(&handlers::RebalanceStop {}),
+        AdminOperation(&rebalance::RebalanceStop {}),
     )?;
 
     // Some APIs are only available in EC mode
diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs
index c1787915..f2fde2c2 100644
--- a/rustfs/src/grpc.rs
+++ b/rustfs/src/grpc.rs
@@ -2360,23 +2360,46 @@ impl Node for NodeService {
     }
 
     async fn stop_rebalance(&self, _request: Request) -> Result, Status> {
-        let Some(_store) = new_object_layer_fn() else {
+        let Some(store) = new_object_layer_fn() else {
             return Ok(tonic::Response::new(StopRebalanceResponse {
                 success: false,
                 error_info: Some("errServerNotInitialized".to_string()),
             }));
         };
 
-        // todo
-        // store.stop_rebalance().await;
-        todo!()
+        let _ = store.stop_rebalance().await;
+        Ok(tonic::Response::new(StopRebalanceResponse {
+            success: true,
+            error_info: None,
+        }))
     }
 
     async fn load_rebalance_meta(
         &self,
-        _request: Request,
+        request: Request,
     ) -> Result, Status> {
-        todo!()
+        let Some(store) = new_object_layer_fn() else {
+            return Ok(tonic::Response::new(LoadRebalanceMetaResponse {
+                success: false,
+                error_info: Some("errServerNotInitialized".to_string()),
+            }));
+        };
+
+        let LoadRebalanceMetaRequest { start_rebalance } = request.into_inner();
+
+        store.load_rebalance_meta().await.map_err(|err| {
+            error!("load_rebalance_meta err {:?}", err);
+            Status::internal(err.to_string())
+        })?;
+
+        if start_rebalance {
+            let store = store.clone();
+            tokio::spawn(async move {
+                store.start_rebalance().await;
+            });
+        }
+
+        Ok(tonic::Response::new(LoadRebalanceMetaResponse {
+            success: true,
+            error_info: None,
+        }))
     }
 
     async fn load_transition_tier_config(
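The `load_rebalance_meta` / `stop_rebalance` RPCs above are what the peer-notification layer fans out to the other nodes. A hypothetical caller-side sketch, assuming tonic's default codegen; the `node_proto` module path and the peer address are placeholders, not taken from this repository, and only the message fields match the server implementation above:

```rust
use node_proto::node_client::NodeClient; // placeholder path for the generated client
use node_proto::LoadRebalanceMetaRequest;

async fn reload_peer_rebalance_meta(peer_addr: String) -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a peer node and ask it to reload (and start) rebalance from the shared meta.
    let mut client = NodeClient::connect(peer_addr).await?;
    let resp = client
        .load_rebalance_meta(LoadRebalanceMetaRequest { start_rebalance: true })
        .await?
        .into_inner();
    if !resp.success {
        eprintln!("peer did not reload rebalance meta: {:?}", resp.error_info);
    }
    Ok(())
}
```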
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index d7ea61d1..8492d057 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -21,8 +21,13 @@ use common::{
     globals::set_global_addr,
 };
 use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
+use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
+use ecstore::config as ecconfig;
+use ecstore::config::GLOBAL_ConfigSys;
 use ecstore::heal::background_heal_ops::init_auto_heal;
+use ecstore::store_api::BucketOptions;
 use ecstore::utils::net::{self, get_available_port};
+use ecstore::StorageAPI;
 use ecstore::{
     endpoints::EndpointServerPools,
     heal::data_scanner::init_data_scanner,
@@ -379,11 +384,20 @@ async fn run(opt: config::Opt) -> Result<()> {
         Error::from_string(err.to_string())
     })?;
 
-    ECStore::init(store.clone()).await.map_err(|err| {
-        error!("ECStore init failed {:?}", &err);
-        Error::from_string(err.to_string())
-    })?;
-    debug!("init store success!");
+    ecconfig::init();
+    GLOBAL_ConfigSys.init(store.clone()).await?;
+
+    let buckets_list = store
+        .list_bucket(&BucketOptions {
+            no_metadata: true,
+            ..Default::default()
+        })
+        .await
+        .map_err(|err| Error::from_string(err.to_string()))?;
+
+    let buckets = buckets_list.into_iter().map(|v| v.name).collect();
+
+    init_bucket_metadata_sys(store.clone(), buckets).await;
 
     init_iam_sys(store.clone()).await?;
diff --git a/rustfs/src/storage/error.rs b/rustfs/src/storage/error.rs
index c902390e..ec5a5838 100644
--- a/rustfs/src/storage/error.rs
+++ b/rustfs/src/storage/error.rs
@@ -51,6 +51,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
             object,
             version_id
         ),
+        // extended
         StorageError::ObjectExistsAsDirectory(bucket, object) => {
             s3_error!(InvalidArgument, "Object exists on :{} as directory {}", bucket, object)
         }
@@ -62,6 +63,7 @@ ...
             s3_error!(SlowDown, "Storage resources are insufficient for the write operation")
         }
         StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"),
+        StorageError::DecommissionAlreadyRunning => s3_error!(InternalError, "Decommission already running"),
         StorageError::VolumeNotFound(bucket) => {
             s3_error!(NoSuchBucket, "bucket not found {}", bucket)