diff --git a/Cargo.lock b/Cargo.lock index 68b1892c..d3ddf94e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -669,6 +669,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atomic_enum" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -7133,6 +7144,7 @@ dependencies = [ "appauth", "async-trait", "atoi", + "atomic_enum", "axum", "axum-extra", "axum-server", @@ -7279,9 +7291,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.25" +version = "0.23.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c" +checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" dependencies = [ "aws-lc-rs", "log", diff --git a/Cargo.toml b/Cargo.toml index 9f1b721c..70c8e39d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ madmin = { path = "./madmin" } atoi = "2.0.0" async-recursion = "1.0.5" async-trait = "0.1.87" +atomic_enum = "0.3.0" axum = "0.8.3" axum-extra = "0.10.1" axum-server = { version = "0.7.2", features = ["tls-rustls"] } @@ -98,7 +99,7 @@ rmp = "0.8.14" rmp-serde = "1.3.0" rustfs-obs = { path = "crates/obs", version = "0.0.1" } rust-embed = "8.6.0" -rustls = { version = "0.23" } +rustls = { version = "0.23.26" } rustls-pki-types = "1.11.0" rustls-pemfile = "2.2.0" s3s = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779", default-features = true, features = [ diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 4106834e..c7d3ff7d 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -1,5 +1,5 @@ 
-use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION}; -use config::{Config, File, FileFormat}; +use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT}; +use config::{Config, Environment, File, FileFormat}; use serde::Deserialize; use std::env; @@ -22,18 +22,44 @@ pub struct OtelConfig { pub logger_level: Option, } +// 辅助函数:从环境变量中提取可观测性配置 +fn extract_otel_config_from_env() -> OtelConfig { + OtelConfig { + endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()), + use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(USE_STDOUT)), + sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SAMPLE_RATIO)), + meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(METER_INTERVAL)), + service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SERVICE_NAME.to_string())), + service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SERVICE_VERSION.to_string())), + environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(ENVIRONMENT.to_string())), + logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(LOGGER_LEVEL.to_string())), + } +} + impl Default for OtelConfig { fn default() -> Self { - OtelConfig { - endpoint: "".to_string(), - use_stdout: Some(true), - sample_ratio: Some(SAMPLE_RATIO), - meter_interval: Some(METER_INTERVAL), - service_name: Some(SERVICE_NAME.to_string()), - service_version: Some(SERVICE_VERSION.to_string()), - environment: Some(ENVIRONMENT.to_string()), - logger_level: Some(LOGGER_LEVEL.to_string()), - } + 
extract_otel_config_from_env() } } @@ -69,14 +95,18 @@ pub struct FileSinkConfig { impl FileSinkConfig { pub fn get_default_log_path() -> String { - let temp_dir = env::temp_dir().join("rustfs").join("logs"); + let temp_dir = env::temp_dir().join("rustfs"); if let Err(e) = std::fs::create_dir_all(&temp_dir) { eprintln!("Failed to create log directory: {}", e); - return "logs/app.log".to_string(); + return "rustfs/rustfs.log".to_string(); } - temp_dir.join("app.log").to_str().unwrap_or("logs/app.log").to_string() + temp_dir + .join("rustfs.log") + .to_str() + .unwrap_or("rustfs/rustfs.log") + .to_string() } } @@ -84,7 +114,10 @@ impl Default for FileSinkConfig { fn default() -> Self { FileSinkConfig { enabled: true, - path: Self::get_default_log_path(), + path: env::var("RUSTFS_SINKS_FILE_PATH") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| Self::get_default_log_path()), buffer_size: Some(8192), flush_interval_ms: Some(1000), flush_threshold: Some(100), @@ -93,11 +126,21 @@ impl Default for FileSinkConfig { } /// Sink configuration collection -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone)] pub struct SinkConfig { - pub kafka: KafkaSinkConfig, - pub webhook: WebhookSinkConfig, - pub file: FileSinkConfig, + pub kafka: Option, + pub webhook: Option, + pub file: Option, +} + +impl Default for SinkConfig { + fn default() -> Self { + SinkConfig { + kafka: None, + webhook: None, + file: Some(FileSinkConfig::default()), + } + } } ///Logger Configuration @@ -109,7 +152,7 @@ pub struct LoggerConfig { impl Default for LoggerConfig { fn default() -> Self { LoggerConfig { - queue_capacity: Some(1000), + queue_capacity: Some(10000), } } } @@ -128,11 +171,28 @@ impl Default for LoggerConfig { /// /// let config = load_config(None); /// ``` -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone)] pub struct AppConfig { pub observability: OtelConfig, pub sinks: SinkConfig, - pub logger: LoggerConfig, 
+ pub logger: Option, +} + +// 为 AppConfig 实现 Default +impl AppConfig { + pub fn new() -> Self { + Self { + observability: OtelConfig::default(), + sinks: SinkConfig::default(), + logger: Some(LoggerConfig::default()), + } + } +} + +impl Default for AppConfig { + fn default() -> Self { + Self::new() + } } const DEFAULT_CONFIG_FILE: &str = "obs"; @@ -187,9 +247,25 @@ pub fn load_config(config_dir: Option) -> AppConfig { let config = Config::builder() .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false)) .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false)) - .add_source(config::Environment::with_prefix("")) + .add_source( + Environment::default() + .prefix("RUSTFS") + .prefix_separator("__") + .separator("__") + .with_list_parse_key("volumes") + .try_parsing(true), + ) .build() .unwrap_or_default(); - config.try_deserialize().unwrap_or_default() + match config.try_deserialize::() { + Ok(app_config) => { + println!("Parsed AppConfig: {:?}", app_config); + app_config + } + Err(e) => { + println!("Failed to deserialize config: {}", e); + AppConfig::default() + } + } } diff --git a/crates/obs/src/logger.rs b/crates/obs/src/logger.rs index 55f4bd0e..1e62712c 100644 --- a/crates/obs/src/logger.rs +++ b/crates/obs/src/logger.rs @@ -21,7 +21,7 @@ impl Logger { /// Returns Logger and corresponding Receiver pub fn new(config: &AppConfig) -> (Self, Receiver) { // Get queue capacity from configuration, or use default values 10000 - let queue_capacity = config.logger.queue_capacity.unwrap_or(10000); + let queue_capacity = config.logger.as_ref().and_then(|l| l.queue_capacity).unwrap_or(10000); let (sender, receiver) = mpsc::channel(queue_capacity); (Logger { sender, queue_capacity }, receiver) } diff --git a/crates/obs/src/sink.rs b/crates/obs/src/sink.rs index 1f1c481f..a92a980a 100644 --- a/crates/obs/src/sink.rs +++ b/crates/obs/src/sink.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use tokio::fs::OpenOptions; 
use tokio::io; use tokio::io::AsyncWriteExt; -use tracing::debug; /// Sink Trait definition, asynchronously write logs #[async_trait] @@ -274,15 +273,15 @@ impl FileSink { // if the file not exists, create it if !file_exists { tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?; - debug!("the file not exists,create if. path: {:?}", path) + tracing::debug!("the file not exists,create if. path: {:?}", path) } let file = if file_exists { // If the file exists, open it in append mode - debug!("FileSink: File exists, opening in append mode."); + tracing::debug!("FileSink: File exists, opening in append mode."); OpenOptions::new().append(true).create(true).open(&path).await? } else { // If the file does not exist, create it - debug!("FileSink: File does not exist, creating a new file."); + tracing::debug!("FileSink: File does not exist, creating a new file."); // Create the file and write a header or initial content if needed OpenOptions::new().create(true).truncate(true).write(true).open(&path).await? 
}; @@ -414,52 +413,84 @@ pub async fn create_sinks(config: &AppConfig) -> Vec> { let mut sinks: Vec> = Vec::new(); #[cfg(feature = "kafka")] - if config.sinks.kafka.enabled { - match rdkafka::config::ClientConfig::new() - .set("bootstrap.servers", &config.sinks.kafka.bootstrap_servers) - .set("message.timeout.ms", "5000") - .create() - { - Ok(producer) => { - sinks.push(Arc::new(KafkaSink::new( - producer, - config.sinks.kafka.topic.clone(), - config.sinks.kafka.batch_size.unwrap_or(100), - config.sinks.kafka.batch_timeout_ms.unwrap_or(1000), - ))); + { + match &config.sinks.kafka { + Some(sink_kafka) => { + if sink_kafka.enabled { + match rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", &sink_kafka.bootstrap_servers) + .set("message.timeout.ms", "5000") + .create() + { + Ok(producer) => { + sinks.push(Arc::new(KafkaSink::new( + producer, + sink_kafka.topic.clone(), + sink_kafka.batch_size.unwrap_or(100), + sink_kafka.batch_timeout_ms.unwrap_or(1000), + ))); + } + Err(e) => { + tracing::error!("Failed to create Kafka producer: {}", e); + } + } + } else { + tracing::info!("Kafka sink is disabled in the configuration"); + } + } + _ => { + tracing::info!("Kafka sink is not configured or disabled"); } - Err(e) => eprintln!("Failed to create Kafka producer: {}", e), } } - #[cfg(feature = "webhook")] - if config.sinks.webhook.enabled { - sinks.push(Arc::new(WebhookSink::new( - config.sinks.webhook.endpoint.clone(), - config.sinks.webhook.auth_token.clone(), - config.sinks.webhook.max_retries.unwrap_or(3), - config.sinks.webhook.retry_delay_ms.unwrap_or(100), - ))); + { + match &config.sinks.webhook { + Some(sink_webhook) => { + if sink_webhook.enabled { + sinks.push(Arc::new(WebhookSink::new( + sink_webhook.endpoint.clone(), + sink_webhook.auth_token.clone(), + sink_webhook.max_retries.unwrap_or(3), + sink_webhook.retry_delay_ms.unwrap_or(100), + ))); + } else { + tracing::info!("Webhook sink is disabled in the configuration"); + } + } + _ => { + 
tracing::info!("Webhook sink is not configured or disabled");
+            }
+        }
     }
 
     #[cfg(feature = "file")]
     {
-        let path = if config.sinks.file.enabled {
-            config.sinks.file.path.clone()
-        } else {
-            "default.log".to_string()
-        };
-        debug!("FileSink: Using path: {}", path);
-        sinks.push(Arc::new(
-            FileSink::new(
-                path.clone(),
-                config.sinks.file.buffer_size.unwrap_or(8192),
-                config.sinks.file.flush_interval_ms.unwrap_or(1000),
-                config.sinks.file.flush_threshold.unwrap_or(100),
-            )
-            .await
-            .unwrap(),
-        ));
+        // let config = config.clone();
+        match &config.sinks.file {
+            Some(sink_file) => {
+                tracing::info!("File sink is enabled in the configuration");
+                let path = if sink_file.enabled {
+                    sink_file.path.clone()
+                } else {
+                    "rustfs.log".to_string()
+                };
+                tracing::debug!("FileSink: Using path: {}", path);
+                sinks.push(Arc::new(
+                    FileSink::new(
+                        path.clone(),
+                        sink_file.buffer_size.unwrap_or(8192),
+                        sink_file.flush_interval_ms.unwrap_or(1000),
+                        sink_file.flush_threshold.unwrap_or(100),
+                    )
+                    .await
+                    .unwrap(),
+                ));
+            }
+            _ => {
+                tracing::info!("File sink is not configured or disabled");
+            }
+        }
     }
 
     sinks
diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs
index 415e427f..82c93de7 100644
--- a/crates/obs/src/telemetry.rs
+++ b/crates/obs/src/telemetry.rs
@@ -293,7 +293,10 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
     registry.with(ErrorLayer::default()).with(fmt_layer).init();
 
     if !config.endpoint.is_empty() {
-        info!("OpenTelemetry telemetry initialized with OTLP endpoint: {}", config.endpoint);
+        info!(
+            "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
+            config.endpoint, logger_level
+        );
     }
 
     OtelGuard {
diff --git a/deploy/config/.example.env b/deploy/config/.example.env
new file mode 100644
index 00000000..9db2f211
--- /dev/null
+++ b/deploy/config/.example.env
@@ -0,0 +1,27 @@
+RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
+RUSTFS__OBSERVABILITY__USE_STDOUT=true
+RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
+RUSTFS__OBSERVABILITY__METER_INTERVAL=30
+RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
+RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
+RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
+RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
+
+RUSTFS__SINKS__KAFKA__ENABLED=false
+RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092
+RUSTFS__SINKS__KAFKA__TOPIC=logs
+RUSTFS__SINKS__KAFKA__BATCH_SIZE=100
+RUSTFS__SINKS__KAFKA__BATCH_TIMEOUT_MS=1000
+
+RUSTFS__SINKS__WEBHOOK__ENABLED=false
+RUSTFS__SINKS__WEBHOOK__ENDPOINT=http://localhost:8080/webhook
+RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=
+RUSTFS__SINKS__WEBHOOK__BATCH_SIZE=100
+RUSTFS__SINKS__WEBHOOK__BATCH_TIMEOUT_MS=1000
+
+RUSTFS__SINKS__FILE__ENABLED=true
+RUSTFS__SINKS__FILE__PATH=./deploy/logs/app.log
+RUSTFS__SINKS__FILE__BATCH_SIZE=10
+RUSTFS__SINKS__FILE__BATCH_TIMEOUT_MS=1000
+
+RUSTFS__LOGGER__QUEUE_CAPACITY=10
\ No newline at end of file
diff --git a/deploy/config/obs.example.toml b/deploy/config/obs.example.toml
index 0ba434ad..dc74a85f 100644
--- a/deploy/config/obs.example.toml
+++ b/deploy/config/obs.example.toml
@@ -6,7 +6,7 @@ meter_interval = 30
 service_name = "rustfs"
 service_version = "0.1.0"
 environment = "develop"
-looger_level = "info"
+logger_level = "info"
 [sinks]
 [sinks.kafka] # Kafka sink is disabled by default
@@ -25,7 +25,7 @@ batch_timeout_ms = 1000
 # Default is 100ms if not specified
 [sinks.file]
 enabled = true
-path = "logs/app.log"
+path = "./deploy/logs/app.log"
 batch_size = 100
 batch_timeout_ms = 1000
 # Default is 8192 bytes if not specified
diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml
index 00c10beb..96e9700b 100644
--- a/rustfs/Cargo.toml
+++ b/rustfs/Cargo.toml
@@ -19,6 +19,7 @@ madmin.workspace = true
 api = { path = "../s3select/api" }
 appauth = { version = "0.0.1", path = "../appauth" }
 atoi = { workspace = true }
+atomic_enum = { workspace = true }
 axum.workspace = true
 axum-extra = { workspace = true }
 axum-server = { workspace = true }
@@ -87,7 +88,7 @@ url.workspace = true
 uuid = "1.15.1"
 
 [target.'cfg(target_os = "linux")'.dependencies]
-libsystemd = "0.7"
+libsystemd.workspace = true
 
 [build-dependencies]
 prost-build.workspace = true
diff
--git a/rustfs/src/main.rs b/rustfs/src/main.rs index cd1c72d1..8c34b349 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -5,14 +5,15 @@ mod console; mod grpc; pub mod license; mod logging; +mod server; mod service; mod storage; mod utils; - use crate::auth::IAMAuth; use crate::console::{init_console_cfg, CONSOLE_CONFIG}; -use crate::utils::error; // Ensure the correct path for parse_license is imported +use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT}; +use crate::utils::error; use chrono::Datelike; use clap::Parser; use common::{ @@ -31,6 +32,7 @@ use ecstore::{ }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use grpc::make_server; +use hyper_util::server::graceful::GracefulShutdown; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, @@ -44,41 +46,18 @@ use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; use std::sync::Arc; +use std::time::Duration; use std::{io::IsTerminal, net::SocketAddr}; use tokio::net::TcpListener; use tokio::signal::unix::{signal, SignalKind}; use tokio_rustls::TlsAcceptor; use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; -use tracing::{debug, error, info, info_span, warn}; +use tracing::log::warn; +use tracing::{debug, error, info, info_span}; use tracing_error::ErrorLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -#[cfg(target_os = "linux")] -fn notify_systemd(state: &str) { - use libsystemd::daemon::{notify, NotifyState}; - let notify_state = match state { - "ready" => NotifyState::Ready, - "stopping" => NotifyState::Stopping, - _ => { - warn!("Unsupported state passed to notify_systemd: {}", state); - return; - } - }; - - if let Err(e) = notify(false, &[notify_state]) { - error!("Failed to notify systemd: {}", e); - } else { - debug!("Successfully notified 
systemd: {}", state); - } - info!("Systemd notifications are enabled on linux (state: {})", state); -} - -#[cfg(not(target_os = "linux"))] -fn notify_systemd(state: &str) { - info!("Systemd notifications are not available on this platform not linux (state: {})", state); -} - #[allow(dead_code)] fn setup_tracing() { use tracing_subscriber::EnvFilter; @@ -286,8 +265,14 @@ async fn run(opt: config::Opt) -> Result<()> { None }; - // Create an oneshot channel to wait for the service to start - let (tx, rx) = tokio::sync::oneshot::channel(); + let state_manager = ServiceStateManager::new(); + let worker_state_manager = state_manager.clone(); + // 更新服务状态为启动中 + state_manager.update(ServiceState::Starting); + + // Create shutdown channel + let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1); + let shutdown_tx_clone = shutdown_tx.clone(); tokio::spawn(async move { // 错误处理改进 @@ -317,11 +302,11 @@ async fn run(opt: config::Opt) -> Result<()> { let http_server = ConnBuilder::new(TokioExecutor::new()); let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); - let graceful = hyper_util::server::graceful::GracefulShutdown::new(); + let graceful = GracefulShutdown::new(); debug!("graceful initiated"); - // Send a message to the main thread to indicate that the server has started - let _ = tx.send(()); + // 服务准备就绪 + worker_state_manager.update(ServiceState::Ready); loop { debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs); @@ -337,17 +322,23 @@ async fn run(opt: config::Opt) -> Result<()> { } } _ = ctrl_c.as_mut() => { - drop(listener); - eprintln!("Ctrl-C received, starting shutdown"); + info!("Ctrl-C received in worker thread"); + let _ = shutdown_tx_clone.send(()); break; } _ = sigint_inner.recv() => { info!("SIGINT received in worker thread"); + let _ = shutdown_tx_clone.send(()); break; } _ = sigterm_inner.recv() => { info!("SIGTERM received in worker thread"); + let _ = shutdown_tx_clone.send(()); + break; + } + _ = 
shutdown_rx.recv() => { + info!("Shutdown signal received in worker thread"); break; } }; @@ -391,7 +382,7 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("Http handshake success"); } } - + worker_state_manager.update(ServiceState::Stopping); tokio::select! { () = graceful.shutdown() => { debug!("Gracefully shutdown!"); @@ -400,6 +391,7 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("Waited 10 seconds for graceful shutdown, aborting..."); } } + worker_state_manager.update(ServiceState::Stopped); }); // init store @@ -449,97 +441,24 @@ async fn run(opt: config::Opt) -> Result<()> { }); } - // 执行休眠 1 秒钟 - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Wait for the HTTP service to finish starting - if rx.await.is_ok() { - notify_systemd("ready"); - } else { - info!("Failed to start the server"); - } - - // 主线程中监听信号 - let mut sigterm = signal(SignalKind::terminate())?; - let mut sigint = signal(SignalKind::interrupt())?; - tokio::select! { - _ = tokio::signal::ctrl_c() => { - eprintln!("Ctrl-C received, starting shutdown"); - notify_systemd("stopping"); - } - - _ = sigint.recv() => { - info!("SIGINT received, starting shutdown"); - notify_systemd("stopping"); - } - _ = sigterm.recv() => { - info!("SIGTERM received, starting shutdown"); - notify_systemd("stopping"); + // Perform hibernation for 1 second + tokio::time::sleep(SHUTDOWN_TIMEOUT).await; + // listen to the shutdown signal + match wait_for_shutdown().await { + ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => { + info!("Shutdown signal received in main thread"); + // update the status to stopping first + state_manager.update(ServiceState::Stopping); + info!("Server is stopping..."); + let _ = shutdown_tx.send(()); + // Wait for the worker thread to complete the cleaning work + tokio::time::sleep(SHUTDOWN_TIMEOUT).await; + // the last updated status is stopped + state_manager.update(ServiceState::Stopped); + info!("Server stopped current "); } } - 
info!("server is stopped"); + info!("server is stopped state: {:?}", state_manager.current_state()); Ok(()) } - -// #[allow(dead_code)] -// #[derive(Debug)] -// enum ShutdownSignal { -// CtrlC, -// Sigterm, -// Sigint, -// } -// #[allow(dead_code)] -// async fn wait_for_shutdown() -> ShutdownSignal { -// let mut sigterm = signal(SignalKind::terminate()).unwrap(); -// let mut sigint = signal(SignalKind::interrupt()).unwrap(); -// -// tokio::select! { -// _ = tokio::signal::ctrl_c() => { -// info!("Received Ctrl-C signal"); -// ShutdownSignal::CtrlC -// } -// _ = sigint.recv() => { -// info!("Received SIGINT signal"); -// ShutdownSignal::Sigint -// } -// _ = sigterm.recv() => { -// info!("Received SIGTERM signal"); -// ShutdownSignal::Sigterm -// } -// } -// } -// #[allow(dead_code)] -// #[derive(Debug)] -// enum ServiceState { -// Starting, -// Ready, -// Stopping, -// Stopped, -// } -// #[allow(dead_code)] -// fn notify_service_state(state: ServiceState) { -// match state { -// ServiceState::Starting => { -// info!("Service is starting..."); -// #[cfg(target_os = "linux")] -// if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...")]) { -// error!("Failed to notify systemd of starting state: {}", e); -// } -// } -// ServiceState::Ready => { -// info!("Service is ready"); -// notify_systemd("ready"); -// } -// ServiceState::Stopping => { -// info!("Service is stopping..."); -// notify_systemd("stopping"); -// } -// ServiceState::Stopped => { -// info!("Service has stopped"); -// #[cfg(target_os = "linux")] -// if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped")]) { -// error!("Failed to notify systemd of stopped state: {}", e); -// } -// } -// } -// } diff --git a/rustfs/src/server/mod.rs b/rustfs/src/server/mod.rs new file mode 100644 index 00000000..b1c764a1 --- /dev/null +++ b/rustfs/src/server/mod.rs @@ -0,0 +1,6 @@ +mod service_state; +pub(crate) use 
service_state::wait_for_shutdown; +pub(crate) use service_state::ServiceState; +pub(crate) use service_state::ServiceStateManager; +pub(crate) use service_state::ShutdownSignal; +pub(crate) use service_state::SHUTDOWN_TIMEOUT; diff --git a/rustfs/src/server/service_state.rs b/rustfs/src/server/service_state.rs new file mode 100644 index 00000000..e1d03c8a --- /dev/null +++ b/rustfs/src/server/service_state.rs @@ -0,0 +1,152 @@ +use atomic_enum::atomic_enum; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; +use tokio::signal::unix::{signal, SignalKind}; +use tracing::info; + +// a configurable shutdown timeout +pub(crate) const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); + +#[cfg(target_os = "linux")] +fn notify_systemd(state: &str) { + use libsystemd::daemon::{notify, NotifyState}; + use tracing::{debug, error}; + let notify_state = match state { + "ready" => NotifyState::Ready, + "stopping" => NotifyState::Stopping, + _ => { + info!("Unsupported state passed to notify_systemd: {}", state); + return; + } + }; + + if let Err(e) = notify(false, &[notify_state]) { + error!("Failed to notify systemd: {}", e); + } else { + debug!("Successfully notified systemd: {}", state); + } + info!("Systemd notifications are enabled on linux (state: {})", state); +} + +#[cfg(not(target_os = "linux"))] +fn notify_systemd(state: &str) { + info!("Systemd notifications are not available on this platform not linux (state: {})", state); +} + +#[derive(Debug)] +pub enum ShutdownSignal { + CtrlC, + Sigterm, + Sigint, +} + +#[atomic_enum] +#[derive(PartialEq)] +pub(crate) enum ServiceState { + Starting, + Ready, + Stopping, + Stopped, +} + +pub(crate) async fn wait_for_shutdown() -> ShutdownSignal { + let mut sigterm = signal(SignalKind::terminate()).expect("failed to create SIGTERM signal handler"); + let mut sigint = signal(SignalKind::interrupt()).expect("failed to create SIGINT signal handler"); + + tokio::select! 
{ + _ = tokio::signal::ctrl_c() => { + info!("Received Ctrl-C signal"); + ShutdownSignal::CtrlC + } + _ = sigint.recv() => { + info!("Received SIGINT signal"); + ShutdownSignal::Sigint + } + _ = sigterm.recv() => { + info!("Received SIGTERM signal"); + ShutdownSignal::Sigterm + } + } +} + +#[derive(Clone)] +pub(crate) struct ServiceStateManager { + state: Arc, +} + +impl ServiceStateManager { + pub fn new() -> Self { + Self { + state: Arc::new(AtomicServiceState::new(ServiceState::Starting)), + } + } + + pub fn update(&self, new_state: ServiceState) { + self.state.store(new_state, Ordering::SeqCst); + self.notify_systemd(&new_state); + } + + pub fn current_state(&self) -> ServiceState { + self.state.load(Ordering::SeqCst) + } + + fn notify_systemd(&self, state: &ServiceState) { + match state { + ServiceState::Starting => { + info!("Service is starting..."); + #[cfg(target_os = "linux")] + if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Starting...")]) { + tracing::error!("Failed to notify systemd of starting state: {}", e); + } + } + ServiceState::Ready => { + info!("Service is ready"); + notify_systemd("ready"); + } + ServiceState::Stopping => { + info!("Service is stopping..."); + notify_systemd("stopping"); + } + ServiceState::Stopped => { + info!("Service has stopped"); + #[cfg(target_os = "linux")] + if let Err(e) = libsystemd::daemon::notify(false, &[libsystemd::daemon::NotifyState::Status("Stopped")]) { + tracing::error!("Failed to notify systemd of stopped state: {}", e); + } + } + } + } +} + +impl Default for ServiceStateManager { + fn default() -> Self { + Self::new() + } +} + +// 使用示例 +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_service_state_manager() { + let manager = ServiceStateManager::new(); + + // 初始状态应该是 Starting + assert_eq!(manager.current_state(), ServiceState::Starting); + + // 更新状态到 Ready + manager.update(ServiceState::Ready); + assert_eq!(manager.current_state(), 
ServiceState::Ready);
+
+        // Update the state to Stopping
+        manager.update(ServiceState::Stopping);
+        assert_eq!(manager.current_state(), ServiceState::Stopping);
+
+        // Update the state to Stopped
+        manager.update(ServiceState::Stopped);
+        assert_eq!(manager.current_state(), ServiceState::Stopped);
+    }
+}
diff --git a/scripts/run.sh b/scripts/run.sh
index 37e703f9..538c8652 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -33,8 +33,27 @@ export RUSTFS_CONSOLE_ENABLE=true
 export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
 # export RUSTFS_SERVER_DOMAINS="localhost:9000"
 
-# 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考
-export RUSTFS_OBS_CONFIG="./config/obs.example.toml"
+# Set this to the real path of your config file; obs.example.toml is only a reference. Use EITHER `RUSTFS_OBS_CONFIG` OR the variables below.
+export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
+
+# The variables below take effect only when all required ones have values, and they override the values from the config file.
+export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:43178
+export RUSTFS__OBSERVABILITY__USE_STDOUT=true
+export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
+export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
+export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
+export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
+export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
+export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info
+export RUSTFS__SINKS__FILE__ENABLED=true
+export RUSTFS__SINKS__FILE__PATH="./deploy/logs/app.log"
+export RUSTFS__SINKS__WEBHOOK__ENABLED=false
+export RUSTFS__SINKS__WEBHOOK__ENDPOINT=""
+export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN=""
+export RUSTFS__SINKS__KAFKA__ENABLED=false
+export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
+export RUSTFS__SINKS__KAFKA__TOPIC=""
+export RUSTFS__LOGGER__QUEUE_CAPACITY=10
 
 if [ -n "$1" ]; then
     export RUSTFS_VOLUMES="$1"