diff --git a/rustfs/src/admin/console.rs b/rustfs/src/admin/console.rs new file mode 100644 index 00000000..76a0e89c --- /dev/null +++ b/rustfs/src/admin/console.rs @@ -0,0 +1,401 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// use crate::license::get_license; +use axum::{ + // Router, + body::Body, + http::{Response, StatusCode}, + response::IntoResponse, + // routing::get, +}; +// use axum_extra::extract::Host; +// use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +// use rustfs_utils::net::parse_and_resolve_address; +// use std::io; + +use http::Uri; +// use axum::response::Redirect; +// use axum_server::tls_rustls::RustlsConfig; +// use http::{HeaderMap, HeaderName, Uri, header}; +use mime_guess::from_path; +use rust_embed::RustEmbed; +// use serde::Serialize; +// use shadow_rs::shadow; +// use std::net::{IpAddr, SocketAddr}; +// use std::sync::OnceLock; +// use std::time::Duration; +// use tokio::signal; +// use tower_http::cors::{Any, CorsLayer}; +// use tower_http::trace::TraceLayer; +// use tracing::{debug, error, info, instrument}; + +// shadow!(build); + +// const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; + +#[derive(RustEmbed)] +#[folder = "$CARGO_MANIFEST_DIR/static"] +struct StaticFiles; + +/// Static file handler +pub(crate) async fn static_handler(uri: Uri) -> impl IntoResponse { + let mut path = uri.path().trim_start_matches('/'); + if path.is_empty() { + path = "index.html" + } + if let Some(file) = StaticFiles::get(path) { + let mime_type = from_path(path).first_or_octet_stream(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", mime_type.to_string()) + .body(Body::from(file.data)) + .unwrap() + } else if let Some(file) = StaticFiles::get("index.html") { + let mime_type = from_path("index.html").first_or_octet_stream(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", mime_type.to_string()) + .body(Body::from(file.data)) + .unwrap() + } else { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("404 Not Found")) + .unwrap() + } +} + +// #[derive(Debug, Serialize, Clone)] +// pub(crate) struct Config { +// #[serde(skip)] +// port: u16, +// api: Api, +// s3: S3, +// release: Release, +// license: License, +// doc: String, +// } + +// impl Config { +// fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self { +// Config { +// port, +// api: Api { +// base_url: format!("http://{local_ip}:{port}/{RUSTFS_ADMIN_PREFIX}"), +// }, +// s3: S3 { +// endpoint: format!("http://{local_ip}:{port}"), +// region: "cn-east-1".to_owned(), +// }, +// release: Release { +// version: version.to_string(), +// date: date.to_string(), +// }, +// license: License { +// name: "Apache-2.0".to_string(), +// url: "https://www.apache.org/licenses/LICENSE-2.0".to_string(), +// }, +// doc: "https://rustfs.com/docs/".to_string(), +// } +// } + +// fn to_json(&self) -> String { +// serde_json::to_string(self).unwrap_or_default() +// } + +// pub(crate) fn version_info(&self) -> String { +// format!( +// "RELEASE.{}@{} (rust {} {})", +// self.release.date.clone(), +// self.release.version.clone().trim_start_matches('@'), +// build::RUST_VERSION, +// build::BUILD_TARGET +// ) +// } + +// pub(crate) fn version(&self) -> String { +// self.release.version.clone() +// } + +// pub(crate) fn license(&self) -> String { +// format!("{} {}", self.license.name.clone(), self.license.url.clone()) +// } + +// pub(crate) fn doc(&self) -> String { +// self.doc.clone() +// } +// } + +// #[derive(Debug, Serialize, Clone)] +// struct Api { +// #[serde(rename = "baseURL")] +// base_url: String, +// } + +// #[derive(Debug, Serialize, Clone)] +// struct S3 { +// endpoint: String, +// region: String, +// } + +// #[derive(Debug, Serialize, Clone)] +// struct Release { +// version: String, +// date: String, +// } + +// #[derive(Debug, Serialize, Clone)] +// struct License { +// name: String, +// url: String, +// } + +// pub(crate) static CONSOLE_CONFIG: OnceLock = OnceLock::new(); + +// #[allow(clippy::const_is_empty)] +// pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) { +// CONSOLE_CONFIG.get_or_init(|| { +// let ver = { +// if !build::TAG.is_empty() { +// build::TAG.to_string() +// } else if !build::SHORT_COMMIT.is_empty() { +// format!("@{}", build::SHORT_COMMIT) +// } else { +// build::PKG_VERSION.to_string() +// } +// }; + +// Config::new(local_ip, port, ver.as_str(), build::COMMIT_DATE_3339) +// }); +// } + +// // fn is_socket_addr_or_ip_addr(host: &str) -> bool { +// // host.parse::().is_ok() || host.parse::().is_ok() +// // } + +// #[allow(dead_code)] +// async fn license_handler() -> impl IntoResponse { +// let license = get_license().unwrap_or_default(); + +// Response::builder() +// .header("content-type", "application/json") +// .status(StatusCode::OK) +// .body(Body::from(serde_json::to_string(&license).unwrap_or_default())) +// .unwrap() +// } + +// fn _is_private_ip(ip: IpAddr) -> bool { +// match ip { +// IpAddr::V4(ip) => { +// let octets = ip.octets(); +// // 10.0.0.0/8 +// octets[0] == 10 || +// // 172.16.0.0/12 +// (octets[0] == 172 && (octets[1] >= 16 && octets[1] <= 31)) || +// // 192.168.0.0/16 +// (octets[0] == 192 && octets[1] == 168) +// } +// IpAddr::V6(_) => false, +// } +// } + +// #[allow(clippy::const_is_empty)] +// #[allow(dead_code)] +// #[instrument(fields(host))] +// async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> impl IntoResponse { +// // Get the scheme from the headers or use the URI scheme +// let scheme = headers +// .get(HeaderName::from_static("x-forwarded-proto")) +// .and_then(|value| value.to_str().ok()) +// .unwrap_or_else(|| uri.scheme().map(|s| s.as_str()).unwrap_or("http")); + +// // Print logs for debugging +// info!("Scheme: {}, ", scheme); + +// // Get the host from the uri and use the value of the host extractor if it doesn't have one +// let host = uri.host().unwrap_or(host.as_str()); + +// let host = if let Ok(socket_addr) = host.parse::() { +// // Successfully parsed, it's in IP:Port format. +// // For IPv6, we need to enclose it in brackets to form a valid URL. +// let ip = socket_addr.ip(); +// if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") } +// } else { +// // Failed to parse, it might be a domain name or a bare IP, use it as is. +// host.to_string() +// }; + +// // Make a copy of the current configuration +// let mut cfg = match CONSOLE_CONFIG.get() { +// Some(cfg) => cfg.clone(), +// None => { +// error!("Console configuration not initialized"); +// return Response::builder() +// .status(StatusCode::INTERNAL_SERVER_ERROR) +// .body(Body::from("Console configuration not initialized")) +// .unwrap(); +// } +// }; + +// let url = format!("{}://{}:{}", scheme, host, cfg.port); +// cfg.api.base_url = format!("{url}{RUSTFS_ADMIN_PREFIX}"); +// cfg.s3.endpoint = url; + +// Response::builder() +// .header("content-type", "application/json") +// .status(StatusCode::OK) +// .body(Body::from(cfg.to_json())) +// .unwrap() +// } + +// pub fn register_router() -> Router { +// Router::new() +// // .route("/license", get(license_handler)) +// // .route("/config.json", get(config_handler)) +// .fallback_service(get(static_handler)) +// } + +// #[allow(dead_code)] +// pub async fn start_static_file_server( +// addrs: &str, +// local_ip: IpAddr, +// access_key: &str, +// secret_key: &str, +// tls_path: Option, +// ) { +// // Configure CORS +// let cors = CorsLayer::new() +// .allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name +// .allow_methods([http::Method::GET, http::Method::POST]) +// .allow_headers([header::CONTENT_TYPE]); + +// // Create a route +// let app = register_router() +// .layer(cors) +// .layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true)) +// .layer(TraceLayer::new_for_http()); + +// let server_addr = parse_and_resolve_address(addrs).expect("Failed to parse socket address"); +// let server_port = server_addr.port(); +// let server_address = server_addr.to_string(); + +// info!( +// "WebUI: http://{}:{} http://127.0.0.1:{} http://{}", +// local_ip, server_port, server_port, server_address +// ); +// info!(" RootUser: {}", access_key); +// info!(" RootPass: {}", secret_key); + +// // Check and start the HTTPS/HTTP server +// match start_server(server_addr, tls_path, app.clone()).await { +// Ok(_) => info!("Server shutdown gracefully"), +// Err(e) => error!("Server error: {}", e), +// } +// } + +// async fn start_server(server_addr: SocketAddr, tls_path: Option, app: Router) -> io::Result<()> { +// let tls_path = tls_path.unwrap_or_default(); +// let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); +// let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); +// let handle = axum_server::Handle::new(); +// // create a signal off listening task +// let handle_clone = handle.clone(); +// tokio::spawn(async move { +// shutdown_signal().await; +// info!("Initiating graceful shutdown..."); +// handle_clone.graceful_shutdown(Some(Duration::from_secs(10))); +// }); + +// let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok(); +// info!("Console TLS certs: {:?}", has_tls_certs); +// if has_tls_certs { +// info!("Found TLS certificates, starting with HTTPS"); +// match RustlsConfig::from_pem_file(cert_path, key_path).await { +// Ok(config) => { +// info!("Starting HTTPS server..."); +// axum_server::bind_rustls(server_addr, config) +// .handle(handle.clone()) +// .serve(app.into_make_service()) +// .await +// .map_err(io::Error::other)?; + +// info!("HTTPS server running on https://{}", server_addr); + +// Ok(()) +// } +// Err(e) => { +// error!("Failed to create TLS config: {}", e); +// start_http_server(server_addr, app, handle).await +// } +// } +// } else { +// info!("TLS certificates not found at {} and {}", key_path, cert_path); +// start_http_server(server_addr, app, handle).await +// } +// } + +// #[allow(dead_code)] +// /// 308 redirect for HTTP to HTTPS +// fn redirect_to_https(https_port: u16) -> Router { +// Router::new().route( +// "/*path", +// get({ +// move |uri: Uri, req: http::Request| async move { +// let host = req +// .headers() +// .get("host") +// .map_or("localhost", |h| h.to_str().unwrap_or("localhost")); +// let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or(""); +// let https_url = format!("https://{host}:{https_port}{path}"); +// Redirect::permanent(&https_url) +// } +// }), +// ) +// } + +// async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> { +// debug!("Starting HTTP server..."); +// axum_server::bind(addr) +// .handle(handle) +// .serve(app.into_make_service()) +// .await +// .map_err(io::Error::other) +// } + +// async fn shutdown_signal() { +// let ctrl_c = async { +// signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); +// }; + +// #[cfg(unix)] +// let terminate = async { +// signal::unix::signal(signal::unix::SignalKind::terminate()) +// .expect("failed to install signal handler") +// .recv() +// .await; +// }; + +// #[cfg(not(unix))] +// let terminate = std::future::pending::<()>(); + +// tokio::select! { +// _ = ctrl_c => { +// info!("shutdown_signal ctrl_c") +// }, +// _ = terminate => { +// info!("shutdown_signal terminate") +// }, +// } +// } diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index cbb74ec1..7b09bd1a 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod console; pub mod handlers; pub mod router; mod rpc; @@ -32,8 +33,8 @@ use s3s::route::S3Route; const ADMIN_PREFIX: &str = "/rustfs/admin"; -pub fn make_admin_route() -> std::io::Result { - let mut r: S3Router = S3Router::new(); +pub fn make_admin_route(console_enabled: bool) -> std::io::Result { + let mut r: S3Router = S3Router::new(console_enabled); // 1 r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?; diff --git a/rustfs/src/admin/router.rs b/rustfs/src/admin/router.rs index ee426c60..3b3149e6 100644 --- a/rustfs/src/admin/router.rs +++ b/rustfs/src/admin/router.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use axum::routing::get; use hyper::HeaderMap; use hyper::Method; use hyper::StatusCode; @@ -27,20 +28,41 @@ use s3s::S3Result; use s3s::header; use s3s::route::S3Route; use s3s::s3_error; +use tower::Service; use tracing::error; -use super::ADMIN_PREFIX; -use super::rpc::RPC_PREFIX; +use crate::admin::ADMIN_PREFIX; +use crate::admin::console; +use crate::admin::rpc::RPC_PREFIX; + +const CONSOLE_PREFIX: &str = "/rustfs/console"; pub struct S3Router { router: Router, + console_enabled: bool, + console_router: Option>, } impl S3Router { - pub fn new() -> Self { + pub fn new(console_enabled: bool) -> Self { let router = Router::new(); - Self { router } + let console_router = if console_enabled { + Some( + axum::Router::new() + .nest(CONSOLE_PREFIX, axum::Router::new().fallback_service(get(console::static_handler))) + .fallback_service(get(console::static_handler)) + .into_service::(), + ) + } else { + None + }; + + Self { + router, + console_enabled, + console_router, + } } pub fn insert(&mut self, method: Method, path: &str, operation: T) -> std::io::Result<()> { @@ -60,7 +82,7 @@ impl S3Router { impl Default for S3Router { fn default() -> Self { - Self::new() + Self::new(false) } } @@ -79,10 +101,23 @@ where } } - uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) + uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(CONSOLE_PREFIX) } async fn call(&self, req: S3Request) -> S3Result> { + if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) { + if let Some(console_router) = &self.console_router { + let mut console_router = console_router.clone(); + let req = convert_request(req); + let result = console_router.call(req).await; + return match result { + Ok(resp) => Ok(convert_response(resp)), + Err(e) => Err(s3_error!(InternalError, "{}", e)), + }; + } + return Err(s3_error!(InternalError, "console is not enabled")); + } + let uri = format!("{}|{}", &req.method, req.uri.path()); // warn!("get uri {}", &uri); @@ -99,6 +134,10 @@ where // check_access before call async fn check_access(&self, req: &mut S3Request) -> S3Result<()> { + if self.console_enabled && req.uri.path().starts_with(CONSOLE_PREFIX) { + return Ok(()); + } + // Check RPC signature verification if req.uri.path().starts_with(RPC_PREFIX) { // Skip signature verification for HEAD requests (health checks) @@ -134,3 +173,34 @@ impl Operation for AdminOperation { self.0.call(req, params).await } } + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct Extra { + pub credentials: Option, + pub region: Option, + pub service: Option, +} + +fn convert_request(req: S3Request) -> http::Request { + let (mut parts, _) = http::Request::new(Body::empty()).into_parts(); + parts.method = req.method; + parts.uri = req.uri; + parts.headers = req.headers; + parts.extensions = req.extensions; + parts.extensions.insert(Extra { + credentials: req.credentials, + region: req.region, + service: req.service, + }); + http::Request::from_parts(parts, req.input) +} + +fn convert_response(resp: http::Response) -> S3Response { + let (parts, body) = resp.into_parts(); + let mut s3_resp = S3Response::new(Body::http_body_unsync(body)); + s3_resp.status = Some(parts.status); + s3_resp.headers = parts.headers; + s3_resp.extensions = parts.extensions; + s3_resp +} diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs deleted file mode 100644 index 5160f6b7..00000000 --- a/rustfs/src/console.rs +++ /dev/null @@ -1,391 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::license::get_license; -use axum::{ - Router, - body::Body, - http::{Response, StatusCode}, - response::IntoResponse, - routing::get, -}; -use axum_extra::extract::Host; -use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_utils::net::parse_and_resolve_address; -use std::io; - -use axum::response::Redirect; -use axum_server::tls_rustls::RustlsConfig; -use http::{HeaderMap, HeaderName, Uri, header}; -use mime_guess::from_path; -use rust_embed::RustEmbed; -use serde::Serialize; -use shadow_rs::shadow; -use std::net::{IpAddr, SocketAddr}; -use std::sync::OnceLock; -use std::time::Duration; -use tokio::signal; -use tower_http::cors::{Any, CorsLayer}; -use tower_http::trace::TraceLayer; -use tracing::{debug, error, info, instrument}; - -shadow!(build); - -const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; - -#[derive(RustEmbed)] -#[folder = "$CARGO_MANIFEST_DIR/static"] -struct StaticFiles; - -/// Static file handler -async fn static_handler(uri: Uri) -> impl IntoResponse { - let mut path = uri.path().trim_start_matches('/'); - if path.is_empty() { - path = "index.html" - } - if let Some(file) = StaticFiles::get(path) { - let mime_type = from_path(path).first_or_octet_stream(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", mime_type.to_string()) - .body(Body::from(file.data)) - .unwrap() - } else if let Some(file) = StaticFiles::get("index.html") { - let mime_type = from_path("index.html").first_or_octet_stream(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", mime_type.to_string()) - .body(Body::from(file.data)) - .unwrap() - } else { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("404 Not Found")) - .unwrap() - } -} - -#[derive(Debug, Serialize, Clone)] -pub(crate) struct Config { - #[serde(skip)] - port: u16, - api: Api, - s3: S3, - release: Release, - license: License, - doc: String, -} - -impl Config { - fn new(local_ip: IpAddr, port: u16, version: &str, date: &str) -> Self { - Config { - port, - api: Api { - base_url: format!("http://{local_ip}:{port}/{RUSTFS_ADMIN_PREFIX}"), - }, - s3: S3 { - endpoint: format!("http://{local_ip}:{port}"), - region: "cn-east-1".to_owned(), - }, - release: Release { - version: version.to_string(), - date: date.to_string(), - }, - license: License { - name: "Apache-2.0".to_string(), - url: "https://www.apache.org/licenses/LICENSE-2.0".to_string(), - }, - doc: "https://rustfs.com/docs/".to_string(), - } - } - - fn to_json(&self) -> String { - serde_json::to_string(self).unwrap_or_default() - } - - pub(crate) fn version_info(&self) -> String { - format!( - "RELEASE.{}@{} (rust {} {})", - self.release.date.clone(), - self.release.version.clone().trim_start_matches('@'), - build::RUST_VERSION, - build::BUILD_TARGET - ) - } - - pub(crate) fn version(&self) -> String { - self.release.version.clone() - } - - pub(crate) fn license(&self) -> String { - format!("{} {}", self.license.name.clone(), self.license.url.clone()) - } - - pub(crate) fn doc(&self) -> String { - self.doc.clone() - } -} - -#[derive(Debug, Serialize, Clone)] -struct Api { - #[serde(rename = "baseURL")] - base_url: String, -} - -#[derive(Debug, Serialize, Clone)] -struct S3 { - endpoint: String, - region: String, -} - -#[derive(Debug, Serialize, Clone)] -struct Release { - version: String, - date: String, -} - -#[derive(Debug, Serialize, Clone)] -struct License { - name: String, - url: String, -} - -pub(crate) static CONSOLE_CONFIG: OnceLock = OnceLock::new(); - -#[allow(clippy::const_is_empty)] -pub(crate) fn init_console_cfg(local_ip: IpAddr, port: u16) { - CONSOLE_CONFIG.get_or_init(|| { - let ver = { - if !build::TAG.is_empty() { - build::TAG.to_string() - } else if !build::SHORT_COMMIT.is_empty() { - format!("@{}", build::SHORT_COMMIT) - } else { - build::PKG_VERSION.to_string() - } - }; - - Config::new(local_ip, port, ver.as_str(), build::COMMIT_DATE_3339) - }); -} - -// fn is_socket_addr_or_ip_addr(host: &str) -> bool { -// host.parse::().is_ok() || host.parse::().is_ok() -// } - -async fn license_handler() -> impl IntoResponse { - let license = get_license().unwrap_or_default(); - - Response::builder() - .header("content-type", "application/json") - .status(StatusCode::OK) - .body(Body::from(serde_json::to_string(&license).unwrap_or_default())) - .unwrap() -} - -fn _is_private_ip(ip: IpAddr) -> bool { - match ip { - IpAddr::V4(ip) => { - let octets = ip.octets(); - // 10.0.0.0/8 - octets[0] == 10 || - // 172.16.0.0/12 - (octets[0] == 172 && (octets[1] >= 16 && octets[1] <= 31)) || - // 192.168.0.0/16 - (octets[0] == 192 && octets[1] == 168) - } - IpAddr::V6(_) => false, - } -} - -#[allow(clippy::const_is_empty)] -#[instrument(fields(host))] -async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> impl IntoResponse { - // Get the scheme from the headers or use the URI scheme - let scheme = headers - .get(HeaderName::from_static("x-forwarded-proto")) - .and_then(|value| value.to_str().ok()) - .unwrap_or_else(|| uri.scheme().map(|s| s.as_str()).unwrap_or("http")); - - // Print logs for debugging - info!("Scheme: {}, ", scheme); - - // Get the host from the uri and use the value of the host extractor if it doesn't have one - let host = uri.host().unwrap_or(host.as_str()); - - let host = if let Ok(socket_addr) = host.parse::() { - // Successfully parsed, it's in IP:Port format. - // For IPv6, we need to enclose it in brackets to form a valid URL. - let ip = socket_addr.ip(); - if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") } - } else { - // Failed to parse, it might be a domain name or a bare IP, use it as is. - host.to_string() - }; - - // Make a copy of the current configuration - let mut cfg = match CONSOLE_CONFIG.get() { - Some(cfg) => cfg.clone(), - None => { - error!("Console configuration not initialized"); - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from("Console configuration not initialized")) - .unwrap(); - } - }; - - let url = format!("{}://{}:{}", scheme, host, cfg.port); - cfg.api.base_url = format!("{url}{RUSTFS_ADMIN_PREFIX}"); - cfg.s3.endpoint = url; - - Response::builder() - .header("content-type", "application/json") - .status(StatusCode::OK) - .body(Body::from(cfg.to_json())) - .unwrap() -} - -pub async fn start_static_file_server( - addrs: &str, - local_ip: IpAddr, - access_key: &str, - secret_key: &str, - tls_path: Option, -) { - // Configure CORS - let cors = CorsLayer::new() - .allow_origin(Any) // In the production environment, we recommend that you specify a specific domain name - .allow_methods([http::Method::GET, http::Method::POST]) - .allow_headers([header::CONTENT_TYPE]); - // Create a route - let app = Router::new() - .route("/license", get(license_handler)) - .route("/config.json", get(config_handler)) - .fallback_service(get(static_handler)) - .layer(cors) - .layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true)) - .layer(TraceLayer::new_for_http()); - - let server_addr = parse_and_resolve_address(addrs).expect("Failed to parse socket address"); - let server_port = server_addr.port(); - let server_address = server_addr.to_string(); - - info!( - "WebUI: http://{}:{} http://127.0.0.1:{} http://{}", - local_ip, server_port, server_port, server_address - ); - info!(" RootUser: {}", access_key); - info!(" RootPass: {}", secret_key); - - // Check and start the HTTPS/HTTP server - match start_server(server_addr, tls_path, app.clone()).await { - Ok(_) => info!("Server shutdown gracefully"), - Err(e) => error!("Server error: {}", e), - } -} -async fn start_server(server_addr: SocketAddr, tls_path: Option, app: Router) -> io::Result<()> { - let tls_path = tls_path.unwrap_or_default(); - let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); - let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); - let handle = axum_server::Handle::new(); - // create a signal off listening task - let handle_clone = handle.clone(); - tokio::spawn(async move { - shutdown_signal().await; - info!("Initiating graceful shutdown..."); - handle_clone.graceful_shutdown(Some(Duration::from_secs(10))); - }); - - let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok(); - info!("Console TLS certs: {:?}", has_tls_certs); - if has_tls_certs { - info!("Found TLS certificates, starting with HTTPS"); - match RustlsConfig::from_pem_file(cert_path, key_path).await { - Ok(config) => { - info!("Starting HTTPS server..."); - axum_server::bind_rustls(server_addr, config) - .handle(handle.clone()) - .serve(app.into_make_service()) - .await - .map_err(io::Error::other)?; - - info!("HTTPS server running on https://{}", server_addr); - - Ok(()) - } - Err(e) => { - error!("Failed to create TLS config: {}", e); - start_http_server(server_addr, app, handle).await - } - } - } else { - info!("TLS certificates not found at {} and {}", key_path, cert_path); - start_http_server(server_addr, app, handle).await - } -} - -#[allow(dead_code)] -/// 308 redirect for HTTP to HTTPS -fn redirect_to_https(https_port: u16) -> Router { - Router::new().route( - "/*path", - get({ - move |uri: Uri, req: http::Request| async move { - let host = req - .headers() - .get("host") - .map_or("localhost", |h| h.to_str().unwrap_or("localhost")); - let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or(""); - let https_url = format!("https://{host}:{https_port}{path}"); - Redirect::permanent(&https_url) - } - }), - ) -} - -async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> { - debug!("Starting HTTP server..."); - axum_server::bind(addr) - .handle(handle) - .serve(app.into_make_service()) - .await - .map_err(io::Error::other) -} - -async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); - }; - - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - tokio::select! { - _ = ctrl_c => { - info!("shutdown_signal ctrl_c") - }, - _ = terminate => { - info!("shutdown_signal terminate") - }, - } -} diff --git a/rustfs/src/event.rs b/rustfs/src/event.rs deleted file mode 100644 index 981b86cd..00000000 --- a/rustfs/src/event.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2024 RustFS Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use rustfs_config::DEFAULT_DELIMITER; -use rustfs_ecstore::config::GLOBAL_ServerConfig; -use tracing::{error, info, instrument}; - -#[instrument] -pub(crate) async fn init_event_notifier() { - info!("Initializing event notifier..."); - - // 1. Get the global configuration loaded by ecstore - let server_config = match GLOBAL_ServerConfig.get() { - Some(config) => config.clone(), // Clone the config to pass ownership - None => { - error!("Event notifier initialization failed: Global server config not loaded."); - return; - } - }; - - info!("Global server configuration loaded successfully. config: {:?}", server_config); - // 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't - if server_config - .get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER) - .is_none() - || server_config - .get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER) - .is_none() - { - info!("'notify' subsystem not configured, skipping event notifier initialization."); - return; - } - - info!("Event notifier configuration found, proceeding with initialization."); - - // 3. Initialize the notification system asynchronously with a global configuration - // Put it into a separate task to avoid blocking the main initialization process - tokio::spawn(async move { - if let Err(e) = rustfs_notify::initialize(server_config).await { - error!("Failed to initialize event notifier system: {}", e); - } else { - info!("Event notifier system initialized successfully."); - } - }); -} - -/// Shuts down the event notifier system gracefully -pub async fn shutdown_event_notifier() { - info!("Shutting down event notifier system..."); - - if !rustfs_notify::is_notification_system_initialized() { - info!("Event notifier system is not initialized, nothing to shut down."); - return; - } - - let system = match rustfs_notify::notification_system() { - Some(sys) => sys, - None => { - error!("Event notifier system is not initialized."); - return; - } - }; - - // Call the shutdown function from the rustfs_notify module - system.shutdown().await; - info!("Event notifier system shut down successfully."); -} diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 07ac1879..1a0a9c39 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -15,42 +15,29 @@ mod admin; mod auth; mod config; -mod console; mod error; -mod event; // mod grpc; pub mod license; mod logging; mod server; -mod service; mod storage; -mod update_checker; +mod update; +mod version; -use crate::auth::IAMAuth; -use crate::console::{CONSOLE_CONFIG, init_console_cfg}; +use rustfs_config::DEFAULT_DELIMITER; +use rustfs_ecstore::config::GLOBAL_ServerConfig; // Ensure the correct path for parse_license is imported -use crate::event::shutdown_event_notifier; -use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown}; -use bytes::Bytes; +use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, start_http_server, wait_for_shutdown}; use chrono::Datelike; use clap::Parser; -use http::{HeaderMap, Request as HttpRequest, Response}; -use hyper_util::server::graceful::GracefulShutdown; -use hyper_util::{ - rt::{TokioExecutor, TokioIo}, - server::conn::auto::Builder as ConnBuilder, - service::TowerToHyperService, -}; use license::init_license; use rustfs_ahm::scanner::data_scanner::ScannerConfig; use rustfs_ahm::{Scanner, create_ahm_services_cancel_token, shutdown_ahm_services}; use rustfs_common::globals::set_global_addr; -use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool; use rustfs_ecstore::config as ecconfig; use rustfs_ecstore::config::GLOBAL_ConfigSys; -use rustfs_ecstore::rpc::make_server; use rustfs_ecstore::store_api::BucketOptions; use rustfs_ecstore::{ StorageAPI, @@ -63,56 +50,25 @@ use rustfs_ecstore::{ update_erasure_type, }; use rustfs_iam::init_iam_sys; -use rustfs_obs::{SystemObserver, init_obs, set_global_guard}; -use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; +use rustfs_obs::{init_obs, set_global_guard}; use rustfs_utils::net::parse_and_resolve_address; -use rustls::ServerConfig; -use s3s::service::S3Service; -use s3s::{host::MultiDomain, service::S3ServiceBuilder}; -use service::hybrid; -use socket2::SockRef; use std::io::{Error, Result}; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; -use tokio::net::{TcpListener, TcpStream}; -#[cfg(unix)] -use tokio::signal::unix::{SignalKind, signal}; -use tokio_rustls::TlsAcceptor; -use tonic::{Request, Status, metadata::MetadataValue}; -use tower::ServiceBuilder; -use tower_http::catch_panic::CatchPanicLayer; -use tower_http::cors::CorsLayer; -use tower_http::trace::TraceLayer; -use tracing::{Span, debug, error, info, instrument, warn}; - -const MI_B: usize = 1024 * 1024; +use tracing::{debug, error, info, instrument, warn}; #[cfg(all(target_os = "linux", target_env = "gnu"))] #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; -#[allow(clippy::result_large_err)] -fn check_auth(req: Request<()>) -> std::result::Result, Status> { - let token: MetadataValue<_> = "rustfs rpc".parse().unwrap(); - - match req.metadata().get("authorization") { - Some(t) if token == t => Ok(req), - _ => Err(Status::unauthenticated("No valid auth token")), - } -} - #[instrument] fn print_server_info() { - let cfg = CONSOLE_CONFIG.get().unwrap(); let current_year = chrono::Utc::now().year(); // Use custom macros to print server information info!("RustFS Object Storage Server"); info!("Copyright: 2024-{} RustFS, Inc", current_year); - info!("License: {}", cfg.license()); - info!("Version: {}", cfg.version_info()); - info!("Docs: {}", cfg.doc()); + info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0"); + info!("Version: {}", version::get_version()); + info!("Docs: https://rustfs.com/docs/"); } #[tokio::main] @@ -133,55 +89,12 @@ async fn main() -> Result<()> { run(opt).await } -/// Sets up the TLS acceptor if certificates are available. -#[instrument(skip(tls_path))] -async fn setup_tls_acceptor(tls_path: &str) -> Result> { - if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() { - debug!("TLS path is not provided or does not exist, starting with HTTP"); - return Ok(None); - } - - debug!("Found TLS directory, checking for certificates"); - - // 1. Try to load all certificates from the directory (multi-cert support) - if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) { - if !cert_key_pairs.is_empty() { - debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len()); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let mut server_config = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?)); - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); - } - } - - // 2. Fallback to legacy single certificate mode - let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); - let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); - if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() { - debug!("Found legacy single TLS certificate, starting with HTTPS"); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - let mut server_config = ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, key) - .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); - } - - debug!("No valid TLS certificates found in the directory, starting with HTTP"); - Ok(None) -} - #[instrument(skip(opt))] async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); - if let Some(region) = opt.region { - rustfs_ecstore::global::set_global_region(region); + if let Some(region) = &opt.region { + rustfs_ecstore::global::set_global_region(region.clone()); } let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?; @@ -195,21 +108,7 @@ async fn run(opt: config::Opt) -> Result<()> { set_global_rustfs_port(server_port); - // The listening address and port are obtained from the parameters - let listener = TcpListener::bind(server_address.clone()).await?; - // Obtain the listener address - let local_addr: SocketAddr = listener.local_addr()?; - debug!("Listening on: {}", local_addr); - let local_ip = match rustfs_utils::get_local_ip() { - Some(ip) => { - debug!("Obtained local IP address: {}", ip); - ip - } - None => { - warn!("Unable to obtain local IP address, using fallback IP: {}", local_addr.ip()); - local_addr.ip() - } - }; + set_global_addr(&opt.address).await; // For RPC let (endpoint_pools, setup_type) = @@ -228,19 +127,6 @@ async fn run(opt: config::Opt) -> Result<()> { } } - // Detailed endpoint information (showing all API endpoints) - let api_endpoints = format!("http://{local_ip}:{server_port}"); - let localhost_endpoint = format!("http://127.0.0.1:{server_port}"); - info!(" API: {} {}", api_endpoints, localhost_endpoint); - info!(" RootUser: {}", opt.access_key.clone()); - info!(" RootPass: {}", opt.secret_key.clone()); - if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) { - warn!( - "Detected default credentials '{}:{}', we recommend that you change these values with 'RUSTFS_ACCESS_KEY' and 'RUSTFS_SECRET_KEY' environment variables", - DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY - ); - } - for (i, eps) in endpoint_pools.as_ref().iter().enumerate() { info!( "created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}", @@ -252,7 +138,11 @@ async fn run(opt: config::Opt) -> Result<()> { } } - set_global_addr(&opt.address).await; + let state_manager = ServiceStateManager::new(); + // Update service status to Starting + state_manager.update(ServiceState::Starting); + + let shutdown_tx = start_http_server(&opt, state_manager.clone()).await?; set_global_endpoints(endpoint_pools.as_ref().clone()); update_erasure_type(setup_type).await; @@ -260,163 +150,6 @@ async fn run(opt: config::Opt) -> Result<()> { // Initialize the local disk init_local_disks(endpoint_pools.clone()).await.map_err(Error::other)?; - // Setup S3 service - // This project uses the S3S library to implement S3 services - let s3_service = { - let store = storage::ecfs::FS::new(); - let mut b = S3ServiceBuilder::new(store.clone()); - - let access_key = opt.access_key.clone(); - let secret_key = opt.secret_key.clone(); - debug!("authentication is enabled {}, {}", &access_key, &secret_key); - - b.set_auth(IAMAuth::new(access_key, secret_key)); - b.set_access(store.clone()); - b.set_route(admin::make_admin_route()?); - - if !opt.server_domains.is_empty() { - info!("virtual-hosted-style requests are enabled use domain_name {:?}", &opt.server_domains); - b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?); - } - - b.build() - }; - - tokio::spawn(async move { - // Record the PID-related metrics of the current process - let meter = opentelemetry::global::meter("system"); - let obs_result = SystemObserver::init_process_observer(meter).await; - match obs_result { - Ok(_) => { - info!("Process observer initialized successfully"); - } - Err(e) => { - error!("Failed to initialize process observer: {}", e); - } - } - }); - - let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?; - - let state_manager = ServiceStateManager::new(); - let worker_state_manager = state_manager.clone(); - // Update service status to Starting - state_manager.update(ServiceState::Starting); - - // Create shutdown channel - let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1); - let shutdown_tx_clone = shutdown_tx.clone(); - - tokio::spawn(async move { - #[cfg(unix)] - let (mut sigterm_inner, mut sigint_inner) = { - // Unix platform specific code - let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler"); - let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler"); - (sigterm_inner, sigint_inner) - }; - - let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new())); - let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); - let graceful = Arc::new(GracefulShutdown::new()); - debug!("graceful initiated"); - - // service ready - worker_state_manager.update(ServiceState::Ready); - let tls_acceptor = tls_acceptor.map(Arc::new); - - loop { - debug!("Waiting for new connection..."); - let (socket, _) = { - #[cfg(unix)] - { - tokio::select! { - res = listener.accept() => match res { - Ok(conn) => conn, - Err(err) => { - error!("error accepting connection: {err}"); - continue; - } - }, - _ = ctrl_c.as_mut() => { - info!("Ctrl-C received in worker thread"); - let _ = shutdown_tx_clone.send(()); - break; - }, - Some(_) = sigint_inner.recv() => { - info!("SIGINT received in worker thread"); - let _ = shutdown_tx_clone.send(()); - break; - }, - Some(_) = sigterm_inner.recv() => { - info!("SIGTERM received in worker thread"); - let _ = shutdown_tx_clone.send(()); - break; - }, - _ = shutdown_rx.recv() => { - info!("Shutdown signal received in worker thread"); - break; - } - } - } - #[cfg(not(unix))] - { - tokio::select! { - res = listener.accept() => match res { - Ok(conn) => conn, - Err(err) => { - error!("error accepting connection: {err}"); - continue; - } - }, - _ = ctrl_c.as_mut() => { - info!("Ctrl-C received in worker thread"); - let _ = shutdown_tx_clone.send(()); - break; - }, - _ = shutdown_rx.recv() => { - info!("Shutdown signal received in worker thread"); - break; - } - } - } - }; - - let socket_ref = SockRef::from(&socket); - if let Err(err) = socket_ref.set_nodelay(true) { - warn!(?err, "Failed to set TCP_NODELAY"); - } - if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { - warn!(?err, "Failed to set set_recv_buffer_size"); - } - if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { - warn!(?err, "Failed to set set_send_buffer_size"); - } - - process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone()); - } - - worker_state_manager.update(ServiceState::Stopping); - match Arc::try_unwrap(graceful) { - Ok(g) => { - tokio::select! { - () = g.shutdown() => { - debug!("Gracefully shutdown!"); - }, - () = tokio::time::sleep(Duration::from_secs(10)) => { - debug!("Waited 10 seconds for graceful shutdown, aborting..."); - } - } - } - Err(arc_graceful) => { - error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful); - tokio::time::sleep(Duration::from_secs(10)).await; - debug!("Timeout reached, forcing shutdown"); - } - } - worker_state_manager.update(ServiceState::Stopped); - }); - // init store let store = ECStore::new(server_addr, endpoint_pools.clone()).await.inspect_err(|err| { error!("ECStore::new {:?}", err); @@ -427,7 +160,7 @@ async fn run(opt: config::Opt) -> Result<()> { GLOBAL_ConfigSys.init(store.clone()).await?; // Initialize event notifier - event::init_event_notifier().await; + init_event_notifier().await; let buckets_list = store .list_bucket(&BucketOptions { @@ -455,15 +188,12 @@ async fn run(opt: config::Opt) -> Result<()> { let _ = create_ahm_services_cancel_token(); let scanner = Scanner::new(Some(ScannerConfig::default())); scanner.start().await?; - - // init console configuration - init_console_cfg(local_ip, server_port); print_server_info(); init_bucket_replication_pool().await; // Async update check (optional) tokio::spawn(async { - use crate::update_checker::{UpdateCheckError, check_updates}; + use crate::update::{UpdateCheckError, check_updates}; match check_updates().await { Ok(result) => { @@ -493,23 +223,6 @@ async fn run(opt: config::Opt) -> Result<()> { } }); - if opt.console_enable { - debug!("console is enabled"); - let access_key = opt.access_key.clone(); - let secret_key = opt.secret_key.clone(); - let console_address = opt.console_address.clone(); - let tls_path = opt.tls_path.clone(); - - if console_address.is_empty() { - error!("console_address is empty"); - return Err(Error::other("console_address is empty".to_string())); - } - - tokio::spawn(async move { - console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key, tls_path).await; - }); - } - // Perform hibernation for 1 second tokio::time::sleep(SHUTDOWN_TIMEOUT).await; // listen to the shutdown signal @@ -528,103 +241,6 @@ async fn run(opt: config::Opt) -> Result<()> { Ok(()) } -/// Process a single incoming TCP connection. -/// -/// This function is executed in a new Tokio task and it will: -/// 1. If TLS is configured, perform TLS handshake. -/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware. -/// 3. Use Hyper to handle HTTP requests on this connection. -/// 4. Incorporate connections into the management of elegant closures. -#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))] -fn process_connection( - socket: TcpStream, - tls_acceptor: Option>, - http_server: Arc>, - s3_service: S3Service, - graceful: Arc, -) { - tokio::spawn(async move { - // Build services inside each connected task to avoid passing complex service types across tasks, - // It also ensures that each connection has an independent service instance. - let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); - let hybrid_service = ServiceBuilder::new() - .layer(CatchPanicLayer::new()) - .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &HttpRequest<_>| { - let span = tracing::info_span!("http-request", - status_code = tracing::field::Empty, - method = %request.method(), - uri = %request.uri(), - version = ?request.version(), - ); - for (header_name, header_value) in request.headers() { - if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" { - span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid")); - } - } - - span - }) - .on_request(|request: &HttpRequest<_>, _span: &Span| { - info!( - counter.rustfs_api_requests_total = 1_u64, - key_request_method = %request.method().to_string(), - key_request_uri_path = %request.uri().path().to_owned(), - "handle request api total", - ); - debug!("http started method: {}, url path: {}", request.method(), request.uri().path()) - }) - .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { - _span.record("http response status_code", tracing::field::display(response.status())); - debug!("http response generated in {:?}", latency) - }) - .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { - info!(histogram.request.body.len = chunk.len(), "histogram request body length",); - debug!("http body sending {} bytes in {:?}", chunk.len(), latency) - }) - .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { - debug!("http stream closed after {:?}", stream_duration) - }) - .on_failure(|_error, latency: Duration, _span: &Span| { - info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total"); - debug!("http request failure error: {:?} in {:?}", _error, latency) - }), - ) - .layer(CorsLayer::permissive()) - .service(hybrid(s3_service, rpc_service)); - let hybrid_service = TowerToHyperService::new(hybrid_service); - - // Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor - if let Some(acceptor) = tls_acceptor { - debug!("TLS handshake start"); - match acceptor.accept(socket).await { - Ok(tls_socket) => { - debug!("TLS handshake successful"); - let stream = TokioIo::new(tls_socket); - let conn = http_server.serve_connection(stream, hybrid_service); - if let Err(err) = graceful.watch(conn).await { - handle_connection_error(&*err); - } - } - Err(err) => { - error!(?err, "TLS handshake failed"); - return; // Failed to end the task directly - } - } - debug!("TLS handshake success"); - } else { - debug!("Http handshake start"); - let stream = TokioIo::new(socket); - let conn = http_server.serve_connection(stream, hybrid_service); - if let Err(err) = graceful.watch(conn).await { - handle_connection_error(&*err); - } - debug!("Http handshake success"); - }; - }); -} - /// Handles the shutdown process of the server async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) { info!("Shutdown signal received in main thread"); @@ -653,25 +269,63 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki info!("Server stopped current "); } -/// Handles connection errors by logging them with appropriate severity -fn handle_connection_error(err: &(dyn std::error::Error + 'static)) { - if let Some(hyper_err) = err.downcast_ref::() { - if hyper_err.is_incomplete_message() { - warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err); - } else if hyper_err.is_closed() { - warn!("The HTTP connection is closed:{}", hyper_err); - } else if hyper_err.is_parse() { - error!("HTTP message parsing failed:{}", hyper_err); - } else if hyper_err.is_user() { - error!("HTTP user-custom error:{}", hyper_err); - } else if hyper_err.is_canceled() { - warn!("The HTTP connection is canceled:{}", hyper_err); - } else { - error!("Unknown hyper error:{:?}", hyper_err); +#[instrument] +pub(crate) async fn init_event_notifier() { + info!("Initializing event notifier..."); + + // 1. Get the global configuration loaded by ecstore + let server_config = match GLOBAL_ServerConfig.get() { + Some(config) => config.clone(), // Clone the config to pass ownership + None => { + error!("Event notifier initialization failed: Global server config not loaded."); + return; } - } else if let Some(io_err) = err.downcast_ref::() { - error!("Unknown connection IO error:{}", io_err); - } else { - error!("Unknown connection error type:{:?}", err); + }; + + info!("Global server configuration loaded successfully. config: {:?}", server_config); + // 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't + if server_config + .get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER) + .is_none() + || server_config + .get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER) + .is_none() + { + info!("'notify' subsystem not configured, skipping event notifier initialization."); + return; } + + info!("Event notifier configuration found, proceeding with initialization."); + + // 3. Initialize the notification system asynchronously with a global configuration + // Put it into a separate task to avoid blocking the main initialization process + tokio::spawn(async move { + if let Err(e) = rustfs_notify::initialize(server_config).await { + error!("Failed to initialize event notifier system: {}", e); + } else { + info!("Event notifier system initialized successfully."); + } + }); +} + +/// Shuts down the event notifier system gracefully +pub async fn shutdown_event_notifier() { + info!("Shutting down event notifier system..."); + + if !rustfs_notify::is_notification_system_initialized() { + info!("Event notifier system is not initialized, nothing to shut down."); + return; + } + + let system = match rustfs_notify::notification_system() { + Some(sys) => sys, + None => { + error!("Event notifier system is not initialized."); + return; + } + }; + + // Call the shutdown function from the rustfs_notify module + system.shutdown().await; + info!("Event notifier system shut down successfully."); } diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs new file mode 100644 index 00000000..a85be294 --- /dev/null +++ b/rustfs/src/server/http.rs @@ -0,0 +1,413 @@ +// use crate::admin::console::{CONSOLE_CONFIG, init_console_cfg}; +use crate::auth::IAMAuth; +// Ensure the correct path for parse_license is imported +use crate::admin; +use crate::config; +use crate::server::hybrid::hybrid; +use crate::server::{ServiceState, ServiceStateManager}; +use crate::storage; +use bytes::Bytes; +use http::{HeaderMap, Request as HttpRequest, Response}; +use hyper_util::server::graceful::GracefulShutdown; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server::conn::auto::Builder as ConnBuilder, + service::TowerToHyperService, +}; +use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +use rustfs_ecstore::rpc::make_server; +use rustfs_obs::SystemObserver; +use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; +use rustfs_utils::net::parse_and_resolve_address; +use rustls::ServerConfig; +use s3s::service::S3Service; +use s3s::{host::MultiDomain, service::S3ServiceBuilder}; +use socket2::SockRef; +use std::io::{Error, Result}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +#[cfg(unix)] +use tokio::signal::unix::{SignalKind, signal}; +use tokio_rustls::TlsAcceptor; +use tonic::{Request, Status, metadata::MetadataValue}; +use tower::ServiceBuilder; +use tower_http::catch_panic::CatchPanicLayer; +use tower_http::cors::CorsLayer; +use tower_http::trace::TraceLayer; +use tracing::{Span, debug, error, info, instrument, warn}; + +const MI_B: usize = 1024 * 1024; + +pub async fn start_http_server( + opt: &config::Opt, + worker_state_manager: ServiceStateManager, +) -> std::io::Result> { + let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?; + let server_port = server_addr.port(); + let server_address = server_addr.to_string(); + + // The listening address and port are obtained from the parameters + let listener = TcpListener::bind(server_address.clone()).await?; + // Obtain the listener address + let local_addr: SocketAddr = listener.local_addr()?; + debug!("Listening on: {}", local_addr); + let local_ip = match rustfs_utils::get_local_ip() { + Some(ip) => { + debug!("Obtained local IP address: {}", ip); + ip + } + None => { + warn!("Unable to obtain local IP address, using fallback IP: {}", local_addr.ip()); + local_addr.ip() + } + }; + + // Detailed endpoint information (showing all API endpoints) + let api_endpoints = format!("http://{local_ip}:{server_port}"); + let localhost_endpoint = format!("http://127.0.0.1:{server_port}"); + info!(" API: {} {}", api_endpoints, localhost_endpoint); + if opt.console_enable { + info!( + " WebUI: http://{}:{}/rustfs/console/index.html http://127.0.0.1:{}/rustfs/console/index.html http://{}/rustfs/console/index.html", + local_ip, server_port, server_port, server_address + ); + } + info!(" RootUser: {}", opt.access_key.clone()); + info!(" RootPass: {}", opt.secret_key.clone()); + if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) { + warn!( + "Detected default credentials '{}:{}', we recommend that you change these values with 'RUSTFS_ACCESS_KEY' and 'RUSTFS_SECRET_KEY' environment variables", + DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY + ); + } + + // Setup S3 service + // This project uses the S3S library to implement S3 services + let s3_service = { + let store = storage::ecfs::FS::new(); + let mut b = S3ServiceBuilder::new(store.clone()); + + let access_key = opt.access_key.clone(); + let secret_key = opt.secret_key.clone(); + debug!("authentication is enabled {}, {}", &access_key, &secret_key); + + b.set_auth(IAMAuth::new(access_key, secret_key)); + b.set_access(store.clone()); + b.set_route(admin::make_admin_route(opt.console_enable)?); + + if !opt.server_domains.is_empty() { + info!("virtual-hosted-style requests are enabled use domain_name {:?}", &opt.server_domains); + b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?); + } + + b.build() + }; + + tokio::spawn(async move { + // Record the PID-related metrics of the current process + let meter = opentelemetry::global::meter("system"); + let obs_result = SystemObserver::init_process_observer(meter).await; + match obs_result { + Ok(_) => { + info!("Process observer initialized successfully"); + } + Err(e) => { + error!("Failed to initialize process observer: {}", e); + } + } + }); + + let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?; + // Create shutdown channel + let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1); + let shutdown_tx_clone = shutdown_tx.clone(); + + tokio::spawn(async move { + #[cfg(unix)] + let (mut sigterm_inner, mut sigint_inner) = { + // Unix platform specific code + let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler"); + let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler"); + (sigterm_inner, sigint_inner) + }; + + let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new())); + let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); + let graceful = Arc::new(GracefulShutdown::new()); + debug!("graceful initiated"); + + // service ready + worker_state_manager.update(ServiceState::Ready); + let tls_acceptor = tls_acceptor.map(Arc::new); + + loop { + debug!("Waiting for new connection..."); + let (socket, _) = { + #[cfg(unix)] + { + tokio::select! { + res = listener.accept() => match res { + Ok(conn) => conn, + Err(err) => { + error!("error accepting connection: {err}"); + continue; + } + }, + _ = ctrl_c.as_mut() => { + info!("Ctrl-C received in worker thread"); + let _ = shutdown_tx_clone.send(()); + break; + }, + Some(_) = sigint_inner.recv() => { + info!("SIGINT received in worker thread"); + let _ = shutdown_tx_clone.send(()); + break; + }, + Some(_) = sigterm_inner.recv() => { + info!("SIGTERM received in worker thread"); + let _ = shutdown_tx_clone.send(()); + break; + }, + _ = shutdown_rx.recv() => { + info!("Shutdown signal received in worker thread"); + break; + } + } + } + #[cfg(not(unix))] + { + tokio::select! { + res = listener.accept() => match res { + Ok(conn) => conn, + Err(err) => { + error!("error accepting connection: {err}"); + continue; + } + }, + _ = ctrl_c.as_mut() => { + info!("Ctrl-C received in worker thread"); + let _ = shutdown_tx_clone.send(()); + break; + }, + _ = shutdown_rx.recv() => { + info!("Shutdown signal received in worker thread"); + break; + } + } + } + }; + + let socket_ref = SockRef::from(&socket); + if let Err(err) = socket_ref.set_nodelay(true) { + warn!(?err, "Failed to set TCP_NODELAY"); + } + if let Err(err) = socket_ref.set_recv_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_recv_buffer_size"); + } + if let Err(err) = socket_ref.set_send_buffer_size(4 * MI_B) { + warn!(?err, "Failed to set set_send_buffer_size"); + } + + process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone()); + } + + worker_state_manager.update(ServiceState::Stopping); + match Arc::try_unwrap(graceful) { + Ok(g) => { + tokio::select! { + () = g.shutdown() => { + debug!("Gracefully shutdown!"); + }, + () = tokio::time::sleep(Duration::from_secs(10)) => { + debug!("Waited 10 seconds for graceful shutdown, aborting..."); + } + } + } + Err(arc_graceful) => { + error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful); + tokio::time::sleep(Duration::from_secs(10)).await; + debug!("Timeout reached, forcing shutdown"); + } + } + worker_state_manager.update(ServiceState::Stopped); + }); + + Ok(shutdown_tx) +} + +/// Sets up the TLS acceptor if certificates are available. +#[instrument(skip(tls_path))] +async fn setup_tls_acceptor(tls_path: &str) -> Result> { + if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() { + debug!("TLS path is not provided or does not exist, starting with HTTP"); + return Ok(None); + } + + debug!("Found TLS directory, checking for certificates"); + + // 1. Try to load all certificates from the directory (multi-cert support) + if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) { + if !cert_key_pairs.is_empty() { + debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len()); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?)); + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); + } + } + + // 2. Fallback to legacy single certificate mode + let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); + let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); + if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() { + debug!("Found legacy single TLS certificate, starting with HTTPS"); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); + } + + debug!("No valid TLS certificates found in the directory, starting with HTTP"); + Ok(None) +} + +/// Process a single incoming TCP connection. +/// +/// This function is executed in a new Tokio task and it will: +/// 1. If TLS is configured, perform TLS handshake. +/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware. +/// 3. Use Hyper to handle HTTP requests on this connection. +/// 4. Incorporate connections into the management of elegant closures. +#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))] +fn process_connection( + socket: TcpStream, + tls_acceptor: Option>, + http_server: Arc>, + s3_service: S3Service, + graceful: Arc, +) { + tokio::spawn(async move { + // Build services inside each connected task to avoid passing complex service types across tasks, + // It also ensures that each connection has an independent service instance. + let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); + let service = hybrid(s3_service, rpc_service); + + let hybrid_service = ServiceBuilder::new() + .layer(CatchPanicLayer::new()) + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &HttpRequest<_>| { + let span = tracing::info_span!("http-request", + status_code = tracing::field::Empty, + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + ); + for (header_name, header_value) in request.headers() { + if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" { + span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid")); + } + } + + span + }) + .on_request(|request: &HttpRequest<_>, _span: &Span| { + info!( + counter.rustfs_api_requests_total = 1_u64, + key_request_method = %request.method().to_string(), + key_request_uri_path = %request.uri().path().to_owned(), + "handle request api total", + ); + debug!("http started method: {}, url path: {}", request.method(), request.uri().path()) + }) + .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { + _span.record("http response status_code", tracing::field::display(response.status())); + debug!("http response generated in {:?}", latency) + }) + .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { + info!(histogram.request.body.len = chunk.len(), "histogram request body length",); + debug!("http body sending {} bytes in {:?}", chunk.len(), latency) + }) + .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { + debug!("http stream closed after {:?}", stream_duration) + }) + .on_failure(|_error, latency: Duration, _span: &Span| { + info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total"); + debug!("http request failure error: {:?} in {:?}", _error, latency) + }), + ) + .layer(CorsLayer::permissive()) + .service(service); + let hybrid_service = TowerToHyperService::new(hybrid_service); + + // Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor + if let Some(acceptor) = tls_acceptor { + debug!("TLS handshake start"); + match acceptor.accept(socket).await { + Ok(tls_socket) => { + debug!("TLS handshake successful"); + let stream = TokioIo::new(tls_socket); + let conn = http_server.serve_connection(stream, hybrid_service); + if let Err(err) = graceful.watch(conn).await { + handle_connection_error(&*err); + } + } + Err(err) => { + error!(?err, "TLS handshake failed"); + return; // Failed to end the task directly + } + } + debug!("TLS handshake success"); + } else { + debug!("Http handshake start"); + let stream = TokioIo::new(socket); + let conn = http_server.serve_connection(stream, hybrid_service); + if let Err(err) = graceful.watch(conn).await { + handle_connection_error(&*err); + } + debug!("Http handshake success"); + }; + }); +} + +/// Handles connection errors by logging them with appropriate severity +fn handle_connection_error(err: &(dyn std::error::Error + 'static)) { + if let Some(hyper_err) = err.downcast_ref::() { + if hyper_err.is_incomplete_message() { + warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err); + } else if hyper_err.is_closed() { + warn!("The HTTP connection is closed:{}", hyper_err); + } else if hyper_err.is_parse() { + error!("HTTP message parsing failed:{}", hyper_err); + } else if hyper_err.is_user() { + error!("HTTP user-custom error:{}", hyper_err); + } else if hyper_err.is_canceled() { + warn!("The HTTP connection is canceled:{}", hyper_err); + } else { + error!("Unknown hyper error:{:?}", hyper_err); + } + } else if let Some(io_err) = err.downcast_ref::() { + error!("Unknown connection IO error:{}", io_err); + } else { + error!("Unknown connection error type:{:?}", err); + } +} + +#[allow(clippy::result_large_err)] +fn check_auth(req: Request<()>) -> std::result::Result, Status> { + let token: MetadataValue<_> = "rustfs rpc".parse().unwrap(); + + match req.metadata().get("authorization") { + Some(t) if token == t => Ok(req), + _ => Err(Status::unauthenticated("No valid auth token")), + } +} diff --git a/rustfs/src/service.rs b/rustfs/src/server/hybrid.rs similarity index 100% rename from rustfs/src/service.rs rename to rustfs/src/server/hybrid.rs diff --git a/rustfs/src/server/mod.rs b/rustfs/src/server/mod.rs index 1309ad1d..7663755a 100644 --- a/rustfs/src/server/mod.rs +++ b/rustfs/src/server/mod.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod http; +mod hybrid; mod service_state; +pub(crate) use http::start_http_server; pub(crate) use service_state::SHUTDOWN_TIMEOUT; pub(crate) use service_state::ServiceState; pub(crate) use service_state::ServiceStateManager; diff --git a/rustfs/src/update_checker.rs b/rustfs/src/update.rs similarity index 95% rename from rustfs/src/update_checker.rs rename to rustfs/src/update.rs index 19cd0d99..1d145043 100644 --- a/rustfs/src/update_checker.rs +++ b/rustfs/src/update.rs @@ -17,6 +17,8 @@ use std::time::Duration; use thiserror::Error; use tracing::{debug, error, info, warn}; +use crate::version; + /// Update check related errors #[derive(Error, Debug)] pub enum UpdateCheckError { @@ -231,25 +233,7 @@ impl VersionChecker { /// Get current version number pub fn get_current_version() -> String { - use crate::console::CONSOLE_CONFIG; - - if let Some(config) = CONSOLE_CONFIG.get() { - // Extract version from configuration - let version_str = config.version(); - // Extract version part, removing extra information - if let Some(release_part) = version_str.split_whitespace().next() { - if release_part.starts_with("RELEASE.") { - release_part.trim_start_matches("RELEASE.").to_string() - } else { - release_part.to_string() - } - } else { - rustfs_config::VERSION.to_string() - } - } else { - // If configuration is not initialized, use constant version - rustfs_config::VERSION.to_string() - } + version::get_version() } /// Convenience function for async update checking diff --git a/rustfs/src/version.rs b/rustfs/src/version.rs new file mode 100644 index 00000000..9002a51a --- /dev/null +++ b/rustfs/src/version.rs @@ -0,0 +1,13 @@ +use shadow_rs::shadow; +shadow!(build); + +#[allow(clippy::const_is_empty)] +pub fn get_version() -> String { + if !build::TAG.is_empty() { + build::TAG.to_string() + } else if !build::SHORT_COMMIT.is_empty() { + format!("@{}", build::SHORT_COMMIT) + } else { + build::PKG_VERSION.to_string() + } +}