Merge pull request #89 from rustfs/adm-api-1

feat: introduce admin-api
This commit is contained in:
loverustfs
2024-10-16 20:41:24 +08:00
committed by GitHub
14 changed files with 392 additions and 56 deletions

37
Cargo.lock generated
View File

@@ -17,6 +17,21 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "admin"
version = "0.0.1"
dependencies = [
"axum",
"ecstore",
"futures-util",
"hyper",
"mime",
"serde",
"serde_json",
"time",
"tower 0.4.13",
]
[[package]]
name = "ahash"
version = "0.7.8"
@@ -165,6 +180,8 @@ dependencies = [
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
@@ -173,10 +190,15 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper 1.0.1",
"tokio",
"tower 0.5.1",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -197,6 +219,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -1604,7 +1627,9 @@ dependencies = [
name = "rustfs"
version = "0.1.0"
dependencies = [
"admin",
"async-trait",
"axum",
"bytes",
"clap",
"common",
@@ -1810,6 +1835,16 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6"
dependencies = [
"itoa",
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -2198,8 +2233,10 @@ dependencies = [
"futures-util",
"pin-project-lite",
"sync_wrapper 0.1.2",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]

View File

@@ -7,6 +7,7 @@ members = [
"common/common",
"common/lock",
"common/protos",
"api/admin",
]
[workspace.package]
@@ -77,3 +78,4 @@ uuid = { version = "1.10.0", features = [
"macro-diagnostics",
] }
log = "0.4.22"
axum = "0.7.7"

18
api/admin/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "admin"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
axum.workspace = true
mime.workspace = true
serde.workspace = true
serde_json.workspace = true
ecstore = { path = "../../ecstore" }
time = { workspace = true, features = ["serde"] }
tower.workspace = true
futures-util = "0.3.31"
hyper.workspace = true

98
api/admin/src/error.rs Normal file
View File

@@ -0,0 +1,98 @@
use axum::{
body::Body,
http::{header::CONTENT_TYPE, HeaderValue, StatusCode},
response::{IntoResponse, Response},
};
use mime::APPLICATION_JSON;
use serde::Serialize;
#[derive(Serialize, Default)]
#[serde(rename_all = "PascalCase")]
pub struct ErrorResponse {
pub code: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bucket_name: Option<String>,
pub resource: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
pub host_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub actual_object_size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub range_requested: Option<String>,
}
impl IntoResponse for APIError {
fn into_response(self) -> Response {
let code = self.http_status_code;
let err_response = ErrorResponse::from(self);
let json_res = match serde_json::to_vec(&err_response) {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
Response::builder()
.status(code)
.header(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON.as_ref()))
.body(Body::from(json_res))
.unwrap_or_else(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response())
}
}
#[derive(Default)]
pub struct APIError {
code: String,
description: String,
http_status_code: StatusCode,
object_size: Option<String>,
range_requested: Option<String>,
}
pub enum ErrorCode {
ErrNotImplemented,
ErrServerNotInitialized,
}
impl IntoResponse for ErrorCode {
fn into_response(self) -> Response {
APIError::from(self).into_response()
}
}
impl From<ErrorCode> for APIError {
fn from(value: ErrorCode) -> Self {
use ErrorCode::*;
match value {
ErrNotImplemented => APIError {
code: "NotImplemented".into(),
description: "A header you provided implies functionality that is not implemented.".into(),
http_status_code: StatusCode::NOT_IMPLEMENTED,
..Default::default()
},
ErrServerNotInitialized => APIError {
code: "ServerNotInitialized".into(),
description: "Server not initialized yet, please try again.".into(),
http_status_code: StatusCode::SERVICE_UNAVAILABLE,
..Default::default()
},
}
}
}
impl From<APIError> for ErrorResponse {
fn from(value: APIError) -> Self {
Self {
code: value.code,
message: value.description,
actual_object_size: value.object_size,
range_requested: value.range_requested,
..Default::default()
}
}
}

View File

@@ -0,0 +1 @@
pub mod list_pools;

View File

@@ -0,0 +1,79 @@
use crate::Result as LocalResult;
use crate::{error::ErrorCode, object_api::ObjectApi};
use axum::{extract::State, Json};
use serde::Serialize;
use time::OffsetDateTime;
#[derive(Serialize)]
pub struct PoolStatus {
id: i64,
cmdline: String,
#[serde(rename = "lastUpdate")]
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
last_updat: OffsetDateTime,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "decommissionInfo")]
decommission_info: Option<PoolDecommissionInfo>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PoolDecommissionInfo {
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
start_time: OffsetDateTime,
start_size: i64,
total_size: i64,
current_size: i64,
complete: bool,
failed: bool,
canceled: bool,
#[serde(rename = "objectsDecommissioned")]
items_decommissioned: i64,
#[serde(rename = "objectsDecommissionedFailed")]
items_decommission_failed: i64,
#[serde(rename = "bytesDecommissioned")]
bytes_done: i64,
#[serde(rename = "bytesDecommissionedFailed")]
bytes_failed: i64,
}
pub async fn handler(State(ec_store): State<ObjectApi>) -> LocalResult<Json<Vec<PoolStatus>>> {
// if ecstore::is_legacy().await {
// return Err(ErrorCode::ErrNotImplemented);
// }
let pools = (*ec_store).as_ref().ok_or(ErrorCode::ErrNotImplemented)?;
// todo, 调用pool.status()接口获取每个池的数据
//
let mut result = Vec::new();
for (idx, _pool) in pools.pools.iter().enumerate() {
// 这里mock一下数据
result.push(PoolStatus {
id: idx as _,
cmdline: "cmdline".into(),
last_updat: OffsetDateTime::now_utc(),
decommission_info: if idx % 2 == 0 {
Some(PoolDecommissionInfo {
start_time: OffsetDateTime::now_utc(),
start_size: 1,
total_size: 2,
current_size: 2,
complete: true,
failed: true,
canceled: true,
items_decommissioned: 1,
items_decommission_failed: 1,
bytes_done: 1,
bytes_failed: 1,
})
} else {
None
},
})
}
Ok(Json(result))
}

21
api/admin/src/lib.rs Normal file
View File

@@ -0,0 +1,21 @@
pub mod error;
pub mod handlers;
pub mod object_api;
use axum::{extract::Request, response::Response, routing::get, BoxError, Router};
use ecstore::store::ECStore;
use error::ErrorCode;
use handlers::list_pools;
use object_api::ObjectApi;
use tower::Service;
pub type Result<T> = std::result::Result<T, ErrorCode>;
pub fn register_admin_router(
ec_store: Option<ECStore>,
) -> impl Service<Request, Response = Response, Error: Into<BoxError>, Future: Send> + Clone {
Router::new()
.nest("/admin/v3", Router::new().route("/pools/list", get(list_pools::handler)))
.with_state::<()>(ObjectApi::new(ec_store))
.into_service()
}

View File

View File

@@ -0,0 +1,20 @@
use std::ops::Deref;
use ecstore::store::ECStore;
#[derive(Clone)]
pub struct ObjectApi(Option<ECStore>);
impl Deref for ObjectApi {
type Target = Option<ECStore>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl ObjectApi {
pub fn new(t: Option<ECStore>) -> Self {
Self(t)
}
}

View File

@@ -58,4 +58,10 @@ pub async fn update_erasure_type(setup_type: SetupType) {
*is_erasure_sd = setup_type == SetupType::ErasureSD;
}
pub async fn is_legacy() -> bool {
let lock = GLOBAL_Endpoints.read().await;
let endpoints = lock.as_ref();
endpoints.len() == 1 && endpoints[0].legacy
}
type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;

View File

@@ -19,6 +19,7 @@ mod utils;
pub mod bucket;
pub use global::is_legacy;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;
pub use global::update_erasure_type;

View File

@@ -49,6 +49,8 @@ tracing-error.workspace = true
tracing-subscriber.workspace = true
transform-stream.workspace = true
uuid = "1.10.0"
admin = { path = "../api/admin" }
axum.workspace = true
[build-dependencies]
prost-build.workspace = true
@@ -67,7 +69,6 @@ hyper-util = { version = "0.1.9", features = [
"server-auto",
"server-graceful",
] }
mime = "0.3.17"
transform-stream = "0.3.0"
netif = "0.1.6"
# pin-utils = "0.1.0"

View File

@@ -147,55 +147,59 @@ async fn run(opt: config::Opt) -> Result<()> {
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
tokio::spawn(async move {
let hyper_service = service.into_shared();
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service));
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
info!("server is running at http://{local_addr}");
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
});
// init store
let store = ECStore::new(opt.address.clone(), endpoint_pools.clone())
.await
.map_err(|err| Error::from_string(err.to_string()))?;
info!(" init store success!");
tokio::spawn({
let store = store.clone();
async move {
let hyper_service = service.into_shared();
let adm_service = admin::register_admin_router(Some(store));
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service, adm_service));
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
info!("server is running at http://{local_addr}");
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}
tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
}
});
let buckets_list = store
.list_bucket(&BucketOptions {
no_metadata: true,

View File

@@ -1,6 +1,7 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use axum::body::Body;
use futures::Future;
use http_body::Frame;
use hyper::body::Incoming;
@@ -11,35 +12,52 @@ use tower::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Generate a [`HybridService`]
pub(crate) fn hybrid<MakeRest, Grpc>(make_rest: MakeRest, grpc: Grpc) -> HybridService<MakeRest, Grpc> {
HybridService { rest: make_rest, grpc }
pub(crate) fn hybrid<MakeRest, Grpc, Admin>(
make_rest: MakeRest,
grpc: Grpc,
admin: Admin,
) -> HybridService<MakeRest, Grpc, Admin> {
HybridService {
rest: make_rest,
grpc,
admin,
}
}
/// The service that can serve both gRPC and REST HTTP Requests
#[derive(Clone)]
pub struct HybridService<Rest, Grpc> {
pub struct HybridService<Rest, Grpc, Admin> {
rest: Rest,
grpc: Grpc,
admin: Admin,
}
impl<Rest, Grpc, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc>
impl<Rest, Grpc, Admin, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc, Admin>
where
Rest: Service<Request<Incoming>, Response = Response<RestBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Admin: Service<Request<Body>, Response = Response<Body>>,
Rest::Error: Into<BoxError>,
Grpc::Error: Into<BoxError>,
Admin::Error: Into<BoxError>,
{
type Response = Response<HybridBody<RestBody, GrpcBody>>;
type Response = Response<HybridBody<RestBody, GrpcBody, Body>>;
type Error = BoxError;
type Future = HybridFuture<Rest::Future, Grpc::Future>;
type Future = HybridFuture<Rest::Future, Grpc::Future, Admin::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.rest.poll_ready(cx) {
Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Ok(())) => match self.admin.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
}
@@ -53,6 +71,14 @@ where
(hyper::Version::HTTP_2, Some(hv)) if hv.as_bytes().starts_with(b"application/grpc") => HybridFuture::Grpc {
grpc_future: self.grpc.call(req),
},
_ if req.uri().path().starts_with("/admin/v3") => HybridFuture::Admin {
admin_future: self.admin.call({
let (parts, body) = req.into_parts();
Request::from_parts(parts, Body::new(body).into())
}),
},
_ => HybridFuture::Rest {
rest_future: self.rest.call(req),
},
@@ -64,7 +90,7 @@ pin_project! {
/// A hybrid HTTP body that will be used in the response type for the
/// [`HybridFuture`], i.e., the output of the [`HybridService`]
#[project = HybridBodyProj]
pub enum HybridBody<RestBody, GrpcBody> {
pub enum HybridBody<RestBody, GrpcBody, AdminBody> {
Rest {
#[pin]
rest_body: RestBody
@@ -73,15 +99,21 @@ pin_project! {
#[pin]
grpc_body: GrpcBody
},
Admin {
#[pin]
admin_body: AdminBody
},
}
}
impl<RestBody, GrpcBody> http_body::Body for HybridBody<RestBody, GrpcBody>
impl<RestBody, GrpcBody, AdminBody> http_body::Body for HybridBody<RestBody, GrpcBody, AdminBody>
where
RestBody: http_body::Body + Send + Unpin,
GrpcBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
AdminBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
RestBody::Error: Into<BoxError>,
GrpcBody::Error: Into<BoxError>,
AdminBody::Error: Into<BoxError>,
{
type Data = RestBody::Data;
type Error = BoxError;
@@ -90,6 +122,7 @@ where
match self {
Self::Rest { rest_body } => rest_body.is_end_stream(),
Self::Grpc { grpc_body } => grpc_body.is_end_stream(),
Self::Admin { admin_body } => admin_body.is_end_stream(),
}
}
@@ -97,6 +130,7 @@ where
match self.project() {
HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into),
HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into),
HybridBodyProj::Admin { admin_body } => admin_body.poll_frame(cx).map_err(Into::into),
}
}
@@ -104,6 +138,7 @@ where
match self {
Self::Rest { rest_body } => rest_body.size_hint(),
Self::Grpc { grpc_body } => grpc_body.size_hint(),
Self::Admin { admin_body } => admin_body.size_hint(),
}
}
}
@@ -112,7 +147,7 @@ pin_project! {
/// A future that accepts an HTTP request as input and returns an HTTP
/// response as output for the [`HybridService`]
#[project = HybridFutureProj]
pub enum HybridFuture<RestFuture, GrpcFuture> {
pub enum HybridFuture<RestFuture, GrpcFuture, AdminFuture> {
Rest {
#[pin]
rest_future: RestFuture,
@@ -120,18 +155,26 @@ pin_project! {
Grpc {
#[pin]
grpc_future: GrpcFuture,
},
Admin {
#[pin]
admin_future: AdminFuture,
}
}
}
impl<RestFuture, GrpcFuture, RestBody, GrpcBody, RestError, GrpcError> Future for HybridFuture<RestFuture, GrpcFuture>
impl<RestFuture, GrpcFuture, AdminFuture, RestBody, GrpcBody, AdminBody, RestError, GrpcError, AdminError> Future
for HybridFuture<RestFuture, GrpcFuture, AdminFuture>
where
RestFuture: Future<Output = Result<Response<RestBody>, RestError>>,
GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
AdminFuture: Future<Output = Result<Response<AdminBody>, AdminError>>,
RestError: Into<BoxError>,
GrpcError: Into<BoxError>,
AdminError: Into<BoxError>,
{
type Output = Result<Response<HybridBody<RestBody, GrpcBody>>, BoxError>;
type Output = Result<Response<HybridBody<RestBody, GrpcBody, AdminBody>>, BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
@@ -145,6 +188,11 @@ where
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
HybridFutureProj::Admin { admin_future } => match admin_future.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|admin_body| HybridBody::Admin { admin_body }))),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
}
}
}