console web server and the s3 api share the same port (#163)

* merge console router

* make code happy

* Scanner (#156)

* feat: integrate CancellationToken for unified background services management

- Consolidate data scanner and auto heal cancellation tokens into single unified token
- Move GLOBAL_BACKGROUND_SERVICES_CANCEL_TOKEN to global.rs for centralized management
- Add graceful shutdown support to MRF heal routine with MinIO-compatible logic
- Implement heal_routine_with_cancel method preserving original healing logic
- Update main.rs to use unified background services shutdown mechanism
- Enhance error handling with proper ecstore Result types
- Fix clippy warnings for needless return statements
- Maintain backward compatibility while adding modern cancellation support

This change provides a cleaner architecture for background service lifecycle management
and ensures all healing services can be gracefully shut down through a single token.

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix: Refact heal and scanner design

Signed-off-by: junxiang Mu <1948535941@qq.com>

* refact: step 2

Signed-off-by: junxiang Mu <1948535941@qq.com>

* feat: refactor scanner module and add data usage statistics

- Move scanner code to scanner/ subdirectory for better organization
- Add data usage statistics collection and persistence
- Implement histogram support for size and version distribution
- Add global cancel token management for scanner operations
- Integrate scanner with ECStore for comprehensive data analysis
- Update error handling and improve test isolation
- Add data usage API endpoints and backend integration

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Chore: fix ref and fix comment

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix: fix clippy

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: dandan <dandan@dandandeMac-Studio.local>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: dandan <dandan@dandandeMac-Studio.local>
This commit is contained in:
weisd
2025-07-10 23:31:42 +08:00
committed by GitHub
parent 2832f0e089
commit 84f5a4cb48
11 changed files with 989 additions and 919 deletions

401
rustfs/src/admin/console.rs Normal file
View File

@@ -0,0 +1,401 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// use crate::license::get_license;
use axum::{
// Router,
body::Body,
http::{Response, StatusCode},
response::IntoResponse,
// routing::get,
};
// use axum_extra::extract::Host;
// use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
// use rustfs_utils::net::parse_and_resolve_address;
// use std::io;
use http::Uri;
// use axum::response::Redirect;
// use axum_server::tls_rustls::RustlsConfig;
// use http::{HeaderMap, HeaderName, Uri, header};
use mime_guess::from_path;
use rust_embed::RustEmbed;
// use serde::Serialize;
// use shadow_rs::shadow;
// use std::net::{IpAddr, SocketAddr};
// use std::sync::OnceLock;
// use std::time::Duration;
// use tokio::signal;
// use tower_http::cors::{Any, CorsLayer};
// use tower_http::trace::TraceLayer;
// use tracing::{debug, error, info, instrument};
// shadow!(build);
// const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct StaticFiles;
/// Static file handler
pub(crate) async fn static_handler(uri: Uri) -> impl IntoResponse {
let mut path = uri.path().trim_start_matches('/');
if path.is_empty() {
path = "index.html"
}
if let Some(file) = StaticFiles::get(path) {
let mime_type = from_path(path).first_or_octet_stream();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type.to_string())
.body(Body::from(file.data))
.unwrap()
} else if let Some(file) = StaticFiles::get("index.html") {
let mime_type = from_path("index.html").first_or_octet_stream();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type.to_string())
.body(Body::from(file.data))
.unwrap()
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("404 Not Found"))
.unwrap()
}
}
// #[derive(Debug, Serialize, Clone)]
// pub(crate) struct Config {
// #[serde(skip)]
// port: u16,
// api: Api,
// s3: S3,
// release: Release,
// license: License,
// doc: String,
// }
// impl Config {
// fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self {
// Config {
// port,
// api: Api {
// base_url: format!("http://{local_ip}:{port}/{RUSTFS_ADMIN_PREFIX}"),
// },
// s3: S3 {
// endpoint: format!("http://{local_ip}:{port}"),
// region: "cn-east-1".to_owned(),
// },
// release: Release {
// version: version.to_string(),
// date: date.to_string(),
// },
// license: License {
// name: "Apache-2.0".to_string(),
// url: "https://www.apache.org/licenses/LICENSE-2.0".to_string(),
// },
// doc: "https://rustfs.com/docs/".to_string(),
// }
// }
// fn to_json(&self) -> String {
// serde_json::to_string(self).unwrap_or_default()
// }
// pub(crate) fn version_info(&self) -> String {
// format!(
// "RELEASE.{}@{} (rust {} {})",
// self.release.date.clone(),
// self.release.version.clone().trim_start_matches('@'),
// build::RUST_VERSION,
// build::BUILD_TARGET
// )
// }
// pub(crate) fn version(&self) -> String {
// self.release.version.clone()
// }
// pub(crate) fn license(&self) -> String {
// format!("{} {}", self.license.name.clone(), self.license.url.clone())
// }
// pub(crate) fn doc(&self) -> String {
// self.doc.clone()
// }
// }
// #[derive(Debug, Serialize, Clone)]
// struct Api {
// #[serde(rename = "baseURL")]
// base_url: String,
// }
// #[derive(Debug, Serialize, Clone)]
// struct S3 {
// endpoint: String,
// region: String,
// }
// #[derive(Debug, Serialize, Clone)]
// struct Release {
// version: String,
// date: String,
// }
// #[derive(Debug, Serialize, Clone)]
// struct License {
// name: String,
// url: String,
// }
// pub(crate) static CONSOLE_CONFIG: OnceLock<Config> = OnceLock::new();
// #[allow(clippy::const_is_empty)]
// pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) {
// CONSOLE_CONFIG.get_or_init(|| {
// let ver = {
// if !build::TAG.is_empty() {
// build::TAG.to_string()
// } else if !build::SHORT_COMMIT.is_empty() {
// format!("@{}", build::SHORT_COMMIT)
// } else {
// build::PKG_VERSION.to_string()
// }
// };
// Config::new(local_ip, port, ver.as_str(), build::COMMIT_DATE_3339)
// });
// }
// // fn is_socket_addr_or_ip_addr(host: &str) -> bool {
// // host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok()
// // }
// #[allow(dead_code)]
// async fn license_handler() -> impl IntoResponse {
// let license = get_license().unwrap_or_default();
// Response::builder()
// .header("content-type", "application/json")
// .status(StatusCode::OK)
// .body(Body::from(serde_json::to_string(&license).unwrap_or_default()))
// .unwrap()
// }
// fn _is_private_ip(ip: IpAddr) -> bool {
// match ip {
// IpAddr::V4(ip) => {
// let octets = ip.octets();
// // 10.0.0.0/8
// octets[0] == 10 ||
// // 172.16.0.0/12
// (octets[0] == 172 && (octets[1] >= 16 && octets[1] <= 31)) ||
// // 192.168.0.0/16
// (octets[0] == 192 && octets[1] == 168)
// }
// IpAddr::V6(_) => false,
// }
// }
// #[allow(clippy::const_is_empty)]
// #[allow(dead_code)]
// #[instrument(fields(host))]
// async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> impl IntoResponse {
// // Get the scheme from the headers or use the URI scheme
// let scheme = headers
// .get(HeaderName::from_static("x-forwarded-proto"))
// .and_then(|value| value.to_str().ok())
// .unwrap_or_else(|| uri.scheme().map(|s| s.as_str()).unwrap_or("http"));
// // Print logs for debugging
// info!("Scheme: {}, ", scheme);
// // Get the host from the uri and use the value of the host extractor if it doesn't have one
// let host = uri.host().unwrap_or(host.as_str());
// let host = if let Ok(socket_addr) = host.parse::<SocketAddr>() {
// // Successfully parsed, it's in IP:Port format.
// // For IPv6, we need to enclose it in brackets to form a valid URL.
// let ip = socket_addr.ip();
// if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") }
// } else {
// // Failed to parse, it might be a domain name or a bare IP, use it as is.
// host.to_string()
// };
// // Make a copy of the current configuration
// let mut cfg = match CONSOLE_CONFIG.get() {
// Some(cfg) => cfg.clone(),
// None => {
// error!("Console configuration not initialized");
// return Response::builder()
// .status(StatusCode::INTERNAL_SERVER_ERROR)
// .body(Body::from("Console configuration not initialized"))
// .unwrap();
// }
// };
// let url = format!("{}://{}:{}", scheme, host, cfg.port);
// cfg.api.base_url = format!("{url}{RUSTFS_ADMIN_PREFIX}");
// cfg.s3.endpoint = url;
// Response::builder()
// .header("content-type", "application/json")
// .status(StatusCode::OK)
// .body(Body::from(cfg.to_json()))
// .unwrap()
// }
// pub fn register_router() -> Router {
// Router::new()
// // .route("/license", get(license_handler))
// // .route("/config.json", get(config_handler))
// .fallback_service(get(static_handler))
// }
// #[allow(dead_code)]
// pub async fn start_static_file_server(
// addrs: &str,
// local_ip: IpAddr,
// access_key: &str,
// secret_key: &str,
// tls_path: Option<String>,
// ) {
// // Configure CORS
// let cors = CorsLayer::new()
// .allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name
// .allow_methods([http::Method::GET, http::Method::POST])
// .allow_headers([header::CONTENT_TYPE]);
// // Create a route
// let app = register_router()
// .layer(cors)
// .layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true))
// .layer(TraceLayer::new_for_http());
// let server_addr = parse_and_resolve_address(addrs).expect("Failed to parse socket address");
// let server_port = server_addr.port();
// let server_address = server_addr.to_string();
// info!(
// "WebUI: http://{}:{} http://127.0.0.1:{} http://{}",
// local_ip, server_port, server_port, server_address
// );
// info!(" RootUser: {}", access_key);
// info!(" RootPass: {}", secret_key);
// // Check and start the HTTPS/HTTP server
// match start_server(server_addr, tls_path, app.clone()).await {
// Ok(_) => info!("Server shutdown gracefully"),
// Err(e) => error!("Server error: {}", e),
// }
// }
// async fn start_server(server_addr: SocketAddr, tls_path: Option<String>, app: Router) -> io::Result<()> {
// let tls_path = tls_path.unwrap_or_default();
// let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
// let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
// let handle = axum_server::Handle::new();
// // create a signal off listening task
// let handle_clone = handle.clone();
// tokio::spawn(async move {
// shutdown_signal().await;
// info!("Initiating graceful shutdown...");
// handle_clone.graceful_shutdown(Some(Duration::from_secs(10)));
// });
// let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok();
// info!("Console TLS certs: {:?}", has_tls_certs);
// if has_tls_certs {
// info!("Found TLS certificates, starting with HTTPS");
// match RustlsConfig::from_pem_file(cert_path, key_path).await {
// Ok(config) => {
// info!("Starting HTTPS server...");
// axum_server::bind_rustls(server_addr, config)
// .handle(handle.clone())
// .serve(app.into_make_service())
// .await
// .map_err(io::Error::other)?;
// info!("HTTPS server running on https://{}", server_addr);
// Ok(())
// }
// Err(e) => {
// error!("Failed to create TLS config: {}", e);
// start_http_server(server_addr, app, handle).await
// }
// }
// } else {
// info!("TLS certificates not found at {} and {}", key_path, cert_path);
// start_http_server(server_addr, app, handle).await
// }
// }
// #[allow(dead_code)]
// /// 308 redirect for HTTP to HTTPS
// fn redirect_to_https(https_port: u16) -> Router {
// Router::new().route(
// "/*path",
// get({
// move |uri: Uri, req: http::Request<Body>| async move {
// let host = req
// .headers()
// .get("host")
// .map_or("localhost", |h| h.to_str().unwrap_or("localhost"));
// let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("");
// let https_url = format!("https://{host}:{https_port}{path}");
// Redirect::permanent(&https_url)
// }
// }),
// )
// }
// async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> {
// debug!("Starting HTTP server...");
// axum_server::bind(addr)
// .handle(handle)
// .serve(app.into_make_service())
// .await
// .map_err(io::Error::other)
// }
// async fn shutdown_signal() {
// let ctrl_c = async {
// signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
// };
// #[cfg(unix)]
// let terminate = async {
// signal::unix::signal(signal::unix::SignalKind::terminate())
// .expect("failed to install signal handler")
// .recv()
// .await;
// };
// #[cfg(not(unix))]
// let terminate = std::future::pending::<()>();
// tokio::select! {
// _ = ctrl_c => {
// info!("shutdown_signal ctrl_c")
// },
// _ = terminate => {
// info!("shutdown_signal terminate")
// },
// }
// }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod console;
pub mod handlers;
pub mod router;
mod rpc;
@@ -32,8 +33,8 @@ use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/rustfs/admin";
pub fn make_admin_route() -> std::io::Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new();
pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new(console_enabled);
// 1
r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::routing::get;
use hyper::HeaderMap;
use hyper::Method;
use hyper::StatusCode;
@@ -27,20 +28,41 @@ use s3s::S3Result;
use s3s::header;
use s3s::route::S3Route;
use s3s::s3_error;
use tower::Service;
use tracing::error;
use super::ADMIN_PREFIX;
use super::rpc::RPC_PREFIX;
use crate::admin::ADMIN_PREFIX;
use crate::admin::console;
use crate::admin::rpc::RPC_PREFIX;
const CONSOLE_PREFIX: &str = "/rustfs/console";
pub struct S3Router<T> {
router: Router<T>,
console_enabled: bool,
console_router: Option<axum::routing::RouterIntoService<Body>>,
}
impl<T: Operation> S3Router<T> {
pub fn new() -> Self {
pub fn new(console_enabled: bool) -> Self {
let router = Router::new();
Self { router }
let console_router = if console_enabled {
Some(
axum::Router::new()
.nest(CONSOLE_PREFIX, axum::Router::new().fallback_service(get(console::static_handler)))
.fallback_service(get(console::static_handler))
.into_service::<Body>(),
)
} else {
None
};
Self {
router,
console_enabled,
console_router,
}
}
pub fn insert(&mut self, method: Method, path: &str, operation: T) -> std::io::Result<()> {
@@ -60,7 +82,7 @@ impl<T: Operation> S3Router<T> {
impl<T: Operation> Default for S3Router<T> {
fn default() -> Self {
Self::new()
Self::new(false)
}
}
@@ -79,10 +101,23 @@ where
}
}
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX)
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(CONSOLE_PREFIX)
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
if let Some(console_router) = &self.console_router {
let mut console_router = console_router.clone();
let req = convert_request(req);
let result = console_router.call(req).await;
return match result {
Ok(resp) => Ok(convert_response(resp)),
Err(e) => Err(s3_error!(InternalError, "{}", e)),
};
}
return Err(s3_error!(InternalError, "console is not enabled"));
}
let uri = format!("{}|{}", &req.method, req.uri.path());
// warn!("get uri {}", &uri);
@@ -99,6 +134,10 @@ where
// check_access before call
async fn check_access(&self, req: &mut S3Request<Body>) -> S3Result<()> {
if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) {
return Ok(());
}
// Check RPC signature verification
if req.uri.path().starts_with(RPC_PREFIX) {
// Skip signature verification for HEAD requests (health checks)
@@ -134,3 +173,34 @@ impl Operation for AdminOperation {
self.0.call(req, params).await
}
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Extra {
pub credentials: Option<s3s::auth::Credentials>,
pub region: Option<String>,
pub service: Option<String>,
}
fn convert_request(req: S3Request<Body>) -> http::Request<Body> {
let (mut parts, _) = http::Request::new(Body::empty()).into_parts();
parts.method = req.method;
parts.uri = req.uri;
parts.headers = req.headers;
parts.extensions = req.extensions;
parts.extensions.insert(Extra {
credentials: req.credentials,
region: req.region,
service: req.service,
});
http::Request::from_parts(parts, req.input)
}
fn convert_response(resp: http::Response<axum::body::Body>) -> S3Response<Body> {
let (parts, body) = resp.into_parts();
let mut s3_resp = S3Response::new(Body::http_body_unsync(body));
s3_resp.status = Some(parts.status);
s3_resp.headers = parts.headers;
s3_resp.extensions = parts.extensions;
s3_resp
}

View File

@@ -1,391 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::license::get_license;
use axum::{
Router,
body::Body,
http::{Response, StatusCode},
response::IntoResponse,
routing::get,
};
use axum_extra::extract::Host;
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_utils::net::parse_and_resolve_address;
use std::io;
use axum::response::Redirect;
use axum_server::tls_rustls::RustlsConfig;
use http::{HeaderMap, HeaderName, Uri, header};
use mime_guess::from_path;
use rust_embed::RustEmbed;
use serde::Serialize;
use shadow_rs::shadow;
use std::net::{IpAddr, SocketAddr};
use std::sync::OnceLock;
use std::time::Duration;
use tokio::signal;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, instrument};
shadow!(build);
const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct StaticFiles;
/// Static file handler
async fn static_handler(uri: Uri) -> impl IntoResponse {
let mut path = uri.path().trim_start_matches('/');
if path.is_empty() {
path = "index.html"
}
if let Some(file) = StaticFiles::get(path) {
let mime_type = from_path(path).first_or_octet_stream();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type.to_string())
.body(Body::from(file.data))
.unwrap()
} else if let Some(file) = StaticFiles::get("index.html") {
let mime_type = from_path("index.html").first_or_octet_stream();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type.to_string())
.body(Body::from(file.data))
.unwrap()
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("404 Not Found"))
.unwrap()
}
}
#[derive(Debug, Serialize, Clone)]
pub(crate) struct Config {
#[serde(skip)]
port: u16,
api: Api,
s3: S3,
release: Release,
license: License,
doc: String,
}
impl Config {
fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self {
Config {
port,
api: Api {
base_url: format!("http://{local_ip}:{port}/{RUSTFS_ADMIN_PREFIX}"),
},
s3: S3 {
endpoint: format!("http://{local_ip}:{port}"),
region: "cn-east-1".to_owned(),
},
release: Release {
version: version.to_string(),
date: date.to_string(),
},
license: License {
name: "Apache-2.0".to_string(),
url: "https://www.apache.org/licenses/LICENSE-2.0".to_string(),
},
doc: "https://rustfs.com/docs/".to_string(),
}
}
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_default()
}
pub(crate) fn version_info(&self) -> String {
format!(
"RELEASE.{}@{} (rust {} {})",
self.release.date.clone(),
self.release.version.clone().trim_start_matches('@'),
build::RUST_VERSION,
build::BUILD_TARGET
)
}
pub(crate) fn version(&self) -> String {
self.release.version.clone()
}
pub(crate) fn license(&self) -> String {
format!("{} {}", self.license.name.clone(), self.license.url.clone())
}
pub(crate) fn doc(&self) -> String {
self.doc.clone()
}
}
#[derive(Debug, Serialize, Clone)]
struct Api {
#[serde(rename = "baseURL")]
base_url: String,
}
#[derive(Debug, Serialize, Clone)]
struct S3 {
endpoint: String,
region: String,
}
#[derive(Debug, Serialize, Clone)]
struct Release {
version: String,
date: String,
}
#[derive(Debug, Serialize, Clone)]
struct License {
name: String,
url: String,
}
pub(crate) static CONSOLE_CONFIG: OnceLock<Config> = OnceLock::new();
#[allow(clippy::const_is_empty)]
pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) {
CONSOLE_CONFIG.get_or_init(|| {
let ver = {
if !build::TAG.is_empty() {
build::TAG.to_string()
} else if !build::SHORT_COMMIT.is_empty() {
format!("@{}", build::SHORT_COMMIT)
} else {
build::PKG_VERSION.to_string()
}
};
Config::new(local_ip, port, ver.as_str(), build::COMMIT_DATE_3339)
});
}
// fn is_socket_addr_or_ip_addr(host: &str) -> bool {
// host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok()
// }
async fn license_handler() -> impl IntoResponse {
let license = get_license().unwrap_or_default();
Response::builder()
.header("content-type", "application/json")
.status(StatusCode::OK)
.body(Body::from(serde_json::to_string(&license).unwrap_or_default()))
.unwrap()
}
fn _is_private_ip(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(ip) => {
let octets = ip.octets();
// 10.0.0.0/8
octets[0] == 10 ||
// 172.16.0.0/12
(octets[0] == 172 && (octets[1] >= 16 && octets[1] <= 31)) ||
// 192.168.0.0/16
(octets[0] == 192 && octets[1] == 168)
}
IpAddr::V6(_) => false,
}
}
#[allow(clippy::const_is_empty)]
#[instrument(fields(host))]
async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> impl IntoResponse {
// Get the scheme from the headers or use the URI scheme
let scheme = headers
.get(HeaderName::from_static("x-forwarded-proto"))
.and_then(|value| value.to_str().ok())
.unwrap_or_else(|| uri.scheme().map(|s| s.as_str()).unwrap_or("http"));
// Print logs for debugging
info!("Scheme: {}, ", scheme);
// Get the host from the uri and use the value of the host extractor if it doesn't have one
let host = uri.host().unwrap_or(host.as_str());
let host = if let Ok(socket_addr) = host.parse::<SocketAddr>() {
// Successfully parsed, it's in IP:Port format.
// For IPv6, we need to enclose it in brackets to form a valid URL.
let ip = socket_addr.ip();
if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") }
} else {
// Failed to parse, it might be a domain name or a bare IP, use it as is.
host.to_string()
};
// Make a copy of the current configuration
let mut cfg = match CONSOLE_CONFIG.get() {
Some(cfg) => cfg.clone(),
None => {
error!("Console configuration not initialized");
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Console configuration not initialized"))
.unwrap();
}
};
let url = format!("{}://{}:{}", scheme, host, cfg.port);
cfg.api.base_url = format!("{url}{RUSTFS_ADMIN_PREFIX}");
cfg.s3.endpoint = url;
Response::builder()
.header("content-type", "application/json")
.status(StatusCode::OK)
.body(Body::from(cfg.to_json()))
.unwrap()
}
pub async fn start_static_file_server(
addrs: &str,
local_ip: IpAddr,
access_key: &str,
secret_key: &str,
tls_path: Option<String>,
) {
// Configure CORS
let cors = CorsLayer::new()
.allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name
.allow_methods([http::Method::GET, http::Method::POST])
.allow_headers([header::CONTENT_TYPE]);
// Create a route
let app = Router::new()
.route("/license", get(license_handler))
.route("/config.json", get(config_handler))
.fallback_service(get(static_handler))
.layer(cors)
.layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true))
.layer(TraceLayer::new_for_http());
let server_addr = parse_and_resolve_address(addrs).expect("Failed to parse socket address");
let server_port = server_addr.port();
let server_address = server_addr.to_string();
info!(
"WebUI: http://{}:{} http://127.0.0.1:{} http://{}",
local_ip, server_port, server_port, server_address
);
info!(" RootUser: {}", access_key);
info!(" RootPass: {}", secret_key);
// Check and start the HTTPS/HTTP server
match start_server(server_addr, tls_path, app.clone()).await {
Ok(_) => info!("Server shutdown gracefully"),
Err(e) => error!("Server error: {}", e),
}
}
async fn start_server(server_addr: SocketAddr, tls_path: Option<String>, app: Router) -> io::Result<()> {
let tls_path = tls_path.unwrap_or_default();
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
let handle = axum_server::Handle::new();
// create a signal off listening task
let handle_clone = handle.clone();
tokio::spawn(async move {
shutdown_signal().await;
info!("Initiating graceful shutdown...");
handle_clone.graceful_shutdown(Some(Duration::from_secs(10)));
});
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok();
info!("Console TLS certs: {:?}", has_tls_certs);
if has_tls_certs {
info!("Found TLS certificates, starting with HTTPS");
match RustlsConfig::from_pem_file(cert_path, key_path).await {
Ok(config) => {
info!("Starting HTTPS server...");
axum_server::bind_rustls(server_addr, config)
.handle(handle.clone())
.serve(app.into_make_service())
.await
.map_err(io::Error::other)?;
info!("HTTPS server running on https://{}", server_addr);
Ok(())
}
Err(e) => {
error!("Failed to create TLS config: {}", e);
start_http_server(server_addr, app, handle).await
}
}
} else {
info!("TLS certificates not found at {} and {}", key_path, cert_path);
start_http_server(server_addr, app, handle).await
}
}
#[allow(dead_code)]
/// 308 redirect for HTTP to HTTPS
fn redirect_to_https(https_port: u16) -> Router {
Router::new().route(
"/*path",
get({
move |uri: Uri, req: http::Request<Body>| async move {
let host = req
.headers()
.get("host")
.map_or("localhost", |h| h.to_str().unwrap_or("localhost"));
let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("");
let https_url = format!("https://{host}:{https_port}{path}");
Redirect::permanent(&https_url)
}
}),
)
}
async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> {
debug!("Starting HTTP server...");
axum_server::bind(addr)
.handle(handle)
.serve(app.into_make_service())
.await
.map_err(io::Error::other)
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
info!("shutdown_signal ctrl_c")
},
_ = terminate => {
info!("shutdown_signal terminate")
},
}
}

