move admin router to rustfs, add pool stats/list admin api

weisd committed on 2024-11-26 17:34:18 +08:00
parent d0a06d0b92
commit b53bb46bdf
16 changed files with 295 additions and 103 deletions
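For quick reference, the admin pool routes this commit wires up (paths copied from the handler comments in the diff below; the concrete admin prefix value is not shown in this diff):

GET  <endpoint>/<admin-API>/pools/list
GET  <endpoint>/<admin-API>/pools/status?pool=http://server{1...4}/disk{1...4}  (or ?pool=<idx>&by-id=true)
POST <endpoint>/<admin-API>/pools/decommission?pool=http://server{1...4}/disk{1...4}
POST <endpoint>/<admin-API>/pools/cancel?pool=http://server{1...4}/disk{1...4}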

Cargo.lock generated

@@ -2104,26 +2104,6 @@ dependencies = [
"serde",
]
[[package]]
name = "router"
version = "0.0.1"
dependencies = [
"async-trait",
"common",
"ecstore",
"hyper",
"matchit 0.8.5",
"pin-project-lite",
"quick-xml",
"s3s",
"serde",
"serde-xml-rs",
"serde_json",
"serde_urlencoded",
"time",
"tracing",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@@ -2172,9 +2152,10 @@ dependencies = [
"prost-types",
"protobuf",
"protos",
"router",
"s3s",
"serde",
"serde_json",
"serde_urlencoded",
"shadow-rs",
"time",
"tokio",
@@ -2335,18 +2316,6 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-xml-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb3aa78ecda1ebc9ec9847d5d3aba7d618823446a049ba2491940506da6e2782"
dependencies = [
"log",
"serde",
"thiserror",
"xml-rs",
]
[[package]]
name = "serde_derive"
version = "1.0.214"
@@ -3230,12 +3199,6 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "xml-rs"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af310deaae937e48a26602b730250b4949e125f468f11e6990be3e5304ddd96f"
[[package]]
name = "xxhash-rust"
version = "0.8.12"


@@ -9,7 +9,7 @@ members = [
"common/protos",
"api/admin",
"reader",
"router", "common/workers",
"common/workers",
]
[workspace.package]


@@ -149,8 +149,11 @@ impl BucketMetadataSys {
}
async fn init_internal(&self, buckets: Vec<String>) -> Result<()> {
let count = {
let endpoints = GLOBAL_Endpoints.read().await;
endpoints.es_count() * 10
if let Some(endpoints) = GLOBAL_Endpoints.get() {
endpoints.es_count() * 10
} else {
return Err(Error::msg("GLOBAL_Endpoints not init"));
}
};
let mut failed_buckets: HashSet<String> = HashSet::new();


@@ -405,7 +405,7 @@ pub struct PoolEndpoints {
}
/// list of list of endpoints
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct EndpointServerPools(pub Vec<PoolEndpoints>);
impl From<Vec<PoolEndpoints>> for EndpointServerPools {
@@ -430,6 +430,17 @@ impl EndpointServerPools {
pub fn reset(&mut self, eps: Vec<PoolEndpoints>) {
self.0 = eps;
}
pub fn legacy(&self) -> bool {
self.0.len() == 1 && self.0[0].legacy
}
pub fn get_pool_idx(&self, cmd_line: &str) -> Option<usize> {
for (idx, eps) in self.0.iter().enumerate() {
if eps.cmd_line.as_str() == cmd_line {
return Some(idx);
}
}
None
}
pub fn from_volumes(server_addr: &str, endpoints: Vec<String>) -> Result<(EndpointServerPools, SetupType)> {
let layouts = DisksLayout::from_volumes(endpoints.as_slice())?;


@@ -1,11 +1,15 @@
use lazy_static::lazy_static;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
disk::DiskStore,
endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
error::{Error, Result},
heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState},
store::ECStore,
};
@@ -23,7 +27,7 @@ lazy_static! {
pub static ref GLOBAL_IsErasureSD: RwLock<bool> = RwLock::new(false);
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
pub static ref GLOBAL_Endpoints: OnceLock<EndpointServerPools> = OnceLock::new();
pub static ref GLOBAL_RootDiskThreshold: RwLock<u64> = RwLock::new(0);
pub static ref GLOBAL_BackgroundHealRoutine: Arc<RwLock<HealRoutine>> = HealRoutine::new();
pub static ref GLOBAL_BackgroundHealState: Arc<RwLock<AllHealState>> = AllHealState::new(false);
@@ -40,9 +44,11 @@ pub async fn get_global_deployment_id() -> Uuid {
*id_ptr
}
pub async fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
let mut endpoints = GLOBAL_Endpoints.write().await;
endpoints.reset(eps);
pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) -> Result<()> {
GLOBAL_Endpoints
.set(EndpointServerPools::from(eps))
.map_err(|_| Error::msg("GLOBAL_Endpoints set failed"))?;
Ok(())
}
pub fn new_object_layer_fn() -> Arc<RwLock<Option<ECStore>>> {
@@ -84,10 +90,12 @@ pub async fn update_erasure_type(setup_type: SetupType) {
*is_erasure_sd = setup_type == SetupType::ErasureSD;
}
pub async fn is_legacy() -> bool {
let lock = GLOBAL_Endpoints.read().await;
let endpoints = lock.as_ref();
endpoints.len() == 1 && endpoints[0].legacy
}
// pub fn is_legacy() -> bool {
// if let Some(endpoints) = GLOBAL_Endpoints.get() {
// endpoints.as_ref().len() == 1 && endpoints.as_ref()[0].legacy
// } else {
// false
// }
// }
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
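
The hunk above swaps GLOBAL_Endpoints from an async RwLock to std's OnceLock, so the endpoints are written once at startup and read without locking. A minimal sketch of that set-once pattern, using a simplified stand-in type rather than the real EndpointServerPools:

use std::sync::OnceLock;

// simplified stand-in for ecstore's EndpointServerPools
#[derive(Debug, Default)]
struct EndpointServerPools(Vec<String>);

static GLOBAL_ENDPOINTS: OnceLock<EndpointServerPools> = OnceLock::new();

fn set_global_endpoints(eps: Vec<String>) -> Result<(), String> {
    // set() errors if a value is already stored, so double initialization is surfaced
    GLOBAL_ENDPOINTS
        .set(EndpointServerPools(eps))
        .map_err(|_| "GLOBAL_Endpoints set failed".to_string())
}

fn es_count() -> Option<usize> {
    // readers never block; get() returns None until set_global_endpoints has run
    GLOBAL_ENDPOINTS.get().map(|eps| eps.0.len())
}

Because OnceLock::get() simply returns None before initialization, callers such as init_internal above now have to handle the "GLOBAL_Endpoints not init" case explicitly.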


@@ -26,7 +26,7 @@ pub mod pools;
pub mod store_err;
pub mod xhttp;
pub use global::is_legacy;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;
pub use global::update_erasure_type;
pub use global::GLOBAL_Endpoints;


@@ -1,10 +1,12 @@
use crate::error::{Error, Result};
use crate::store_api::{StorageAPI, StorageDisk, StorageInfo};
use crate::store_err::StorageError;
use crate::{sets::Sets, store::ECStore};
use serde::Serialize;
use std::sync::Arc;
use time::OffsetDateTime;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct PoolStatus {
pub id: usize,
pub cmd_line: String,
@@ -43,11 +45,36 @@ impl PoolMeta {
self.pools[idx].decommission.is_some()
}
pub fn decommission_cancel(&mut self, idx: usize) -> bool {
if let Some(stats) = self.pools.get_mut(idx) {
if let Some(d) = &stats.decommission {
if !d.canceled {
stats.last_update = OffsetDateTime::now_utc();
let mut pd = d.clone();
pd.start_time = None;
pd.canceled = true;
pd.failed = false;
pd.complete = false;
stats.decommission = Some(pd);
true
} else {
false
}
} else {
false
}
} else {
false
}
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Default)]
pub struct PoolDecommissionInfo {
pub start_time: OffsetDateTime,
pub start_time: Option<OffsetDateTime>,
pub start_size: usize,
pub total_size: usize,
pub current_size: usize,
@@ -74,27 +101,84 @@ pub struct PoolSpaceInfo {
}
impl ECStore {
pub fn status(&self, _idx: usize) -> Result<PoolStatus> {
unimplemented!()
pub async fn status(&self, idx: usize) -> Result<PoolStatus> {
let space_info = self.get_decommission_pool_space_info(idx).await?;
let mut pool_info = self.pool_meta.pools[idx].clone();
if let Some(d) = pool_info.decommission.as_mut() {
d.total_size = space_info.total;
d.current_size = space_info.free;
} else {
pool_info.decommission = Some(PoolDecommissionInfo {
total_size: space_info.total,
current_size: space_info.free,
..Default::default()
});
}
Ok(pool_info)
}
async fn _get_decommission_pool_space_info(&self, idx: usize) -> Result<PoolSpaceInfo> {
async fn get_decommission_pool_space_info(&self, idx: usize) -> Result<PoolSpaceInfo> {
if let Some(sets) = self.pools.get(idx) {
let mut info = sets.storage_info().await;
info.backend = self.backend_info().await;
unimplemented!()
let total = get_total_usable_capacity(&info.disks, &info);
let free = get_total_usable_capacity_free(&info.disks, &info);
Ok(PoolSpaceInfo {
free,
total,
used: total - free,
})
} else {
Err(Error::msg("InvalidArgument"))
}
}
pub async fn decommission_cancel(&mut self, idx: usize) -> Result<()> {
if self.single_pool() {
return Err(Error::msg("InvalidArgument"));
}
let Some(has_canceler) = self.decommission_cancelers.get(idx) else {
return Err(Error::msg("InvalidArgument"));
};
if has_canceler.is_none() {
return Err(Error::new(StorageError::DecommissionNotStarted));
}
if self.pool_meta.decommission_cancel(idx) {
// FIXME:
}
unimplemented!()
}
}
fn _get_total_usable_capacity(disks: &Vec<StorageDisk>, _info: &StorageInfo) -> usize {
for _disk in disks.iter() {
// if disk.pool_index < 0 || info.backend.standard_scdata.len() <= disk.pool_index {
// continue;
// }
fn get_total_usable_capacity(disks: &Vec<StorageDisk>, info: &StorageInfo) -> usize {
let mut capacity = 0;
for disk in disks.iter() {
if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {
continue;
}
if (disk.disk_index as usize) < info.backend.standard_sc_data[disk.pool_index as usize] {
capacity += disk.total_space as usize;
}
}
unimplemented!()
capacity
}
fn get_total_usable_capacity_free(disks: &Vec<StorageDisk>, info: &StorageInfo) -> usize {
let mut capacity = 0;
for disk in disks.iter() {
if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {
continue;
}
if (disk.disk_index as usize) < info.backend.standard_sc_data[disk.pool_index as usize] {
capacity += disk.available_space as usize;
}
}
capacity
}


@@ -77,6 +77,7 @@ pub struct ECStore {
pub peer_sys: S3PeerSys,
// pub local_disks: Vec<DiskStore>,
pub pool_meta: PoolMeta,
pub decommission_cancelers: Vec<Option<usize>>,
}
impl ECStore {
@@ -197,12 +198,14 @@ impl ECStore {
let mut pool_meta = PoolMeta::new(pools.clone());
pool_meta.dont_save = true;
let decommission_cancelers = vec![None; pools.len()];
let ec = ECStore {
id: deployment_id.unwrap(),
disk_map,
pools,
peer_sys,
pool_meta,
decommission_cancelers,
};
set_object_layer(ec.clone()).await;
@@ -239,7 +242,7 @@ impl ECStore {
// self.local_disks.clone()
// }
fn single_pool(&self) -> bool {
pub fn single_pool(&self) -> bool {
self.pools.len() == 1
}


@@ -71,6 +71,9 @@ pub enum StorageError {
#[error("Storage resources are insufficient for the write operation")]
InsufficientWriteQuorum,
#[error("Decommission not started")]
DecommissionNotStarted,
}
pub fn to_object_err(err: Error, params: Vec<&str>) -> Error {


@@ -1,24 +0,0 @@
[package]
name = "router"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
async-trait.workspace = true
tracing.workspace = true
s3s.workspace = true
hyper.workspace = true
matchit = "0.8.4"
pin-project-lite.workspace = true
common.workspace = true
serde_urlencoded = "0.7.1"
serde.workspace = true
quick-xml = "0.37.0"
serde-xml-rs = "0.6.0"
ecstore.workspace = true
time.workspace = true
serde_json.workspace = true


@@ -53,11 +53,12 @@ transform-stream.workspace = true
uuid = "1.11.0"
admin = { path = "../api/admin" }
axum.workspace = true
router = { version = "0.0.1", path = "../router" }
matchit = "0.8.4"
shadow-rs = "0.35.2"
const-str = { version = "0.5.7", features = ["std", "proc"] }
atoi = "2.0.0"
serde.workspace = true
serde_urlencoded = "0.7.1"
[build-dependencies]
prost-build.workspace = true


@@ -1,12 +1,12 @@
use std::collections::HashSet;
use crate::router::Operation;
use super::router::Operation;
use crate::storage::error::to_s3_error;
use ecstore::bucket::policy::action::{Action, ActionSet};
use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy};
use ecstore::bucket::policy::effect::Effect;
use ecstore::bucket::policy::resource::{Resource, ResourceSet};
use ecstore::store_api::StorageAPI;
use ecstore::utils::xml;
use ecstore::GLOBAL_Endpoints;
use ecstore::{new_object_layer_fn, store_api::BackendInfo};
use hyper::StatusCode;
use matchit::Params;
@@ -17,8 +17,10 @@ use s3s::{
};
use serde::{Deserialize, Serialize};
use serde_urlencoded::from_bytes;
use std::collections::HashSet;
use time::{Duration, OffsetDateTime};
use tracing::warn;
#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "PascalCase", default)]
pub struct AssumeRoleRequest {
@@ -251,21 +253,107 @@ pub struct ListPools {}
#[async_trait::async_trait]
impl Operation for ListPools {
// GET <endpoint>/<admin-API>/pools/list
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ListPools");
return Err(s3_error!(NotImplemented));
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
let Some(endpoints) = GLOBAL_Endpoints.get() else {
return Err(s3_error!(NotImplemented));
};
if endpoints.legacy() {
return Err(s3_error!(NotImplemented));
}
let mut pools_status = Vec::new();
for (idx, _) in endpoints.as_ref().iter().enumerate() {
let state = store.status(idx).await.map_err(to_s3_error)?;
pools_status.push(state);
}
let output = serde_json::to_string(&pools_status)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize pool status failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
#[derive(Debug, Deserialize, Default)]
#[serde(default)]
pub struct StatusPoolQuery {
pub pool: String,
#[serde(rename = "by-id")]
pub by_id: String,
}
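
As an illustration (not part of the commit), a query like pool=0&by-id=true deserializes into StatusPoolQuery with serde_urlencoded::from_bytes, which is exactly what the handlers below do with req.uri.query():

let q: StatusPoolQuery = serde_urlencoded::from_bytes(b"pool=0&by-id=true").expect("valid query string");
assert_eq!(q.pool, "0");
assert_eq!(q.by_id, "true");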
pub struct StatusPool {}
#[async_trait::async_trait]
impl Operation for StatusPool {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// GET <endpoint>/<admin-API>/pools/status?pool=http://server{1...4}/disk{1...4}
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle StatusPool");
return Err(s3_error!(NotImplemented));
let Some(endpoints) = GLOBAL_Endpoints.get() else {
return Err(s3_error!(NotImplemented));
};
if endpoints.legacy() {
return Err(s3_error!(NotImplemented));
}
let query = {
if let Some(query) = req.uri.query() {
let input: StatusPoolQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
input
} else {
StatusPoolQuery::default()
}
};
let is_byid = query.by_id.as_str() == "true";
let has_idx = {
if is_byid {
let a = query.pool.parse::<usize>().unwrap_or_default();
if a < endpoints.as_ref().len() {
Some(a)
} else {
None
}
} else {
endpoints.get_pool_idx(&query.pool)
}
};
let Some(idx) = has_idx else {
warn!("specified pool {} not found, please specify a valid pool", &query.pool);
return Err(s3_error!(InvalidArgument));
};
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
let pools_status = store.status(idx).await.map_err(to_s3_error)?;
let output = serde_json::to_string(&pools_status)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize pool status failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
@@ -273,6 +361,7 @@ pub struct StartDecommission {}
#[async_trait::async_trait]
impl Operation for StartDecommission {
// POST <endpoint>/<admin-API>/pools/decommission?pool=http://server{1...4}/disk{1...4}
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle StartDecommission");
@@ -284,9 +373,58 @@ pub struct CancelDecommission {}
#[async_trait::async_trait]
impl Operation for CancelDecommission {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
// POST <endpoint>/<admin-API>/pools/cancel?pool=http://server{1...4}/disk{1...4}
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle CancelDecommission");
let Some(endpoints) = GLOBAL_Endpoints.get() else {
return Err(s3_error!(NotImplemented));
};
if endpoints.legacy() {
return Err(s3_error!(NotImplemented));
}
let query = {
if let Some(query) = req.uri.query() {
let input: StatusPoolQuery =
from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidRequest, "get body failed"))?;
input
} else {
StatusPoolQuery::default()
}
};
let is_byid = query.by_id.as_str() == "true";
let has_idx = {
if is_byid {
let a = query.pool.parse::<usize>().unwrap_or_default();
if a < endpoints.as_ref().len() {
Some(a)
} else {
None
}
} else {
endpoints.get_pool_idx(&query.pool)
}
};
let Some(_idx) = has_idx else {
warn!("specified pool {} not found, please specify a valid pool", &query.pool);
return Err(s3_error!(InvalidArgument));
};
let layer = new_object_layer_fn();
let lock = layer.write().await;
let _store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
// FIXME:
// store.decommission_cancel(idx).await;
return Err(s3_error!(NotImplemented));
}
}
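
The by-id/command-line pool resolution is duplicated between StatusPool and CancelDecommission; a possible follow-up helper, sketched with an illustrative name (resolve_pool_idx is not part of this commit) against the EndpointServerPools API shown earlier:

fn resolve_pool_idx(endpoints: &EndpointServerPools, query: &StatusPoolQuery) -> Option<usize> {
    if query.by_id.as_str() == "true" {
        // numeric form: ?pool=<n>&by-id=true
        // (the handlers above use unwrap_or_default(), which maps a bad number to pool 0;
        // ok()? rejects non-numeric input instead)
        let idx = query.pool.parse::<usize>().ok()?;
        if idx < endpoints.as_ref().len() {
            Some(idx)
        } else {
            None
        }
    } else {
        // command-line form: ?pool=http://server{1...4}/disk{1...4}
        endpoints.get_pool_idx(&query.pool)
    }
}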


@@ -14,7 +14,7 @@ use s3s::S3Request;
use s3s::S3Response;
use s3s::S3Result;
use crate::ADMIN_PREFIX;
use super::ADMIN_PREFIX;
pub struct S3Router<T> {
router: Router<T>,


@@ -1,3 +1,4 @@
mod admin;
mod config;
mod grpc;
mod service;
@@ -84,7 +85,7 @@ async fn run(opt: config::Opt) -> Result<()> {
);
}
set_global_endpoints(endpoint_pools.as_ref().clone()).await;
set_global_endpoints(endpoint_pools.as_ref().clone()).map_err(|err| Error::from_string(err.to_string()))?;
update_erasure_type(setup_type).await;
// initialize local disks
@@ -114,7 +115,7 @@ async fn run(opt: config::Opt) -> Result<()> {
b.set_access(store.clone());
b.set_route(router::make_admin_route()?);
b.set_route(admin::make_admin_route()?);
// // Enable parsing virtual-hosted-style requests
// if let Some(dm) = opt.domain_name {


@@ -61,6 +61,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
StorageError::InsufficientWriteQuorum => {
s3_error!(SlowDown, "Storage resources are insufficient for the write operation")
}
StorageError::DecommissionNotStarted => s3_error!(InvalidArgument, "Decommission Not Started"),
};
}