This commit is contained in:
houseme
2025-06-27 17:47:59 +08:00
parent bd5d27ffbd
commit 35489ea352

View File

@@ -12,10 +12,10 @@ mod service;
mod storage;
use crate::auth::IAMAuth;
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
// Ensure the correct path for parse_license is imported
use crate::event::shutdown_event_notifier;
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown};
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
@@ -23,7 +23,6 @@ use common::{
// error::{Error, Result},
globals::set_global_addr,
};
use ecstore::StorageAPI;
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use ecstore::config as ecconfig;
@@ -31,11 +30,12 @@ use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::rpc::make_server;
use ecstore::store_api::BucketOptions;
use ecstore::StorageAPI;
use ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
set_global_endpoints,
store::{ECStore, init_local_disks},
store::{init_local_disks, ECStore},
update_erasure_type,
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
@@ -50,7 +50,7 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
@@ -62,12 +62,12 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
use tokio::signal::unix::{signal, SignalKind};
use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn, Span};
const MI_B: usize = 1024 * 1024;
@@ -610,34 +610,41 @@ async fn run(opt: config::Opt) -> Result<()> {
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
match wait_for_shutdown().await {
signal
if matches!(signal, ShutdownSignal::CtrlC)
|| (cfg!(unix) && matches!(signal, ShutdownSignal::Sigint | ShutdownSignal::Sigterm)) =>
{
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Stop the notification system
shutdown_event_notifier().await;
info!("Server is stopping...");
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!("Server stopped current ");
#[cfg(unix)]
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
#[cfg(not(unix))]
ShutdownSignal::CtrlC => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
// This branch is required to make the match exhaustive.
// Because the guard above handles all variants, this branch should never be reached.
_ => unreachable!("All shutdown signals should be handled by the guarded arm"),
}
info!("server is stopped state: {:?}", state_manager.current_state());
Ok(())
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Stop the notification system
shutdown_event_notifier().await;
info!("Server is stopping...");
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!("Server stopped current ");
}
/// Handles connection errors by logging them with appropriate severity
fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {