From 9b7f4d477af150dd6beeab117ffb381d8b08dea9 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 24 Sep 2025 22:23:31 +0800 Subject: [PATCH] Fix Tokio Runtime Initialization: Remove Private API Usage and Ensure IO Enabled (#587) * fix: remove code * improve code for tokio runtime config * improve code for main * fix: add tokio enable_all * upgrade version * improve for Cargo.toml --- Cargo.lock | 46 +++++++++---------- Cargo.toml | 12 ++--- crates/ahm/Cargo.toml | 4 +- crates/config/src/constants/app.rs | 5 +++ crates/config/src/constants/mod.rs | 1 + crates/config/src/constants/runtime.rs | 35 +++++++++++++++ crates/config/src/lib.rs | 2 + crates/e2e_test/Cargo.toml | 6 +-- crates/kms/Cargo.toml | 11 +---- crates/targets/src/target/webhook.rs | 2 +- crates/utils/src/lib.rs | 6 +++ crates/utils/src/sys/envs.rs | 42 ++++++++++++++++++ crates/utils/src/sys/mod.rs | 6 +-- rustfs/Cargo.toml | 3 -- rustfs/src/admin/handlers/event.rs | 6 --- rustfs/src/admin/utils.rs | 2 +- rustfs/src/main.rs | 39 +++++++++++----- rustfs/src/server/http.rs | 61 ++++++++++++++++++++++---- rustfs/src/server/mod.rs | 2 +- scripts/run.sh | 10 +++++ 20 files changed, 221 insertions(+), 80 deletions(-) create mode 100644 crates/config/src/constants/runtime.rs create mode 100644 crates/utils/src/sys/envs.rs diff --git a/Cargo.lock b/Cargo.lock index bae2a188..4a6d662f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,9 +695,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.85.0" +version = "1.86.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67e05f33b6c9026fecfe9b3b6740f34d41bc6ff641a6a32dabaab60209245b75" +checksum = "9d1cc7fb324aa12eb4404210e6381195c5b5e9d52c2682384f295f38716dd3c7" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1682,9 +1682,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpp_demangle" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d" +checksum = "f2bb79cb74d735044c972aae58ed0aaa9a837e85b01106a54c39e42e97f62253" dependencies = [ "cfg-if", ] @@ -2719,12 +2719,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" +checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" dependencies = [ "powerfmt", - "serde", + "serde_core", ] [[package]] @@ -2886,20 +2886,18 @@ dependencies = [ "chrono", "flatbuffers", "futures", - "md5", + "md5 0.8.0", "rand 0.9.2", "reqwest", "rmp-serde", "rustfs-ecstore", "rustfs-filemeta", - "rustfs-kms", "rustfs-lock", "rustfs-madmin", "rustfs-protos", "serde", "serde_json", "serial_test", - "tempfile", "tokio", "tonic 0.14.2", "tracing", @@ -3116,9 +3114,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flatbuffers" -version = "25.2.10" +version = "25.9.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" +checksum = "09b6620799e7340ebd9968d2e0708eb82cf1971e9a16821e2091b6d6e475eed5" dependencies = [ "bitflags 2.9.4", "rustc_version", @@ -3637,9 +3635,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hybrid-array" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a09fa0190457fce307a699c050054974f81b6975b7a017f1e784eb7d9c2d4bc" +checksum = "ed7c10d9cd8b8e0733111482917f4f7e188cf6f57fc8eb0ff9b26a51db9fbd3c" dependencies = [ "typenum", ] @@ -4414,6 +4412,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.7.5" @@ -6203,14 +6207,13 @@ dependencies = [ "hyper-util", "libsystemd", "matchit", - "md5", + "md5 0.8.0", "mimalloc", "mime_guess", "opentelemetry", "percent-encoding", "pin-project-lite", "pprof", - "rand 0.9.2", "reqwest", "rust-embed", "rustfs-ahm", @@ -6238,7 +6241,6 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "serial_test", "shadow-rs", "socket2 0.6.0", "sysctl", @@ -6254,7 +6256,6 @@ dependencies = [ "tower", "tower-http", "tracing", - "tracing-subscriber", "url", "urlencoding", "uuid", @@ -6504,24 +6505,19 @@ name = "rustfs-kms" version = "0.0.5" dependencies = [ "aes-gcm", - "anyhow", "async-trait", "base64 0.22.1", - "bytes", "chacha20poly1305", "chrono", - "futures", - "md5", + "md5 0.8.0", "moka", "once_cell", "rand 0.9.2", "reqwest", - "rustfs-crypto", "serde", "serde_json", "sha2 0.10.9", "tempfile", - "test-case", "thiserror 2.0.16", "tokio", "tokio-test", @@ -6713,7 +6709,7 @@ dependencies = [ "hex", "hmac 0.12.1", "hyper 1.7.0", - "md5", + "md5 0.7.0", "once_cell", "regex", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 5b967958..befed3c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,9 +123,9 @@ dashmap = "6.1.0" datafusion = "50.0.0" derive_builder = "0.20.2" enumset = "1.1.10" -flatbuffers = "25.2.10" +flatbuffers = "25.9.23" flate2 = "1.1.2" -flexi_logger = { version = "0.31.3", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] } +flexi_logger = { version = "0.31.4", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] } form_urlencoded = "1.2.2" futures = "0.3.31" futures-core = "0.3.31" @@ -153,15 +153,15 @@ local-ip-address = "0.6.5" lz4 = "1.28.1" matchit = "0.8.4" md-5 = "0.10.6" -md5 = "0.7.0" +md5 = "0.8.0" mime_guess = "2.0.5" -moka = { version = "0.12.10", features = ["future"] } +moka = { version = "0.12.11", features = ["future"] } netif = "0.1.6" nix = { version = "0.30.1", features = ["fs"] } nu-ansi-term = "0.50.1" num_cpus = { version = "1.17.0" } nvml-wrapper = "0.11.0" -object_store = "0.12.3" +object_store = "0.12.4" once_cell = "1.21.3" opentelemetry = { version = "0.30.0" } opentelemetry-appender-tracing = { version = "0.30.1", features = [ @@ -263,6 +263,8 @@ uuid = { version = "1.18.1", features = [ "fast-rng", "macro-diagnostics", ] } +vaultrs = { version = "0.7.4" } +walkdir = "2.5.0" wildmatch = { version = "2.5.0", features = ["serde"] } zeroize = { version = "1.8.1", features = ["derive"] } winapi = { version = "0.3.9" } diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index 4ea392ec..d007965f 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -33,10 +33,10 @@ chrono = { workspace = true } rand = { workspace = true } reqwest = { workspace = true } tempfile = { workspace = true } -walkdir = "2.5.0" +walkdir = { workspace = true } [dev-dependencies] serde_json = { workspace = true } -serial_test = "3.2.0" +serial_test = { workspace = true } tracing-subscriber = { workspace = true } tempfile = { workspace = true } diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index 00917d42..afb2d944 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -170,6 +170,11 @@ pub const DEFAULT_LOG_KEEP_FILES: u16 = 30; /// Example: --external-address ":9020" pub const ENV_EXTERNAL_ADDRESS: &str = "RUSTFS_EXTERNAL_ADDRESS"; +/// 1 KiB +pub const KI_B: usize = 1024; +/// 1 MiB +pub const MI_B: usize = 1024 * 1024; + #[cfg(test)] mod tests { use super::*; diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs index a9299c5d..d6a683a4 100644 --- a/crates/config/src/constants/mod.rs +++ b/crates/config/src/constants/mod.rs @@ -15,5 +15,6 @@ pub(crate) mod app; pub(crate) mod console; pub(crate) mod env; +pub(crate) mod runtime; pub(crate) mod targets; pub(crate) mod tls; diff --git a/crates/config/src/constants/runtime.rs b/crates/config/src/constants/runtime.rs new file mode 100644 index 00000000..044dff96 --- /dev/null +++ b/crates/config/src/constants/runtime.rs @@ -0,0 +1,35 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::MI_B; + +// Tokio runtime ENV keys +pub const ENV_WORKER_THREADS: &str = "RUSTFS_RUNTIME_WORKER_THREADS"; +pub const ENV_MAX_BLOCKING_THREADS: &str = "RUSTFS_RUNTIME_MAX_BLOCKING_THREADS"; +pub const ENV_THREAD_PRINT_ENABLED: &str = "RUSTFS_RUNTIME_THREAD_PRINT_ENABLED"; +pub const ENV_THREAD_STACK_SIZE: &str = "RUSTFS_RUNTIME_THREAD_STACK_SIZE"; +pub const ENV_THREAD_KEEP_ALIVE: &str = "RUSTFS_RUNTIME_THREAD_KEEP_ALIVE"; +pub const ENV_GLOBAL_QUEUE_INTERVAL: &str = "RUSTFS_RUNTIME_GLOBAL_QUEUE_INTERVAL"; +pub const ENV_THREAD_NAME: &str = "RUSTFS_RUNTIME_THREAD_NAME"; +pub const ENV_RNG_SEED: &str = "RUSTFS_RUNTIME_RNG_SEED"; + +// Default values for Tokio runtime +pub const DEFAULT_WORKER_THREADS: usize = 16; +pub const DEFAULT_MAX_BLOCKING_THREADS: usize = 1024; +pub const DEFAULT_THREAD_PRINT_ENABLED: bool = false; +pub const DEFAULT_THREAD_STACK_SIZE: usize = MI_B; // 1 MiB +pub const DEFAULT_THREAD_KEEP_ALIVE: u64 = 60; // seconds +pub const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; +pub const DEFAULT_THREAD_NAME: &str = "rustfs-worker"; +pub const DEFAULT_RNG_SEED: Option = None; // None means random diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 495a9f33..b4291879 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -21,6 +21,8 @@ pub use constants::console::*; #[cfg(feature = "constants")] pub use constants::env::*; #[cfg(feature = "constants")] +pub use constants::runtime::*; +#[cfg(feature = "constants")] pub use constants::targets::*; #[cfg(feature = "constants")] pub use constants::tls::*; diff --git a/crates/e2e_test/Cargo.toml b/crates/e2e_test/Cargo.toml index 24eb4b32..9f5f9538 100644 --- a/crates/e2e_test/Cargo.toml +++ b/crates/e2e_test/Cargo.toml @@ -42,13 +42,11 @@ serial_test = { workspace = true } aws-sdk-s3.workspace = true aws-config = { workspace = true } async-trait = { workspace = true } -rustfs-kms.workspace = true reqwest = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } base64 = { workspace = true } -md5 = { workspace = true } -tempfile = { workspace = true } rand = { workspace = true } -chrono = { workspace = true } \ No newline at end of file +chrono = { workspace = true } +md5 = { workspace = true } diff --git a/crates/kms/Cargo.toml b/crates/kms/Cargo.toml index 3f8c205a..18859588 100644 --- a/crates/kms/Cargo.toml +++ b/crates/kms/Cargo.toml @@ -31,15 +31,12 @@ workspace = true # Core dependencies async-trait = { workspace = true } tokio = { workspace = true, features = ["full"] } -futures = { workspace = true } -bytes = { workspace = true } uuid = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } -anyhow = { workspace = true } once_cell = { workspace = true } # Cryptography @@ -62,15 +59,11 @@ md5 = { workspace = true } # HTTP client for Vault reqwest = { workspace = true } -vaultrs = { version = "0.7.2" } - -# Internal dependencies -rustfs-crypto = { workspace = true } +vaultrs = { workspace = true } [dev-dependencies] tokio-test = { workspace = true } tempfile = { workspace = true } -test-case = { workspace = true } [features] -default = [] \ No newline at end of file +default = [] diff --git a/crates/targets/src/target/webhook.rs b/crates/targets/src/target/webhook.rs index 74d5fbd3..d2de20e9 100644 --- a/crates/targets/src/target/webhook.rs +++ b/crates/targets/src/target/webhook.rs @@ -128,7 +128,7 @@ where // Build HTTP client let mut client_builder = Client::builder() .timeout(Duration::from_secs(30)) - .user_agent(rustfs_utils::sys::get_user_agent(rustfs_utils::sys::ServiceType::Basis)); + .user_agent(rustfs_utils::get_user_agent(rustfs_utils::ServiceType::Basis)); // Supplementary certificate processing logic if !args.client_cert.is_empty() && !args.client_key.is_empty() { diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index d87569c0..2b636f26 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -77,5 +77,11 @@ mod notify; #[cfg(feature = "sys")] pub mod sys; +#[cfg(feature = "sys")] +pub use sys::user_agent::*; + +#[cfg(feature = "sys")] +pub use sys::envs::*; + #[cfg(feature = "notify")] pub use notify::*; diff --git a/crates/utils/src/sys/envs.rs b/crates/utils/src/sys/envs.rs new file mode 100644 index 00000000..6f7fea5e --- /dev/null +++ b/crates/utils/src/sys/envs.rs @@ -0,0 +1,42 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; + +pub fn get_env_usize(key: &str, default: usize) -> usize { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +pub fn get_env_u64(key: &str, default: u64) -> u64 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +pub fn get_env_u32(key: &str, default: u32) -> u32 { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} +pub fn get_env_str(key: &str, default: &str) -> String { + env::var(key).unwrap_or_else(|_| default.to_string()) +} +pub fn get_env_opt_u64(key: &str) -> Option { + env::var(key).ok().and_then(|v| v.parse().ok()) +} + +pub fn get_env_bool(key: &str, default: bool) -> bool { + env::var(key) + .ok() + .and_then(|v| match v.to_lowercase().as_str() { + "1" | "true" | "yes" => Some(true), + "0" | "false" | "no" => Some(false), + _ => None, + }) + .unwrap_or(default) +} diff --git a/crates/utils/src/sys/mod.rs b/crates/utils/src/sys/mod.rs index a4144db7..23a3fe33 100644 --- a/crates/utils/src/sys/mod.rs +++ b/crates/utils/src/sys/mod.rs @@ -12,7 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod user_agent; - -pub use user_agent::ServiceType; -pub use user_agent::get_user_agent; +pub(crate) mod envs; +pub(crate) mod user_agent; diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 43edfd64..4705a94a 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -80,7 +80,6 @@ mime_guess = { workspace = true } opentelemetry = { workspace = true } percent-encoding = { workspace = true } pin-project-lite.workspace = true -rand.workspace = true reqwest = { workspace = true } rustls = { workspace = true } rust-embed = { workspace = true, features = ["interpolate-folder-path"] } @@ -136,9 +135,7 @@ mimalloc = "0.1" pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] } [dev-dependencies] -serial_test = "3.1" uuid = { workspace = true, features = ["v4"] } -tracing-subscriber = { workspace = true } [build-dependencies] http.workspace = true diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index 5211188b..48a90d79 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -159,12 +159,6 @@ impl Operation for NotificationTarget { warn!("failed to read request body: {:?}", e); s3_error!(InvalidRequest, "failed to read request body") })?; - let mut kvs_map: HashMap = serde_json::from_slice(&body) - .map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?; - // If there is an enable key, add an enable key value to "on" - if !kvs_map.contains_key(ENABLE_KEY) { - kvs_map.insert(ENABLE_KEY.to_string(), EnableState::On.to_string()); - } // 1. Get the allowed key range let allowed_keys: std::collections::HashSet<&str> = match target_type { diff --git a/rustfs/src/admin/utils.rs b/rustfs/src/admin/utils.rs index 68bf4014..80c3ea2a 100644 --- a/rustfs/src/admin/utils.rs +++ b/rustfs/src/admin/utils.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub fn has_space_be(s: &str) -> bool { +pub(crate) fn has_space_be(s: &str) -> bool { s.trim().len() != s.len() } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 118a19ba..5f39827e 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -25,7 +25,6 @@ mod storage; mod update; mod version; -// Ensure the correct path for parse_license is imported use crate::admin::console::init_console_cfg; use crate::server::{ SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_event_notifier, shutdown_event_notifier, @@ -41,20 +40,20 @@ use rustfs_ahm::{ }; use rustfs_common::globals::set_global_addr; use rustfs_config::{DEFAULT_UPDATE_CHECK, ENV_UPDATE_CHECK}; -use rustfs_ecstore::bucket::metadata_sys; -use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys; -use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool; -use rustfs_ecstore::config as ecconfig; -use rustfs_ecstore::config::GLOBAL_CONFIG_SYS; -use rustfs_ecstore::store_api::BucketOptions; use rustfs_ecstore::{ StorageAPI, + bucket::metadata_sys, + bucket::metadata_sys::init_bucket_metadata_sys, + cmd::bucket_replication::init_bucket_replication_pool, + config as ecconfig, + config::GLOBAL_CONFIG_SYS, endpoints::EndpointServerPools, global::{set_global_rustfs_port, shutdown_background_services}, notification_sys::new_global_notification_sys, set_global_endpoints, store::ECStore, store::init_local_disks, + store_api::BucketOptions, update_erasure_type, }; use rustfs_iam::init_iam_sys; @@ -62,7 +61,6 @@ use rustfs_notify::global::notifier_instance; use rustfs_obs::{init_obs, set_global_guard}; use rustfs_targets::arn::TargetID; use rustfs_utils::net::parse_and_resolve_address; -// KMS is now managed dynamically via API use s3s::s3_error; use std::io::{Error, Result}; use std::str::FromStr; @@ -97,8 +95,29 @@ fn print_server_info() { info!("Docs: https://rustfs.com/docs/"); } -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { + let mut builder = server::get_tokio_runtime_builder(); + builder.enable_all(); + if server::print_tokio_thread_enable() { + builder.on_thread_start(|| { + println!( + "RustFS Worker Thread running - initializing resources time: {:?}, thread id: {:?}", + chrono::Utc::now().to_rfc3339(), + std::thread::current().id() + ); + }); + builder.on_thread_stop(|| { + println!( + "RustFS Worker Thread stopping - cleaning up resources time: {:?}, thread id: {:?}", + chrono::Utc::now().to_rfc3339(), + std::thread::current().id() + ) + }); + } + let runtime = builder.build().expect("Failed to build Tokio runtime"); + runtime.block_on(async_main()) +} +async fn async_main() -> Result<()> { // Parse the obtained parameters let opt = config::Opt::parse(); diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index 415a0f32..cb6512a0 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -16,26 +16,23 @@ use crate::admin; use crate::auth::IAMAuth; use crate::config; -use crate::server::hybrid::hybrid; -use crate::server::layer::RedirectLayer; -use crate::server::{ServiceState, ServiceStateManager}; +use crate::server::{ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer}; use crate::storage; use bytes::Bytes; use http::{HeaderMap, Request as HttpRequest, Response}; -use hyper_util::server::graceful::GracefulShutdown; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, + server::graceful::GracefulShutdown, service::TowerToHyperService, }; -use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustfs_ecstore::rpc::make_server; use rustfs_obs::SystemObserver; use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; -use s3s::service::S3Service; -use s3s::{host::MultiDomain, service::S3ServiceBuilder}; +use s3s::{host::MultiDomain, service::S3Service, service::S3ServiceBuilder}; use socket2::SockRef; use std::io::{Error, Result}; use std::net::SocketAddr; @@ -50,8 +47,6 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use tower_http::trace::TraceLayer; use tracing::{Span, debug, error, info, instrument, warn}; -const MI_B: usize = 1024 * 1024; - /// Parse CORS allowed origins from configuration fn parse_cors_origins(origins: Option<&String>) -> CorsLayer { use http::Method; @@ -644,3 +639,51 @@ fn get_listen_backlog() -> i32 { DEFAULT_BACKLOG } } + +/// Customize the Tokio runtime configuration +/// These configurations can be adjusted by environment variables +/// to optimize performance based on the deployment environment +pub(crate) fn get_tokio_runtime_builder() -> tokio::runtime::Builder { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + + // Worker threads + let worker_threads = rustfs_utils::get_env_usize(rustfs_config::ENV_WORKER_THREADS, rustfs_config::DEFAULT_WORKER_THREADS); + builder.worker_threads(worker_threads); + + // Max blocking threads + let max_blocking_threads = + rustfs_utils::get_env_usize(rustfs_config::ENV_MAX_BLOCKING_THREADS, rustfs_config::DEFAULT_MAX_BLOCKING_THREADS); + builder.max_blocking_threads(max_blocking_threads); + + // Thread stack size + let thread_stack_size = + rustfs_utils::get_env_usize(rustfs_config::ENV_THREAD_STACK_SIZE, rustfs_config::DEFAULT_THREAD_STACK_SIZE); + builder.thread_stack_size(thread_stack_size); + + // Thread keep alive + let thread_keep_alive = + rustfs_utils::get_env_u64(rustfs_config::ENV_THREAD_KEEP_ALIVE, rustfs_config::DEFAULT_THREAD_KEEP_ALIVE); + builder.thread_keep_alive(Duration::from_secs(thread_keep_alive)); + + // Global queue interval + let global_queue_interval = + rustfs_utils::get_env_u32(rustfs_config::ENV_GLOBAL_QUEUE_INTERVAL, rustfs_config::DEFAULT_GLOBAL_QUEUE_INTERVAL); + builder.global_queue_interval(global_queue_interval); + + // Thread name + let thread_name = rustfs_utils::get_env_str(rustfs_config::ENV_THREAD_NAME, rustfs_config::DEFAULT_THREAD_NAME); + builder.thread_name(thread_name.clone()); + println!( + "Starting Tokio runtime with configured parameters:\n\ + worker_threads: {}, max_blocking_threads: {}, thread_stack_size: {}, thread_keep_alive: {}, \ + global_queue_interval: {}, thread_name: {}", + worker_threads, max_blocking_threads, thread_stack_size, thread_keep_alive, global_queue_interval, thread_name + ); + builder +} + +/// Whether to print tokio threads +/// This can be useful for debugging purposes +pub(crate) fn print_tokio_thread_enable() -> bool { + rustfs_utils::get_env_bool(rustfs_config::ENV_THREAD_PRINT_ENABLED, rustfs_config::DEFAULT_THREAD_PRINT_ENABLED) +} diff --git a/rustfs/src/server/mod.rs b/rustfs/src/server/mod.rs index af28ecf8..1d6a69e7 100644 --- a/rustfs/src/server/mod.rs +++ b/rustfs/src/server/mod.rs @@ -26,7 +26,7 @@ mod event; pub(crate) use audit::{start_audit_system, stop_audit_system}; pub(crate) use console::start_console_server; pub(crate) use event::{init_event_notifier, shutdown_event_notifier}; -pub(crate) use http::start_http_server; +pub(crate) use http::{get_tokio_runtime_builder, print_tokio_thread_enable, start_http_server}; pub(crate) use service_state::SHUTDOWN_TIMEOUT; pub(crate) use service_state::ServiceState; pub(crate) use service_state::ServiceStateManager; diff --git a/scripts/run.sh b/scripts/run.sh index 1fcfa365..e61025fa 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -72,6 +72,16 @@ export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs" export RUSTFS_SINKS_FILE_BUFFER_SIZE=12 export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000 export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100 + +#tokio runtime +export RUSTFS_RUNTIME_WORKER_THREADS=16 +export RUSTFS_RUNTIME_MAX_BLOCKING_THREADS=1024 +export RUSTFS_RUNTIME_THREAD_PRINT_ENABLED=true +# shellcheck disable=SC2125 +export RUSTFS_RUNTIME_THREAD_STACK_SIZE=1024*1024 +export RUSTFS_RUNTIME_THREAD_KEEP_ALIVE=60 +export RUSTFS_RUNTIME_GLOBAL_QUEUE_INTERVAL=31 + # # Kafka sink 配置 #export RUSTFS_SINKS_KAFKA_BROKERS=localhost:9092