From b79b9670361195669f5c77cd3cbcf23a05ab4106 Mon Sep 17 00:00:00 2001
From: weisd
Date: Tue, 26 Nov 2024 17:34:18 +0800
Subject: [PATCH] move admin router to rustfs, add pool stats/list admin api

---
 Cargo.lock                                   |  41 +----
 Cargo.toml                                   |   2 +-
 ecstore/src/bucket/metadata_sys.rs           |   7 +-
 ecstore/src/endpoints.rs                     |  13 +-
 ecstore/src/global.rs                        |  28 ++--
 ecstore/src/lib.rs                           |   2 +-
 ecstore/src/pools.rs                         | 110 ++++++++++++--
 ecstore/src/store.rs                         |   5 +-
 ecstore/src/store_err.rs                     |   3 +
 router/Cargo.toml                            |  24 ---
 rustfs/Cargo.toml                            |   3 +-
 {router/src => rustfs/src/admin}/handlers.rs | 152 ++++++++++++++++++-
 router/src/lib.rs => rustfs/src/admin/mod.rs |   0
 {router/src => rustfs/src/admin}/router.rs   |   2 +-
 rustfs/src/main.rs                           |   5 +-
 rustfs/src/storage/error.rs                  |   1 +
 16 files changed, 295 insertions(+), 103 deletions(-)
 delete mode 100644 router/Cargo.toml
 rename {router/src => rustfs/src/admin}/handlers.rs (66%)
 rename router/src/lib.rs => rustfs/src/admin/mod.rs (100%)
 rename {router/src => rustfs/src/admin}/router.rs (98%)

diff --git a/Cargo.lock b/Cargo.lock
index 7d74c4d5..3a57111a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2104,26 +2104,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "router"
-version = "0.0.1"
-dependencies = [
- "async-trait",
- "common",
- "ecstore",
- "hyper",
- "matchit 0.8.5",
- "pin-project-lite",
- "quick-xml",
- "s3s",
- "serde",
- "serde-xml-rs",
- "serde_json",
- "serde_urlencoded",
- "time",
- "tracing",
-]
-
 [[package]]
 name = "rustc-demangle"
 version = "0.1.24"
@@ -2172,9 +2152,10 @@ dependencies = [
  "prost-types",
  "protobuf",
  "protos",
- "router",
  "s3s",
+ "serde",
  "serde_json",
+ "serde_urlencoded",
  "shadow-rs",
  "time",
  "tokio",
@@ -2335,18 +2316,6 @@ dependencies = [
  "serde_derive",
 ]
 
-[[package]]
-name = "serde-xml-rs"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb3aa78ecda1ebc9ec9847d5d3aba7d618823446a049ba2491940506da6e2782"
-dependencies = [
- "log",
- "serde",
- "thiserror",
- "xml-rs",
-]
-
 [[package]]
 name = "serde_derive"
 version = "1.0.214"
@@ -3230,12 +3199,6 @@ version = "0.5.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
 
-[[package]]
-name = "xml-rs"
-version = "0.8.23"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "af310deaae937e48a26602b730250b4949e125f468f11e6990be3e5304ddd96f"
-
 [[package]]
 name = "xxhash-rust"
 version = "0.8.12"
diff --git a/Cargo.toml b/Cargo.toml
index a930c063..f0309c78 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,7 +9,7 @@ members = [
     "common/protos",
     "api/admin",
     "reader",
-    "router", "common/workers",
+    "common/workers",
 ]
 
 [workspace.package]
diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs
index 03ece0d3..fb45c269 100644
--- a/ecstore/src/bucket/metadata_sys.rs
+++ b/ecstore/src/bucket/metadata_sys.rs
@@ -149,8 +149,11 @@ impl BucketMetadataSys {
     }
     async fn init_internal(&self, buckets: Vec) -> Result<()> {
         let count = {
-            let endpoints = GLOBAL_Endpoints.read().await;
-            endpoints.es_count() * 10
+            if let Some(endpoints) = GLOBAL_Endpoints.get() {
+                endpoints.es_count() * 10
+            } else {
+                return Err(Error::msg("GLOBAL_Endpoints not init"));
+            }
         };
 
         let mut failed_buckets: HashSet = HashSet::new();
diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs
index eeca1614..5bdc086c 100644
--- a/ecstore/src/endpoints.rs
+++ b/ecstore/src/endpoints.rs
@@ -405,7 +405,7 @@ pub struct PoolEndpoints {
 }
 
 /// list of list of endpoints
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct EndpointServerPools(pub Vec<PoolEndpoints>);
 
 impl From<Vec<PoolEndpoints>> for EndpointServerPools {
@@ -430,6 +430,17 @@ impl EndpointServerPools {
     pub fn reset(&mut self, eps: Vec<PoolEndpoints>) {
         self.0 = eps;
     }
+    pub fn legacy(&self) -> bool {
+        self.0.len() == 1 && self.0[0].legacy
+    }
+    pub fn get_pool_idx(&self, cmd_line: &str) -> Option<usize> {
+        for (idx, eps) in self.0.iter().enumerate() {
+            if eps.cmd_line.as_str() == cmd_line {
+                return Some(idx);
+            }
+        }
+        None
+    }
 
     pub fn from_volumes(server_addr: &str, endpoints: Vec) -> Result<(EndpointServerPools, SetupType)> {
         let layouts = DisksLayout::from_volumes(endpoints.as_slice())?;
diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs
index e2c46907..1aaad8a6 100644
--- a/ecstore/src/global.rs
+++ b/ecstore/src/global.rs
@@ -1,11 +1,15 @@
 use lazy_static::lazy_static;
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::HashMap,
+    sync::{Arc, OnceLock},
+};
 use tokio::sync::RwLock;
 use uuid::Uuid;
 
 use crate::{
     disk::DiskStore,
     endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
+    error::{Error, Result},
     heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState},
     store::ECStore,
 };
@@ -23,7 +27,7 @@ lazy_static! {
     pub static ref GLOBAL_IsErasureSD: RwLock = RwLock::new(false);
     pub static ref GLOBAL_LOCAL_DISK_MAP: Arc>>> = Arc::new(RwLock::new(HashMap::new()));
     pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc> = Arc::new(RwLock::new(Vec::new()));
-    pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
+    pub static ref GLOBAL_Endpoints: OnceLock<EndpointServerPools> = OnceLock::new();
     pub static ref GLOBAL_RootDiskThreshold: RwLock = RwLock::new(0);
     pub static ref GLOBAL_BackgroundHealRoutine: Arc> = HealRoutine::new();
     pub static ref GLOBAL_BackgroundHealState: Arc> = AllHealState::new(false);
@@ -40,9 +44,11 @@ pub async fn get_global_deployment_id() -> Uuid {
     *id_ptr
 }
 
-pub async fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
-    let mut endpoints = GLOBAL_Endpoints.write().await;
-    endpoints.reset(eps);
+pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) -> Result<()> {
+    GLOBAL_Endpoints
+        .set(EndpointServerPools::from(eps))
+        .map_err(|_| Error::msg("GLOBAL_Endpoints set failed"))?;
+    Ok(())
 }
 
 pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
@@ -84,10 +90,12 @@ pub async fn update_erasure_type(setup_type: SetupType) {
     *is_erasure_sd = setup_type == SetupType::ErasureSD;
 }
 
-pub async fn is_legacy() -> bool {
-    let lock = GLOBAL_Endpoints.read().await;
-    let endpoints = lock.as_ref();
-    endpoints.len() == 1 && endpoints[0].legacy
-}
+// pub fn is_legacy() -> bool {
+//     if let Some(endpoints) = GLOBAL_Endpoints.get() {
+//         endpoints.as_ref().len() == 1 && endpoints.as_ref()[0].legacy
+//     } else {
+//         false
+//     }
+// }
 
 type TypeLocalDiskSetDrives = Vec>>>;
diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs
index 595e7899..0edb1a46 100644
--- a/ecstore/src/lib.rs
+++ b/ecstore/src/lib.rs
@@ -26,7 +26,7 @@ pub mod pools;
 pub mod store_err;
 pub mod xhttp;
 
-pub use global::is_legacy;
 pub use global::new_object_layer_fn;
 pub use global::set_global_endpoints;
 pub use global::update_erasure_type;
+pub use global::GLOBAL_Endpoints;
diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs
index 44646a4c..53a22392 100644
--- a/ecstore/src/pools.rs
+++ b/ecstore/src/pools.rs
@@ -1,10 +1,12 @@
 use crate::error::{Error, Result};
 use crate::store_api::{StorageAPI, StorageDisk, StorageInfo};
+use crate::store_err::StorageError;
 use crate::{sets::Sets, store::ECStore};
+use serde::Serialize;
 use std::sync::Arc;
 use time::OffsetDateTime;
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize)]
 pub struct PoolStatus {
     pub id: usize,
     pub cmd_line: String,
@@ -43,11 +45,36 @@ impl PoolMeta {
 
         self.pools[idx].decommission.is_some()
     }
+
+    pub fn decommission_cancel(&mut self, idx: usize) -> bool {
+        if let Some(stats) = self.pools.get_mut(idx) {
+            if let Some(d) = &stats.decommission {
+                if !d.canceled {
+                    stats.last_update = OffsetDateTime::now_utc();
+
+                    let mut pd = d.clone();
+                    pd.start_time = None;
+                    pd.canceled = true;
+                    pd.failed = false;
+                    pd.complete = false;
+
+                    stats.decommission = Some(pd);
+                    true
+                } else {
+                    false
+                }
+            } else {
+                false
+            }
+        } else {
+            false
+        }
+    }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Default)]
 pub struct PoolDecommissionInfo {
-    pub start_time: OffsetDateTime,
+    pub start_time: Option<OffsetDateTime>,
     pub start_size: usize,
     pub total_size: usize,
     pub current_size: usize,
@@ -74,27 +101,84 @@ pub struct PoolSpaceInfo {
 }
 
 impl ECStore {
-    pub fn status(&self, _idx: usize) -> Result<PoolStatus> {
-        unimplemented!()
+    pub async fn status(&self, idx: usize) -> Result<PoolStatus> {
+        let space_info = self.get_decommission_pool_space_info(idx).await?;
+        let mut pool_info = self.pool_meta.pools[idx].clone();
+        if let Some(d) = pool_info.decommission.as_mut() {
+            d.total_size = space_info.total;
+            d.current_size = space_info.free;
+        } else {
+            pool_info.decommission = Some(PoolDecommissionInfo {
+                total_size: space_info.total,
+                current_size: space_info.free,
+                ..Default::default()
+            });
+        }
+
+        Ok(pool_info)
     }
 
-    async fn _get_decommission_pool_space_info(&self, idx: usize) -> Result<PoolSpaceInfo> {
+    async fn get_decommission_pool_space_info(&self, idx: usize) -> Result<PoolSpaceInfo> {
         if let Some(sets) = self.pools.get(idx) {
             let mut info = sets.storage_info().await;
             info.backend = self.backend_info().await;
 
-            unimplemented!()
+            let total = get_total_usable_capacity(&info.disks, &info);
+            let free = get_total_usable_capacity_free(&info.disks, &info);
+
+            Ok(PoolSpaceInfo {
+                free,
+                total,
+                used: total - free,
+            })
         } else {
             Err(Error::msg("InvalidArgument"))
         }
     }
+
+    pub async fn decommission_cancel(&mut self, idx: usize) -> Result<()> {
+        if self.single_pool() {
+            return Err(Error::msg("InvalidArgument"));
+        }
+
+        let Some(has_canceler) = self.decommission_cancelers.get(idx) else {
+            return Err(Error::msg("InvalidArgument"));
+        };
+
+        if has_canceler.is_none() {
+            return Err(Error::new(StorageError::DecommissionNotStarted));
+        }
+
+        if self.pool_meta.decommission_cancel(idx) {
+            // FIXME:
+        }
+
+        unimplemented!()
+    }
 }
 
-fn _get_total_usable_capacity(disks: &Vec<StorageDisk>, _info: &StorageInfo) -> usize {
-    for _disk in disks.iter() {
-        // if disk.pool_index < 0 || info.backend.standard_scdata.len() <= disk.pool_index {
-        //     continue;
-        // }
+fn get_total_usable_capacity(disks: &Vec<StorageDisk>, info: &StorageInfo) -> usize {
+    let mut capacity = 0;
+    for disk in disks.iter() {
+        if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {
+            continue;
+        }
+        if (disk.disk_index as usize) < info.backend.standard_sc_data[disk.pool_index as usize] {
+            capacity += disk.total_space as usize;
+        }
     }
-    unimplemented!()
+    capacity
+}
+
+fn get_total_usable_capacity_free(disks: &Vec<StorageDisk>, info: &StorageInfo) -> usize {
+    let mut capacity = 0;
+    for disk in disks.iter() {
+        if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {
+            continue;
+        }
+        if (disk.disk_index as usize) < info.backend.standard_sc_data[disk.pool_index as usize] {
+            capacity += disk.available_space as usize;
+        }
+    }
+    capacity
 }
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 0ac0784b..927bcdc7 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -77,6 +77,7 @@ pub struct ECStore {
     pub peer_sys: S3PeerSys,
     // pub local_disks: Vec,
     pub pool_meta: PoolMeta,
+    pub decommission_cancelers: Vec>,
 }
 
 impl ECStore {
@@ -197,12 +198,14 @@ impl ECStore {
         let mut pool_meta = PoolMeta::new(pools.clone());
         pool_meta.dont_save = true;
+        let decommission_cancelers = vec![None; pools.len()];
 
         let ec = ECStore {
             id: deployment_id.unwrap(),
             disk_map,
             pools,
             peer_sys,
             pool_meta,
+            decommission_cancelers,
         };
 
         set_object_layer(ec.clone()).await;
@@ -239,7 +242,7 @@ impl ECStore {
     //     self.local_disks.clone()
     // }
 
-    fn single_pool(&self) -> bool {
+    pub fn single_pool(&self) -> bool {
         self.pools.len() == 1
     }
 
diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs
index 2ca4af46..ca7f5877 100644
--- a/ecstore/src/store_err.rs
+++ b/ecstore/src/store_err.rs
@@ -71,6 +71,9 @@ pub enum StorageError {
 
     #[error("Storage resources are insufficient for the write operation")]
     InsufficientWriteQuorum,
+
+    #[error("Decommission not started")]
+    DecommissionNotStarted,
 }
 
 pub fn to_object_err(err: Error, params: Vec<&str>) -> Error {
diff --git a/router/Cargo.toml b/router/Cargo.toml
deleted file mode 100644
index 447cd0ec..00000000
--- a/router/Cargo.toml
+++ /dev/null
@@ -1,24 +0,0 @@
-[package]
-name = "router"
-edition.workspace = true
-license.workspace = true
-repository.workspace = true
-rust-version.workspace = true
-version.workspace = true
-
-
-[dependencies]
-async-trait.workspace = true
-tracing.workspace = true
-s3s.workspace = true
-hyper.workspace = true
-matchit = "0.8.4"
-pin-project-lite.workspace = true
-common.workspace = true
-serde_urlencoded = "0.7.1"
-serde.workspace = true
-quick-xml = "0.37.0"
-serde-xml-rs = "0.6.0"
-ecstore.workspace = true
-time.workspace = true
-serde_json.workspace = true
diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml
index ee30bf12..57f84ab1 100644
--- a/rustfs/Cargo.toml
+++ b/rustfs/Cargo.toml
@@ -53,11 +53,12 @@ transform-stream.workspace = true
 uuid = "1.11.0"
 admin = { path = "../api/admin" }
 axum.workspace = true
-router = { version = "0.0.1", path = "../router" }
 matchit = "0.8.4"
 shadow-rs = "0.35.2"
 const-str = { version = "0.5.7", features = ["std", "proc"] }
 atoi = "2.0.0"
+serde.workspace = true
+serde_urlencoded = "0.7.1"
 
 [build-dependencies]
 prost-build.workspace = true
diff --git a/router/src/handlers.rs b/rustfs/src/admin/handlers.rs
similarity index 66%
rename from router/src/handlers.rs
rename to rustfs/src/admin/handlers.rs
index ef334d12..377f355d 100644
--- a/router/src/handlers.rs
+++ b/rustfs/src/admin/handlers.rs
@@ -1,12 +1,12 @@
-use std::collections::HashSet;
-
-use crate::router::Operation;
+use super::router::Operation;
+use crate::storage::error::to_s3_error;
 use ecstore::bucket::policy::action::{Action, ActionSet};
 use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy};
 use ecstore::bucket::policy::effect::Effect;
 use ecstore::bucket::policy::resource::{Resource, ResourceSet};
 use ecstore::store_api::StorageAPI;
 use ecstore::utils::xml;
+use ecstore::GLOBAL_Endpoints;
 use ecstore::{new_object_layer_fn, store_api::BackendInfo};
 use hyper::StatusCode;
 use matchit::Params;
@@ -17,8 +17,10 @@ use s3s::{
 };
 use serde::{Deserialize, Serialize};
 use serde_urlencoded::from_bytes;
+use std::collections::HashSet;
 use time::{Duration, OffsetDateTime};
 use tracing::warn;
+
 #[derive(Deserialize, Debug, Default)]
 #[serde(rename_all = "PascalCase", default)]
 pub struct AssumeRoleRequest {
@@ -251,21 +253,107 @@ pub struct ListPools {}
 
 #[async_trait::async_trait]
 impl Operation for ListPools {
+    // GET //pools/list
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle ListPools");
-        return Err(s3_error!(NotImplemented));
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
+        };
+
+        let Some(endpoints) = GLOBAL_Endpoints.get() else {
+            return Err(s3_error!(NotImplemented));
+        };
+
+        if endpoints.legacy() {
+            return Err(s3_error!(NotImplemented));
+        }
+
+        let mut pools_status = Vec::new();
+
+        for (idx, _) in endpoints.as_ref().iter().enumerate() {
+            let state = store.status(idx).await.map_err(to_s3_error)?;
+
+            pools_status.push(state);
+        }
+
+        let output = serde_json::to_string(&pools_status)
+            .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize pool status failed"))?;
+
+        Ok(S3Response::new((StatusCode::OK, Body::from(output))))
     }
 }
 
+#[derive(Debug, Deserialize, Default)]
+#[serde(default)]
+pub struct StatusPoolQuery {
+    pub pool: String,
+    #[serde(rename = "by-id")]
+    pub by_id: String,
+}
+
 pub struct StatusPool {}
 
 #[async_trait::async_trait]
 impl Operation for StatusPool {
-    async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
+    // GET //pools/status?pool=http://server{1...4}/disk{1...4}
+    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle StatusPool");
-        return Err(s3_error!(NotImplemented));
+        let Some(endpoints) = GLOBAL_Endpoints.get() else {
+            return Err(s3_error!(NotImplemented));
+        };
+
+        if endpoints.legacy() {
+            return Err(s3_error!(NotImplemented));
+        }
+
+        let query = {
+            if let Some(query) = req.uri.query() {
+                let input: StatusPoolQuery =
+                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
+                input
+            } else {
+                StatusPoolQuery::default()
+            }
+        };
+
+        let is_byid = query.by_id.as_str() == "true";
+
+        let has_idx = {
+            if is_byid {
+                let a = query.pool.parse::<usize>().unwrap_or_default();
+                if a < endpoints.as_ref().len() {
+                    Some(a)
+                } else {
+                    None
+                }
+            } else {
+                endpoints.get_pool_idx(&query.pool)
+            }
+        };
+
+        let Some(idx) = has_idx else {
+            warn!("specified pool {} not found, please specify a valid pool", &query.pool);
+            return Err(s3_error!(InvalidArgument));
+        };
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
+        };
+
+        let pools_status = store.status(idx).await.map_err(to_s3_error)?;
+
+        let output = serde_json::to_string(&pools_status)
+            .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize pool status failed"))?;
+
+        Ok(S3Response::new((StatusCode::OK, Body::from(output))))
     }
 }
 
@@ -273,6 +361,7 @@ pub struct StartDecommission {}
 
 #[async_trait::async_trait]
 impl Operation for StartDecommission {
+    // POST //pools/decommission?pool=http://server{1...4}/disk{1...4}
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle StartDecommission");
 
@@ -284,9 +373,58 @@ pub struct CancelDecommission {}
 
 #[async_trait::async_trait]
 impl Operation for CancelDecommission {
-    async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
+    // POST //pools/cancel?pool=http://server{1...4}/disk{1...4}
+    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle CancelDecommission");
+        let Some(endpoints) = GLOBAL_Endpoints.get() else {
+            return Err(s3_error!(NotImplemented));
+        };
+
+        if endpoints.legacy() {
+            return Err(s3_error!(NotImplemented));
+        }
+
+        let query = {
+            if let Some(query) = req.uri.query() {
+                let input: StatusPoolQuery =
+                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
+                input
+            } else {
+                StatusPoolQuery::default()
+            }
+        };
+
+        let is_byid = query.by_id.as_str() == "true";
+
+        let has_idx = {
+            if is_byid {
+                let a = query.pool.parse::<usize>().unwrap_or_default();
+                if a < endpoints.as_ref().len() {
+                    Some(a)
+                } else {
+                    None
+                }
+            } else {
+                endpoints.get_pool_idx(&query.pool)
+            }
+        };
+
+        let Some(_idx) = has_idx else {
+            warn!("specified pool {} not found, please specify a valid pool", &query.pool);
+            return Err(s3_error!(InvalidArgument));
+        };
+
+        let layer = new_object_layer_fn();
+        let lock = layer.write().await;
+        let _store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
+        };
+
+        // FIXME:
+        // store.decommission_cancel(idx).await;
+
         return Err(s3_error!(NotImplemented));
     }
 }
diff --git a/router/src/lib.rs b/rustfs/src/admin/mod.rs
similarity index 100%
rename from router/src/lib.rs
rename to rustfs/src/admin/mod.rs
diff --git a/router/src/router.rs b/rustfs/src/admin/router.rs
similarity index 98%
rename from router/src/router.rs
rename to rustfs/src/admin/router.rs
index 9ac06501..18193d09 100644
--- a/router/src/router.rs
+++ b/rustfs/src/admin/router.rs
@@ -14,7 +14,7 @@ use s3s::S3Request;
 use s3s::S3Response;
 use s3s::S3Result;
 
-use crate::ADMIN_PREFIX;
+use super::ADMIN_PREFIX;
 
 pub struct S3Router {
     router: Router,
 }
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index 381ce46a..00cee910 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -1,3 +1,4 @@
+mod admin;
 mod config;
 mod grpc;
 mod service;
@@ -84,7 +85,7 @@ async fn run(opt: config::Opt) -> Result<()> {
         );
     }
 
-    set_global_endpoints(endpoint_pools.as_ref().clone()).await;
+    set_global_endpoints(endpoint_pools.as_ref().clone()).map_err(|err| Error::from_string(err.to_string()))?;
     update_erasure_type(setup_type).await;
 
     // 初始化本地磁盘
@@ -114,7 +115,7 @@ async fn run(opt: config::Opt) -> Result<()> {
 
     b.set_access(store.clone());
 
-    b.set_route(router::make_admin_route()?);
+    b.set_route(admin::make_admin_route()?);
 
     // // Enable parsing virtual-hosted-style requests
     // if let Some(dm) = opt.domain_name {
diff --git a/rustfs/src/storage/error.rs b/rustfs/src/storage/error.rs
index c767339b..53aad658 100644
--- a/rustfs/src/storage/error.rs
+++ b/rustfs/src/storage/error.rs
@@ -61,6 +61,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
 
         StorageError::InsufficientWriteQuorum => {
            s3_error!(SlowDown, "Storage resources are insufficient for the write operation")
         }
+        StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"),
     };
 }
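
Note (illustration, not part of the patch): the central design change here is that the global endpoints registry moves from a lazy_static RwLock<EndpointServerPools> to a std::sync::OnceLock, so it is written exactly once at startup (set_global_endpoints) and read lock-free afterwards (GLOBAL_Endpoints.get()). A minimal, self-contained sketch of that set-once pattern, using hypothetical names rather than the rustfs types, looks like this:

    use std::sync::OnceLock;

    // Stand-in for EndpointServerPools; the real type lives in ecstore.
    #[derive(Debug)]
    struct Endpoints(Vec<String>);

    // Written once at startup, read many times without locking.
    static GLOBAL: OnceLock<Endpoints> = OnceLock::new();

    fn set_global(eps: Vec<String>) -> Result<(), String> {
        // OnceLock::set fails if a value was already stored.
        GLOBAL
            .set(Endpoints(eps))
            .map_err(|_| "endpoints already initialized".to_string())
    }

    fn get_global() -> Option<&'static Endpoints> {
        GLOBAL.get()
    }

    fn main() {
        set_global(vec!["http://server1/disk1".to_string()]).unwrap();
        assert!(set_global(vec![]).is_err()); // a second set is rejected
        assert!(get_global().is_some()); // readers see the initialized value
    }

This mirrors why set_global_endpoints now returns a Result and why callers such as BucketMetadataSys::init_internal handle the "not yet initialized" case via GLOBAL_Endpoints.get().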