View File

@@ -1,78 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_ServerConfig;
use tracing::{error, info, instrument};
#[instrument]
pub(crate) async fn init_event_notifier() {
info!("Initializing event notifier...");
// 1. Get the global configuration loaded by ecstore
let server_config = match GLOBAL_ServerConfig.get() {
Some(config) => config.clone(), // Clone the config to pass ownership
None => {
error!("Event notifier initialization failed: Global server config not loaded.");
return;
}
};
info!("Global server configuration loaded successfully. config: {:?}", server_config);
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
if server_config
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
|| server_config
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
{
info!("'notify' subsystem not configured, skipping event notifier initialization.");
return;
}
info!("Event notifier configuration found, proceeding with initialization.");
// 3. Initialize the notification system asynchronously with a global configuration
// Put it into a separate task to avoid blocking the main initialization process
tokio::spawn(async move {
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
});
}
/// Shuts down the event notifier system gracefully
pub async fn shutdown_event_notifier() {
info!("Shutting down event notifier system...");
if !rustfs_notify::is_notification_system_initialized() {
info!("Event notifier system is not initialized, nothing to shut down.");
return;
}
let system = match rustfs_notify::notification_system() {
Some(sys) => sys,
None => {
error!("Event notifier system is not initialized.");
return;
}
};
// Call the shutdown function from the rustfs_notify module
system.shutdown().await;
info!("Event notifier system shut down successfully.");
}

View File

@@ -15,42 +15,29 @@
mod admin;
mod auth;
mod config;
mod console;
mod error;
mod event;
// mod grpc;
pub mod license;
mod logging;
mod server;
mod service;
mod storage;
mod update_checker;
mod update;
mod version;
use crate::auth::IAMAuth;
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_ServerConfig;
// Ensure the correct path for parse_license is imported
use crate::event::shutdown_event_notifier;
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown};
use bytes::Bytes;
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, start_http_server, wait_for_shutdown};
use chrono::Datelike;
use clap::Parser;
use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use license::init_license;
use rustfs_ahm::scanner::data_scanner::ScannerConfig;
use rustfs_ahm::{Scanner, create_ahm_services_cancel_token, shutdown_ahm_services};
use rustfs_common::globals::set_global_addr;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use rustfs_ecstore::config as ecconfig;
use rustfs_ecstore::config::GLOBAL_ConfigSys;
use rustfs_ecstore::rpc::make_server;
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::{
StorageAPI,
@@ -63,56 +50,25 @@ use rustfs_ecstore::{
update_erasure_type,
};
use rustfs_iam::init_iam_sys;
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::service::S3Service;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use socket2::SockRef;
use std::io::{Error, Result};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tower::ServiceBuilder;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info, instrument, warn};
const MI_B: usize = 1024 * 1024;
use tracing::{debug, error, info, instrument, warn};
#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(clippy::result_large_err)]
fn check_auth(req: Request<()>) -> std::result::Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
match req.metadata().get("authorization") {
Some(t) if token == t => Ok(req),
_ => Err(Status::unauthenticated("No valid auth token")),
}
}
#[instrument]
fn print_server_info() {
let cfg = CONSOLE_CONFIG.get().unwrap();
let current_year = chrono::Utc::now().year();
// Use custom macros to print server information
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
info!("License: {}", cfg.license());
info!("Version: {}", cfg.version_info());
info!("Docs: {}", cfg.doc());
info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0");
info!("Version: {}", version::get_version());
info!("Docs: https://rustfs.com/docs/");
}
#[tokio::main]
@@ -133,55 +89,12 @@ async fn main() -> Result<()> {
run(opt).await
}
/// Sets up the TLS acceptor if certificates are available.
#[instrument(skip(tls_path))]
async fn setup_tls_acceptor(tls_path: &str) -> Result<Option<TlsAcceptor>> {
if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() {
debug!("TLS path is not provided or does not exist, starting with HTTP");
return Ok(None);
}
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates from the directory (multi-cert support)
if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) {
if !cert_key_pairs.is_empty() {
debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
}
// 2. Fallback to legacy single certificate mode
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
debug!("No valid TLS certificates found in the directory, starting with HTTP");
Ok(None)
}
#[instrument(skip(opt))]
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
if let Some(region) = opt.region {
rustfs_ecstore::global::set_global_region(region);
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
}
let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?;
@@ -195,21 +108,7 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_rustfs_port(server_port);
// The listening address and port are obtained from the parameters
let listener = TcpListener::bind(server_address.clone()).await?;
// Obtain the listener address
let local_addr: SocketAddr = listener.local_addr()?;
debug!("Listening on: {}", local_addr);
let local_ip = match rustfs_utils::get_local_ip() {
Some(ip) => {
debug!("Obtained local IP address: {}", ip);
ip
}
None => {
warn!("Unable to obtain local IP address, using fallback IP: {}", local_addr.ip());
local_addr.ip()
}
};
set_global_addr(&opt.address).await;
// For RPC
let (endpoint_pools, setup_type) =
@@ -228,19 +127,6 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
// Detailed endpoint information (showing all API endpoints)
let api_endpoints = format!("http://{local_ip}:{server_port}");
let localhost_endpoint = format!("http://127.0.0.1:{server_port}");
info!(" API: {} {}", api_endpoints, localhost_endpoint);
info!(" RootUser: {}", opt.access_key.clone());
info!(" RootPass: {}", opt.secret_key.clone());
if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) {
warn!(
"Detected default credentials '{}:{}', we recommend that you change these values with 'RUSTFS_ACCESS_KEY' and 'RUSTFS_SECRET_KEY' environment variables",
DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY
);
}
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
@@ -252,7 +138,11 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
set_global_addr(&opt.address).await;
let state_manager = ServiceStateManager::new();
// Update service status to Starting
state_manager.update(ServiceState::Starting);
let shutdown_tx = start_http_server(&opt, state_manager.clone()).await?;
set_global_endpoints(endpoint_pools.as_ref().clone());
update_erasure_type(setup_type).await;
@@ -260,163 +150,6 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize the local disk
init_local_disks(endpoint_pools.clone()).await.map_err(Error::other)?;
// Setup S3 service
// This project uses the S3S library to implement S3 services
let s3_service = {
let store = storage::ecfs::FS::new();
let mut b = S3ServiceBuilder::new(store.clone());
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(IAMAuth::new(access_key, secret_key));
b.set_access(store.clone());
b.set_route(admin::make_admin_route()?);
if !opt.server_domains.is_empty() {
info!("virtual-hosted-style requests are enabled use domain_name {:?}", &opt.server_domains);
b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?);
}
b.build()
};
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");
let obs_result = SystemObserver::init_process_observer(meter).await;
match obs_result {
Ok(_) => {
info!("Process observer initialized successfully");
}
Err(e) => {
error!("Failed to initialize process observer: {}", e);
}
}
});
let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?;
let state_manager = ServiceStateManager::new();
let worker_state_manager = state_manager.clone();
// Update service status to Starting
state_manager.update(ServiceState::Starting);
// Create shutdown channel
let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
#[cfg(unix)]
let (mut sigterm_inner, mut sigint_inner) = {
// Unix platform specific code
let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");
let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler");
(sigterm_inner, sigint_inner)
};
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
// service ready
worker_state_manager.update(ServiceState::Ready);
let tls_acceptor = tls_acceptor.map(Arc::new);
loop {
debug!("Waiting for new connection...");
let (socket, _) = {
#[cfg(unix)]
{
tokio::select! {
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
Some(_) = sigint_inner.recv() => {
info!("SIGINT received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
Some(_) = sigterm_inner.recv() => {
info!("SIGTERM received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
}
}
}
#[cfg(not(unix))]
{
tokio::select! {
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
}
}
}
};
let socket_ref = SockRef::from(&socket);
if let Err(err) = socket_ref.set_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_recv_buffer_size");
}
if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_send_buffer_size");
}
process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone());
}
worker_state_manager.update(ServiceState::Stopping);
match Arc::try_unwrap(graceful) {
Ok(g) => {
tokio::select! {
() = g.shutdown() => {
debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(Duration::from_secs(10)) => {
debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
}
Err(arc_graceful) => {
error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful);
tokio::time::sleep(Duration::from_secs(10)).await;
debug!("Timeout reached, forcing shutdown");
}
}
worker_state_manager.update(ServiceState::Stopped);
});
// init store
let store = ECStore::new(server_addr, endpoint_pools.clone()).await.inspect_err(|err| {
error!("ECStore::new {:?}", err);
@@ -427,7 +160,7 @@ async fn run(opt: config::Opt) -> Result<()> {
GLOBAL_ConfigSys.init(store.clone()).await?;
// Initialize event notifier
event::init_event_notifier().await;
init_event_notifier().await;
let buckets_list = store
.list_bucket(&BucketOptions {
@@ -455,15 +188,12 @@ async fn run(opt: config::Opt) -> Result<()> {
let _ = create_ahm_services_cancel_token();
let scanner = Scanner::new(Some(ScannerConfig::default()));
scanner.start().await?;
// init console configuration
init_console_cfg(local_ip, server_port);
print_server_info();
init_bucket_replication_pool().await;
// Async update check (optional)
tokio::spawn(async {
use crate::update_checker::{UpdateCheckError, check_updates};
use crate::update::{UpdateCheckError, check_updates};
match check_updates().await {
Ok(result) => {
@@ -493,23 +223,6 @@ async fn run(opt: config::Opt) -> Result<()> {
}
});
if opt.console_enable {
debug!("console is enabled");
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
let console_address = opt.console_address.clone();
let tls_path = opt.tls_path.clone();
if console_address.is_empty() {
error!("console_address is empty");
return Err(Error::other("console_address is empty".to_string()));
}
tokio::spawn(async move {
console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key, tls_path).await;
});
}
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
@@ -528,103 +241,6 @@ async fn run(opt: config::Opt) -> Result<()> {
Ok(())
}
/// Process a single incoming TCP connection.
///
/// This function is executed in a new Tokio task and it will:
/// 1. If TLS is configured, perform TLS handshake.
/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware.
/// 3. Use Hyper to handle HTTP requests on this connection.
/// 4. Incorporate connections into the management of elegant closures.
#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))]
fn process_connection(
socket: TcpStream,
tls_acceptor: Option<Arc<TlsAcceptor>>,
http_server: Arc<ConnBuilder<TokioExecutor>>,
s3_service: S3Service,
graceful: Arc<GracefulShutdown>,
) {
tokio::spawn(async move {
// Build services inside each connected task to avoid passing complex service types across tasks,
// It also ensures that each connection has an independent service instance.
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let hybrid_service = ServiceBuilder::new()
.layer(CatchPanicLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" {
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
.layer(CorsLayer::permissive())
.service(hybrid(s3_service, rpc_service));
let hybrid_service = TowerToHyperService::new(hybrid_service);
// Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor
if let Some(acceptor) = tls_acceptor {
debug!("TLS handshake start");
match acceptor.accept(socket).await {
Ok(tls_socket) => {
debug!("TLS handshake successful");
let stream = TokioIo::new(tls_socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
}
Err(err) => {
error!(?err, "TLS handshake failed");
return; // Failed to end the task directly
}
}
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let stream = TokioIo::new(socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
debug!("Http handshake success");
};
});
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");
@@ -653,25 +269,63 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki
info!("Server stopped current ");
}
/// Handles connection errors by logging them with appropriate severity
fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {
warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err);
} else if hyper_err.is_closed() {
warn!("The HTTP connection is closed:{}", hyper_err);
} else if hyper_err.is_parse() {
error!("HTTP message parsing failed:{}", hyper_err);
} else if hyper_err.is_user() {
error!("HTTP user-custom error:{}", hyper_err);
} else if hyper_err.is_canceled() {
warn!("The HTTP connection is canceled:{}", hyper_err);
} else {
error!("Unknown hyper error:{:?}", hyper_err);
#[instrument]
pub(crate) async fn init_event_notifier() {
info!("Initializing event notifier...");
// 1. Get the global configuration loaded by ecstore
let server_config = match GLOBAL_ServerConfig.get() {
Some(config) => config.clone(), // Clone the config to pass ownership
None => {
error!("Event notifier initialization failed: Global server config not loaded.");
return;
}
} else if let Some(io_err) = err.downcast_ref::<Error>() {
error!("Unknown connection IO error:{}", io_err);
} else {
error!("Unknown connection error type:{:?}", err);
};
info!("Global server configuration loaded successfully. config: {:?}", server_config);
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
if server_config
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
|| server_config
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
{
info!("'notify' subsystem not configured, skipping event notifier initialization.");
return;
}
info!("Event notifier configuration found, proceeding with initialization.");
// 3. Initialize the notification system asynchronously with a global configuration
// Put it into a separate task to avoid blocking the main initialization process
tokio::spawn(async move {
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
});
}
/// Shuts down the event notifier system gracefully
pub async fn shutdown_event_notifier() {
info!("Shutting down event notifier system...");
if !rustfs_notify::is_notification_system_initialized() {
info!("Event notifier system is not initialized, nothing to shut down.");
return;
}
let system = match rustfs_notify::notification_system() {
Some(sys) => sys,
None => {
error!("Event notifier system is not initialized.");
return;
}
};
// Call the shutdown function from the rustfs_notify module
system.shutdown().await;
info!("Event notifier system shut down successfully.");
}

413
rustfs/src/server/http.rs Normal file
View File

@@ -0,0 +1,413 @@
// use crate::admin::console::{CONSOLE_CONFIG, init_console_cfg};
use crate::auth::IAMAuth;
// Ensure the correct path for parse_license is imported
use crate::admin;
use crate::config;
use crate::server::hybrid::hybrid;
use crate::server::{ServiceState, ServiceStateManager};
use crate::storage;
use bytes::Bytes;
use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_ecstore::rpc::make_server;
use rustfs_obs::SystemObserver;
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::service::S3Service;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use socket2::SockRef;
use std::io::{Error, Result};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tower::ServiceBuilder;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info, instrument, warn};
const MI_B: usize = 1024 * 1024;
pub async fn start_http_server(
opt: &config::Opt,
worker_state_manager: ServiceStateManager,
) -> std::io::Result<tokio::sync::broadcast::Sender<()>> {
let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?;
let server_port = server_addr.port();
let server_address = server_addr.to_string();
// The listening address and port are obtained from the parameters
let listener = TcpListener::bind(server_address.clone()).await?;
// Obtain the listener address
let local_addr: SocketAddr = listener.local_addr()?;
debug!("Listening on: {}", local_addr);
let local_ip = match rustfs_utils::get_local_ip() {
Some(ip) => {
debug!("Obtained local IP address: {}", ip);
ip
}
None => {
warn!("Unable to obtain local IP address, using fallback IP: {}", local_addr.ip());
local_addr.ip()
}
};
// Detailed endpoint information (showing all API endpoints)
let api_endpoints = format!("http://{local_ip}:{server_port}");
let localhost_endpoint = format!("http://127.0.0.1:{server_port}");
info!(" API: {} {}", api_endpoints, localhost_endpoint);
if opt.console_enable {
info!(
" WebUI: http://{}:{}/rustfs/console/index.html http://127.0.0.1:{}/rustfs/console/index.html http://{}/rustfs/console/index.html",
local_ip, server_port, server_port, server_address
);
}
info!(" RootUser: {}", opt.access_key.clone());
info!(" RootPass: {}", opt.secret_key.clone());
if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) {
warn!(
"Detected default credentials '{}:{}', we recommend that you change these values with 'RUSTFS_ACCESS_KEY' and 'RUSTFS_SECRET_KEY' environment variables",
DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY
);
}
// Setup S3 service
// This project uses the S3S library to implement S3 services
let s3_service = {
let store = storage::ecfs::FS::new();
let mut b = S3ServiceBuilder::new(store.clone());
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(IAMAuth::new(access_key, secret_key));
b.set_access(store.clone());
b.set_route(admin::make_admin_route(opt.console_enable)?);
if !opt.server_domains.is_empty() {
info!("virtual-hosted-style requests are enabled use domain_name {:?}", &opt.server_domains);
b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?);
}
b.build()
};
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");
let obs_result = SystemObserver::init_process_observer(meter).await;
match obs_result {
Ok(_) => {
info!("Process observer initialized successfully");
}
Err(e) => {
error!("Failed to initialize process observer: {}", e);
}
}
});
let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?;
// Create shutdown channel
let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
#[cfg(unix)]
let (mut sigterm_inner, mut sigint_inner) = {
// Unix platform specific code
let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");
let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler");
(sigterm_inner, sigint_inner)
};
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
// service ready
worker_state_manager.update(ServiceState::Ready);
let tls_acceptor = tls_acceptor.map(Arc::new);
loop {
debug!("Waiting for new connection...");
let (socket, _) = {
#[cfg(unix)]
{
tokio::select! {
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
Some(_) = sigint_inner.recv() => {
info!("SIGINT received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
Some(_) = sigterm_inner.recv() => {
info!("SIGTERM received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
}
}
}
#[cfg(not(unix))]
{
tokio::select! {
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
}
}
}
};
let socket_ref = SockRef::from(&socket);
if let Err(err) = socket_ref.set_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_recv_buffer_size");
}
if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) {
warn!(?err, "Failed to set set_send_buffer_size");
}
process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone());
}
worker_state_manager.update(ServiceState::Stopping);
match Arc::try_unwrap(graceful) {
Ok(g) => {
tokio::select! {
() = g.shutdown() => {
debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(Duration::from_secs(10)) => {
debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
}
Err(arc_graceful) => {
error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful);
tokio::time::sleep(Duration::from_secs(10)).await;
debug!("Timeout reached, forcing shutdown");
}
}
worker_state_manager.update(ServiceState::Stopped);
});
Ok(shutdown_tx)
}
/// Sets up the TLS acceptor if certificates are available.
#[instrument(skip(tls_path))]
async fn setup_tls_acceptor(tls_path: &str) -> Result<Option<TlsAcceptor>> {
if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() {
debug!("TLS path is not provided or does not exist, starting with HTTP");
return Ok(None);
}
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates from the directory (multi-cert support)
if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) {
if !cert_key_pairs.is_empty() {
debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
}
// 2. Fallback to legacy single certificate mode
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
debug!("No valid TLS certificates found in the directory, starting with HTTP");
Ok(None)
}
/// Process a single incoming TCP connection.
///
/// This function is executed in a new Tokio task and it will:
/// 1. If TLS is configured, perform TLS handshake.
/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware.
/// 3. Use Hyper to handle HTTP requests on this connection.
/// 4. Incorporate connections into the management of elegant closures.
#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))]
fn process_connection(
socket: TcpStream,
tls_acceptor: Option<Arc<TlsAcceptor>>,
http_server: Arc<ConnBuilder<TokioExecutor>>,
s3_service: S3Service,
graceful: Arc<GracefulShutdown>,
) {
tokio::spawn(async move {
// Build services inside each connected task to avoid passing complex service types across tasks,
// It also ensures that each connection has an independent service instance.
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let service = hybrid(s3_service, rpc_service);
let hybrid_service = ServiceBuilder::new()
.layer(CatchPanicLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" {
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
.layer(CorsLayer::permissive())
.service(service);
let hybrid_service = TowerToHyperService::new(hybrid_service);
// Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor
if let Some(acceptor) = tls_acceptor {
debug!("TLS handshake start");
match acceptor.accept(socket).await {
Ok(tls_socket) => {
debug!("TLS handshake successful");
let stream = TokioIo::new(tls_socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
}
Err(err) => {
error!(?err, "TLS handshake failed");
return; // Failed to end the task directly
}
}
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let stream = TokioIo::new(socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
debug!("Http handshake success");
};
});
}
/// Handles connection errors by logging them with appropriate severity
fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {
warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err);
} else if hyper_err.is_closed() {
warn!("The HTTP connection is closed:{}", hyper_err);
} else if hyper_err.is_parse() {
error!("HTTP message parsing failed:{}", hyper_err);
} else if hyper_err.is_user() {
error!("HTTP user-custom error:{}", hyper_err);
} else if hyper_err.is_canceled() {
warn!("The HTTP connection is canceled:{}", hyper_err);
} else {
error!("Unknown hyper error:{:?}", hyper_err);
}
} else if let Some(io_err) = err.downcast_ref::<Error>() {
error!("Unknown connection IO error:{}", io_err);
} else {
error!("Unknown connection error type:{:?}", err);
}
}
#[allow(clippy::result_large_err)]
fn check_auth(req: Request<()>) -> std::result::Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
match req.metadata().get("authorization") {
Some(t) if token == t => Ok(req),
_ => Err(Status::unauthenticated("No valid auth token")),
}
}

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod http;
mod hybrid;
mod service_state;
pub(crate) use http::start_http_server;
pub(crate) use service_state::SHUTDOWN_TIMEOUT;
pub(crate) use service_state::ServiceState;
pub(crate) use service_state::ServiceStateManager;

View File

@@ -17,6 +17,8 @@ use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, warn};
use crate::version;
/// Update check related errors
#[derive(Error, Debug)]
pub enum UpdateCheckError {
@@ -231,25 +233,7 @@ impl VersionChecker {
/// Get current version number
pub fn get_current_version() -> String {
use crate::console::CONSOLE_CONFIG;
if let Some(config) = CONSOLE_CONFIG.get() {
// Extract version from configuration
let version_str = config.version();
// Extract version part, removing extra information
if let Some(release_part) = version_str.split_whitespace().next() {
if release_part.starts_with("RELEASE.") {
release_part.trim_start_matches("RELEASE.").to_string()
} else {
release_part.to_string()
}
} else {
rustfs_config::VERSION.to_string()
}
} else {
// If configuration is not initialized, use constant version
rustfs_config::VERSION.to_string()
}
version::get_version()
}
/// Convenience function for async update checking

13
rustfs/src/version.rs Normal file
View File

@@ -0,0 +1,13 @@
use shadow_rs::shadow;
shadow!(build);
#[allow(clippy::const_is_empty)]
pub fn get_version() -> String {
if !build::TAG.is_empty() {
build::TAG.to_string()
} else if !build::SHORT_COMMIT.is_empty() {
format!("@{}", build::SHORT_COMMIT)
} else {
build::PKG_VERSION.to_string()
}
}