diff --git a/Cargo.lock b/Cargo.lock index 6c704a65..e96b2da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,9 +216,12 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e" +dependencies = [ + "rustversion", +] [[package]] name = "argon2" @@ -691,9 +694,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.118.0" +version = "1.119.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3e6b7079f85d9ea9a70643c9f89f50db70f5ada868fa9cfe08c1ffdf51abc13" +checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c" dependencies = [ "aws-credential-types", "aws-runtime", @@ -942,9 +945,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.6" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247" +checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -6851,22 +6854,19 @@ dependencies = [ [[package]] name = "rmp" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4" +checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" dependencies = [ - "byteorder", "num-traits", - "paste", ] [[package]] name = "rmp-serde" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" dependencies = [ - "byteorder", "rmp", "serde", ] @@ -7040,6 +7040,7 @@ dependencies = [ "hex-simd", "http 1.4.0", "http-body 1.0.1", + "http-body-util", "hyper 1.8.1", "hyper-util", "jemalloc_pprof", @@ -7753,9 +7754,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags 2.10.0", "errno", @@ -8123,15 +8124,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.146" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8" +checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4" dependencies = [ "itoa", "memchr", - "ryu", "serde", "serde_core", + "zmij", ] [[package]] @@ -8851,14 +8852,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.23.0" +version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand", "getrandom 0.3.4", "once_cell", - "rustix 1.1.2", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -10248,7 +10249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" dependencies = [ "libc", - "rustix 1.1.2", + "rustix 1.1.3", ] [[package]] @@ -10429,6 +10430,12 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" +[[package]] +name = "zmij" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e404bcd8afdaf006e529269d3e85a743f9480c3cef60034d77860d02964f3ba" + [[package]] name = "zopfli" version = "0.8.3" diff --git a/Cargo.toml b/Cargo.toml index 1dc03cc9..ae85bdc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ hyper-rustls = { version = "0.27.7", default-features = false, features = ["nati hyper-util = { version = "0.1.19", features = ["tokio", "server-auto", "server-graceful"] } http = "1.4.0" http-body = "1.0.1" +http-body-util = "0.1.3" reqwest = { version = "0.12.28", default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2", "system-proxy", "stream", "json", "blocking"] } socket2 = "0.6.1" tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] } @@ -131,10 +132,10 @@ form_urlencoded = "1.2.2" prost = "0.14.1" quick-xml = "0.38.4" rmcp = { version = "0.12.0" } -rmp = { version = "0.8.14" } -rmp-serde = { version = "1.3.0" } +rmp = { version = "0.8.15" } +rmp-serde = { version = "1.3.1" } serde = { version = "1.0.228", features = ["derive"] } -serde_json = { version = "1.0.146", features = ["raw_value"] } +serde_json = { version = "1.0.147", features = ["raw_value"] } serde_urlencoded = "0.7.1" schemars = "1.1.0" @@ -163,13 +164,13 @@ time = { version = "0.3.44", features = ["std", "parsing", "formatting", "macros # Utilities and Tools anyhow = "1.0.100" -arc-swap = "1.7.1" +arc-swap = "1.8.0" astral-tokio-tar = "0.5.6" atoi = "2.0.0" atomic_enum = "0.3.0" aws-config = { version = "1.8.12" } aws-credential-types = { version = "1.2.11" } -aws-sdk-s3 = { version = "1.117.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] } +aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] } aws-smithy-types = { version = "1.3.5" } base64 = "0.22.1" base64-simd = "0.8.0" @@ -235,7 +236,7 @@ strum = { version = "0.27.2", features = ["derive"] } sysctl = "0.7.1" sysinfo = "0.37.2" temp-env = "0.3.6" -tempfile = "3.23.0" +tempfile = "3.24.0" test-case = "3.3.1" thiserror = "2.0.17" tracing = { version = "0.1.44" } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 09dc164a..c239d4b3 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -19,6 +19,10 @@ pub mod globals; pub mod heal_channel; pub mod last_minute; pub mod metrics; +mod readiness; + +pub use globals::*; +pub use readiness::{GlobalReadiness, SystemStage}; // is ',' pub static DEFAULT_DELIMITER: u8 = 44; diff --git a/crates/common/src/readiness.rs b/crates/common/src/readiness.rs new file mode 100644 index 00000000..1a0b50d3 --- /dev/null +++ b/crates/common/src/readiness.rs @@ -0,0 +1,136 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU8, Ordering}; + +/// Represents the various stages of system startup +#[repr(u8)] +pub enum SystemStage { + Booting = 0, + StorageReady = 1, // Disks online, Quorum met + IamReady = 2, // Users and Policies loaded into cache + FullReady = 3, // System ready to serve all traffic +} + +/// Global readiness tracker for the service +/// This struct uses atomic operations to track the readiness status of various components +/// of the service in a thread-safe manner. +pub struct GlobalReadiness { + status: AtomicU8, +} + +impl Default for GlobalReadiness { + fn default() -> Self { + Self::new() + } +} + +impl GlobalReadiness { + /// Create a new GlobalReadiness instance with initial status as Starting + /// # Returns + /// A new instance of GlobalReadiness + pub fn new() -> Self { + Self { + status: AtomicU8::new(SystemStage::Booting as u8), + } + } + + /// Update the system to a new stage + /// + /// # Arguments + /// * `step` - The SystemStage step to mark as ready + pub fn mark_stage(&self, step: SystemStage) { + self.status.fetch_max(step as u8, Ordering::SeqCst); + } + + /// Check if the service is fully ready + /// # Returns + /// `true` if the service is fully ready, `false` otherwise + pub fn is_ready(&self) -> bool { + self.status.load(Ordering::SeqCst) == SystemStage::FullReady as u8 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::thread; + + #[test] + fn test_initial_state() { + let readiness = GlobalReadiness::new(); + assert!(!readiness.is_ready()); + assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::Booting as u8); + } + + #[test] + fn test_mark_stage_progression() { + let readiness = GlobalReadiness::new(); + readiness.mark_stage(SystemStage::StorageReady); + assert!(!readiness.is_ready()); + assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::StorageReady as u8); + + readiness.mark_stage(SystemStage::IamReady); + assert!(!readiness.is_ready()); + assert_eq!(readiness.status.load(Ordering::SeqCst), SystemStage::IamReady as u8); + + readiness.mark_stage(SystemStage::FullReady); + assert!(readiness.is_ready()); + } + + #[test] + fn test_no_regression() { + let readiness = GlobalReadiness::new(); + readiness.mark_stage(SystemStage::FullReady); + readiness.mark_stage(SystemStage::IamReady); // Should not regress + assert!(readiness.is_ready()); + } + + #[test] + fn test_concurrent_marking() { + let readiness = Arc::new(GlobalReadiness::new()); + let mut handles = vec![]; + + for _ in 0..10 { + let r = Arc::clone(&readiness); + handles.push(thread::spawn(move || { + r.mark_stage(SystemStage::StorageReady); + r.mark_stage(SystemStage::IamReady); + r.mark_stage(SystemStage::FullReady); + })); + } + + for h in handles { + h.join().unwrap(); + } + + assert!(readiness.is_ready()); + } + + #[test] + fn test_is_ready_only_at_full_ready() { + let readiness = GlobalReadiness::new(); + assert!(!readiness.is_ready()); + + readiness.mark_stage(SystemStage::StorageReady); + assert!(!readiness.is_ready()); + + readiness.mark_stage(SystemStage::IamReady); + assert!(!readiness.is_ready()); + + readiness.mark_stage(SystemStage::FullReady); + assert!(readiness.is_ready()); + } +} diff --git a/crates/ecstore/src/admin_server_info.rs b/crates/ecstore/src/admin_server_info.rs index 9117f8c0..324ec388 100644 --- a/crates/ecstore/src/admin_server_info.rs +++ b/crates/ecstore/src/admin_server_info.rs @@ -23,7 +23,7 @@ use crate::{ }; use crate::data_usage::load_data_usage_cache; -use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState}; +use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState}; use rustfs_madmin::{ BackendDisks, Disk, ErasureSetInfo, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE, InfoMessage, ServerProperties, }; diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index 4d938a48..2bbe6456 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -19,11 +19,7 @@ use crate::{ // utils::os::get_drive_stats, }; use chrono::Utc; -use rustfs_common::{ - globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR}, - heal_channel::DriveState, - metrics::global_metrics, -}; +use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR, heal_channel::DriveState, metrics::global_metrics}; use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; use rustfs_utils::os::get_drive_stats; use serde::{Deserialize, Serialize}; diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 90812cd8..ad6fc5c9 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -40,7 +40,7 @@ use futures::future::join_all; use http::HeaderMap; use rustfs_common::heal_channel::HealOpts; use rustfs_common::{ - globals::GLOBAL_LOCAL_NODE_NAME, + GLOBAL_LOCAL_NODE_NAME, heal_channel::{DriveState, HealItemType}, }; use rustfs_filemeta::FileInfo; diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 2259e5b5..23f127e5 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -55,8 +55,8 @@ use futures::future::join_all; use http::HeaderMap; use lazy_static::lazy_static; use rand::Rng as _; -use rustfs_common::globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT}; use rustfs_common::heal_channel::{HealItemType, HealOpts}; +use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT}; use rustfs_filemeta::FileInfo; use rustfs_madmin::heal_commands::HealResultItem; use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf}; diff --git a/crates/iam/src/error.rs b/crates/iam/src/error.rs index 2a654c43..82d4bbc4 100644 --- a/crates/iam/src/error.rs +++ b/crates/iam/src/error.rs @@ -109,6 +109,9 @@ pub enum Error { #[error("io error: {0}")] Io(std::io::Error), + + #[error("system already initialized")] + IamSysAlreadyInitialized, } impl PartialEq for Error { @@ -162,6 +165,7 @@ impl Clone for Error { Error::PolicyTooLarge => Error::PolicyTooLarge, Error::ConfigNotFound => Error::ConfigNotFound, Error::Io(e) => Error::Io(std::io::Error::new(e.kind(), e.to_string())), + Error::IamSysAlreadyInitialized => Error::IamSysAlreadyInitialized, } } } @@ -226,6 +230,7 @@ impl From for Error { rustfs_policy::error::Error::StringError(s) => Error::StringError(s), rustfs_policy::error::Error::CryptoError(e) => Error::CryptoError(e), rustfs_policy::error::Error::ErrCredMalformed => Error::ErrCredMalformed, + rustfs_policy::error::Error::IamSysAlreadyInitialized => Error::IamSysAlreadyInitialized, } } } diff --git a/crates/iam/src/lib.rs b/crates/iam/src/lib.rs index ebefb72f..f217b84e 100644 --- a/crates/iam/src/lib.rs +++ b/crates/iam/src/lib.rs @@ -18,30 +18,58 @@ use rustfs_ecstore::store::ECStore; use std::sync::{Arc, OnceLock}; use store::object::ObjectStore; use sys::IamSys; -use tracing::{debug, instrument}; +use tracing::{error, info, instrument}; pub mod cache; pub mod error; pub mod manager; pub mod store; -pub mod utils; - pub mod sys; +pub mod utils; static IAM_SYS: OnceLock>> = OnceLock::new(); #[instrument(skip(ecstore))] pub async fn init_iam_sys(ecstore: Arc) -> Result<()> { - debug!("init iam system"); - let s = IamCache::new(ObjectStore::new(ecstore)).await; + if IAM_SYS.get().is_some() { + info!("IAM system already initialized, skipping."); + return Ok(()); + } - IAM_SYS.get_or_init(move || IamSys::new(s).into()); + info!("Starting IAM system initialization sequence..."); + + // 1. Create the persistent storage adapter + let storage_adapter = ObjectStore::new(ecstore); + + // 2. Create the cache manager. + // The `new` method now performs a blocking initial load from disk. + let cache_manager = IamCache::new(storage_adapter).await; + + // 3. Construct the system interface + let iam_instance = Arc::new(IamSys::new(cache_manager)); + + // 4. Securely set the global singleton + if IAM_SYS.set(iam_instance).is_err() { + error!("Critical: Race condition detected during IAM initialization!"); + return Err(Error::IamSysAlreadyInitialized); + } + + info!("IAM system initialization completed successfully."); Ok(()) } #[inline] pub fn get() -> Result>> { - IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized) + let sys = IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized)?; + + // Double-check the internal readiness state. The OnceLock is only set + // after initialization and data loading complete, so this is a defensive + // guard to ensure callers never operate on a partially initialized system. + if !sys.is_ready() { + return Err(Error::IamSysNotInitialized); + } + + Ok(sys) } pub fn get_global_iam_sys() -> Option>> { diff --git a/crates/iam/src/manager.rs b/crates/iam/src/manager.rs index fbcb1185..5fa5220b 100644 --- a/crates/iam/src/manager.rs +++ b/crates/iam/src/manager.rs @@ -37,6 +37,7 @@ use rustfs_policy::{ use rustfs_utils::path::path_join_buf; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::sync::atomic::AtomicU8; use std::{ collections::{HashMap, HashSet}, sync::{ @@ -76,9 +77,19 @@ fn get_iam_format_file_path() -> String { path_join_buf(&[&IAM_CONFIG_PREFIX, IAM_FORMAT_FILE]) } +#[repr(u8)] +#[derive(Debug, PartialEq)] +pub enum IamState { + Uninitialized = 0, + Loading = 1, + Ready = 2, + Error = 3, +} + pub struct IamCache { pub cache: Cache, pub api: T, + pub state: Arc, pub loading: Arc, pub roles: HashMap>, pub send_chan: Sender, @@ -89,12 +100,19 @@ impl IamCache where T: Store, { + /// Create a new IAM system instance + /// # Arguments + /// * `api` - The storage backend implementing the Store trait + /// + /// # Returns + /// An Arc-wrapped instance of IamSystem pub(crate) async fn new(api: T) -> Arc { let (sender, receiver) = mpsc::channel::(100); let sys = Arc::new(Self { api, cache: Cache::default(), + state: Arc::new(AtomicU8::new(IamState::Uninitialized as u8)), loading: Arc::new(AtomicBool::new(false)), send_chan: sender, roles: HashMap::new(), @@ -105,10 +123,32 @@ where sys } + /// Initialize the IAM system async fn init(self: Arc, receiver: Receiver) -> Result<()> { + self.state.store(IamState::Loading as u8, Ordering::SeqCst); + // Ensure the IAM format file is persisted first self.clone().save_iam_formatter().await?; - self.clone().load().await?; + // Critical: Load all existing users/policies into memory cache + const MAX_RETRIES: usize = 3; + for attempt in 0..MAX_RETRIES { + if let Err(e) = self.clone().load().await { + if attempt == MAX_RETRIES - 1 { + self.state.store(IamState::Error as u8, Ordering::SeqCst); + error!("IAM fail to load initial data after {} attempts: {:?}", MAX_RETRIES, e); + return Err(e); + } else { + warn!("IAM load failed, retrying... attempt {}", attempt + 1); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } else { + break; + } + } + self.state.store(IamState::Ready as u8, Ordering::SeqCst); + info!("IAM System successfully initialized and marked as READY"); + + // Background ticker for synchronization // Check if environment variable is set let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK").is_ok(); @@ -152,6 +192,11 @@ where Ok(()) } + /// Check if IAM system is ready + pub fn is_ready(&self) -> bool { + self.state.load(Ordering::SeqCst) == IamState::Ready as u8 + } + async fn _notify(&self) { self.send_chan.send(OffsetDateTime::now_utc().unix_timestamp()).await.unwrap(); } diff --git a/crates/iam/src/store/object.rs b/crates/iam/src/store/object.rs index 0390587c..5479465b 100644 --- a/crates/iam/src/store/object.rs +++ b/crates/iam/src/store/object.rs @@ -38,7 +38,7 @@ use std::sync::LazyLock; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::{self, Sender}; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::{debug, error, info, warn}; pub static IAM_CONFIG_PREFIX: LazyLock = LazyLock::new(|| format!("{RUSTFS_CONFIG_PREFIX}/iam")); pub static IAM_CONFIG_USERS_PREFIX: LazyLock = LazyLock::new(|| format!("{RUSTFS_CONFIG_PREFIX}/iam/users/")); @@ -341,6 +341,27 @@ impl ObjectStore { Ok(policies) } + /// Checks if the underlying ECStore is ready for metadata operations. + /// This prevents silent failures during the storage boot-up phase. + /// + /// Performs a lightweight probe by attempting to read a known configuration object. + /// If the object is not found, it indicates the storage metadata is not ready. + /// The upper-level caller should handle retries if needed. + async fn check_storage_readiness(&self) -> Result<()> { + // Probe path for a fixed object under the IAM root prefix. + // If it doesn't exist, the system bucket or metadata is not ready. + let probe_path = format!("{}/format.json", *IAM_CONFIG_PREFIX); + + match read_config(self.object_api.clone(), &probe_path).await { + Ok(_) => Ok(()), + Err(rustfs_ecstore::error::StorageError::ConfigNotFound) => Err(Error::other(format!( + "Storage metadata not ready: probe object '{}' not found (expected IAM config to be initialized)", + probe_path + ))), + Err(e) => Err(e.into()), + } + } + // async fn load_policy(&self, name: &str) -> Result { // let mut policy = self // .load_iam_config::(&format!("config/iam/policies/{name}/policy.json")) @@ -398,13 +419,50 @@ impl Store for ObjectStore { Ok(serde_json::from_slice(&data)?) } + /// Saves IAM configuration with a retry mechanism on failure. + /// + /// Attempts to save the IAM configuration up to 5 times if the storage layer is not ready, + /// using exponential backoff between attempts (starting at 200ms, doubling each retry). + /// + /// # Arguments + /// + /// * `item` - The IAM configuration item to save, must implement `Serialize` and `Send`. + /// * `path` - The path where the configuration will be saved. + /// + /// # Returns + /// + /// * `Result<()>` - `Ok(())` on success, or an `Error` if all attempts fail. #[tracing::instrument(level = "debug", skip(self, item, path))] async fn save_iam_config(&self, item: Item, path: impl AsRef + Send) -> Result<()> { let mut data = serde_json::to_vec(&item)?; data = Self::encrypt_data(&data)?; - save_config(self.object_api.clone(), path.as_ref(), data).await?; - Ok(()) + let mut attempts = 0; + let max_attempts = 5; + let path_ref = path.as_ref(); + + loop { + match save_config(self.object_api.clone(), path_ref, data.clone()).await { + Ok(_) => { + debug!("Successfully saved IAM config to {}", path_ref); + return Ok(()); + } + Err(e) if attempts < max_attempts => { + attempts += 1; + // Exponential backoff: 200ms, 400ms, 800ms... + let wait_ms = 200 * (1 << attempts); + warn!( + "Storage layer not ready for IAM write (attempt {}/{}). Retrying in {}ms. Path: {}, Error: {:?}", + attempts, max_attempts, wait_ms, path_ref, e + ); + tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await; + } + Err(e) => { + error!("Final failure saving IAM config to {}: {:?}", path_ref, e); + return Err(e.into()); + } + } + } } async fn delete_iam_config(&self, path: impl AsRef + Send) -> Result<()> { delete_config(self.object_api.clone(), path.as_ref()).await?; @@ -418,8 +476,16 @@ impl Store for ObjectStore { user_identity: UserIdentity, _ttl: Option, ) -> Result<()> { - self.save_iam_config(user_identity, get_user_identity_path(name, user_type)) - .await + // Pre-check storage health + self.check_storage_readiness().await?; + + let path = get_user_identity_path(name, user_type); + debug!("Saving IAM identity to path: {}", path); + + self.save_iam_config(user_identity, path).await.map_err(|e| { + error!("ObjectStore save failure for {}: {:?}", name, e); + e + }) } async fn delete_user_identity(&self, name: &str, user_type: UserType) -> Result<()> { self.delete_iam_config(get_user_identity_path(name, user_type)) diff --git a/crates/iam/src/sys.rs b/crates/iam/src/sys.rs index f5b931d9..a05cdb6b 100644 --- a/crates/iam/src/sys.rs +++ b/crates/iam/src/sys.rs @@ -67,6 +67,13 @@ pub struct IamSys { } impl IamSys { + /// Create a new IamSys instance with the given IamCache store + /// + /// # Arguments + /// * `store` - An Arc to the IamCache instance + /// + /// # Returns + /// A new instance of IamSys pub fn new(store: Arc>) -> Self { tokio::spawn(async move { match opa::lookup_config().await { @@ -87,6 +94,11 @@ impl IamSys { roles_map: HashMap::new(), } } + + /// Check if the IamSys has a watcher configured + /// + /// # Returns + /// `true` if a watcher is configured, `false` otherwise pub fn has_watcher(&self) -> bool { self.store.api.has_watcher() } @@ -859,6 +871,11 @@ impl IamSys { self.get_combined_policy(&policies).await.is_allowed(args).await } + + /// Check if the underlying store is ready + pub fn is_ready(&self) -> bool { + self.store.is_ready() + } } fn is_allowed_by_session_policy(args: &Args<'_>) -> (bool, bool) { diff --git a/crates/policy/src/error.rs b/crates/policy/src/error.rs index 04c58a02..5a0adce1 100644 --- a/crates/policy/src/error.rs +++ b/crates/policy/src/error.rs @@ -89,6 +89,7 @@ pub enum Error { #[error("invalid access_key")] InvalidAccessKey, + #[error("action not allowed")] IAMActionNotAllowed, @@ -106,6 +107,9 @@ pub enum Error { #[error("io error: {0}")] Io(std::io::Error), + + #[error("system already initialized")] + IamSysAlreadyInitialized, } impl Error { diff --git a/crates/protos/src/lib.rs b/crates/protos/src/lib.rs index 42fab1f4..e54f1edf 100644 --- a/crates/protos/src/lib.rs +++ b/crates/protos/src/lib.rs @@ -16,7 +16,7 @@ mod generated; use proto_gen::node_service::node_service_client::NodeServiceClient; -use rustfs_common::globals::{GLOBAL_CONN_MAP, GLOBAL_ROOT_CERT, evict_connection}; +use rustfs_common::{GLOBAL_CONN_MAP, GLOBAL_ROOT_CERT, evict_connection}; use std::{error::Error, time::Duration}; use tonic::{ Request, Status, diff --git a/crates/s3select-api/src/lib.rs b/crates/s3select-api/src/lib.rs index 3cee17e7..322bb436 100644 --- a/crates/s3select-api/src/lib.rs +++ b/crates/s3select-api/src/lib.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Display; - use datafusion::{common::DataFusionError, sql::sqlparser::parser::ParserError}; use snafu::{Backtrace, Location, Snafu}; +use std::fmt::Display; pub mod object_store; pub mod query; diff --git a/crates/s3select-api/src/query/mod.rs b/crates/s3select-api/src/query/mod.rs index 6e1529f9..d83af94b 100644 --- a/crates/s3select-api/src/query/mod.rs +++ b/crates/s3select-api/src/query/mod.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use s3s::dto::SelectObjectContentInput; +use std::sync::Arc; pub mod analyzer; pub mod ast; diff --git a/crates/s3select-api/src/query/session.rs b/crates/s3select-api/src/query/session.rs index e96bc638..ab790542 100644 --- a/crates/s3select-api/src/query/session.rs +++ b/crates/s3select-api/src/query/session.rs @@ -12,20 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - +use crate::query::Context; +use crate::{QueryError, QueryResult, object_store::EcObjectStore}; use datafusion::{ execution::{SessionStateBuilder, context::SessionState, runtime_env::RuntimeEnvBuilder}, parquet::data_type::AsBytes, prelude::SessionContext, }; use object_store::{ObjectStore, memory::InMemory, path::Path}; +use std::sync::Arc; use tracing::error; -use crate::{QueryError, QueryResult, object_store::EcObjectStore}; - -use super::Context; - #[derive(Clone)] pub struct SessionCtx { _desc: Arc, diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index e54a52fd..bac807ec 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -72,6 +72,7 @@ hyper.workspace = true hyper-util.workspace = true http.workspace = true http-body.workspace = true +http-body-util.workspace = true reqwest = { workspace = true } socket2 = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal", "process", "io-util"] } diff --git a/rustfs/src/admin/auth.rs b/rustfs/src/admin/auth.rs index 8b994097..2f101099 100644 --- a/rustfs/src/admin/auth.rs +++ b/rustfs/src/admin/auth.rs @@ -1,6 +1,18 @@ -use std::collections::HashMap; -use std::sync::Arc; +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use crate::auth::get_condition_values; use http::HeaderMap; use rustfs_iam::store::object::ObjectStore; use rustfs_iam::sys::IamSys; @@ -9,8 +21,8 @@ use rustfs_policy::policy::Args; use rustfs_policy::policy::action::Action; use s3s::S3Result; use s3s::s3_error; - -use crate::auth::get_condition_values; +use std::collections::HashMap; +use std::sync::Arc; pub async fn validate_admin_request( headers: &HeaderMap, diff --git a/rustfs/src/admin/console.rs b/rustfs/src/admin/console.rs index 0fe66040..b541edf1 100644 --- a/rustfs/src/admin/console.rs +++ b/rustfs/src/admin/console.rs @@ -14,6 +14,7 @@ use crate::config::build; use crate::license::get_license; +use crate::server::{CONSOLE_PREFIX, FAVICON_PATH, HEALTH_PREFIX, RUSTFS_ADMIN_PREFIX}; use axum::{ Router, body::Body, @@ -45,9 +46,6 @@ use tower_http::timeout::TimeoutLayer; use tower_http::trace::TraceLayer; use tracing::{debug, error, info, instrument, warn}; -pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console"; -const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; - #[derive(RustEmbed)] #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles; @@ -457,7 +455,7 @@ fn get_console_config_from_env() -> (bool, u32, u64, String) { /// # Returns: /// - `true` if the path is for console access, `false` otherwise. pub fn is_console_path(path: &str) -> bool { - path == "/favicon.ico" || path.starts_with(CONSOLE_PREFIX) + path == FAVICON_PATH || path.starts_with(CONSOLE_PREFIX) } /// Setup comprehensive middleware stack with tower-http features @@ -477,11 +475,11 @@ fn setup_console_middleware_stack( auth_timeout: u64, ) -> Router { let mut app = Router::new() - .route("/favicon.ico", get(static_handler)) + .route(FAVICON_PATH, get(static_handler)) .route(&format!("{CONSOLE_PREFIX}/license"), get(license_handler)) .route(&format!("{CONSOLE_PREFIX}/config.json"), get(config_handler)) .route(&format!("{CONSOLE_PREFIX}/version"), get(version_handler)) - .route(&format!("{CONSOLE_PREFIX}/health"), get(health_check).head(health_check)) + .route(&format!("{CONSOLE_PREFIX}{HEALTH_PREFIX}"), get(health_check).head(health_check)) .nest(CONSOLE_PREFIX, Router::new().fallback_service(get(static_handler))) .fallback_service(get(static_handler)); diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 6f2636a7..821e948f 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -72,7 +72,6 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::debug; use tracing::{error, info, warn}; use url::Host; -// use url::UrlQuery; pub mod bucket_meta; pub mod event; diff --git a/rustfs/src/admin/handlers/rebalance.rs b/rustfs/src/admin/handlers/rebalance.rs index ca5b60f5..736c8754 100644 --- a/rustfs/src/admin/handlers/rebalance.rs +++ b/rustfs/src/admin/handlers/rebalance.rs @@ -12,8 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::{ + admin::{auth::validate_admin_request, router::Operation}, + auth::{check_key_valid, get_session_token}, +}; use http::{HeaderMap, StatusCode}; use matchit::Params; +use rustfs_ecstore::rebalance::RebalanceMeta; use rustfs_ecstore::{ StorageAPI, error::StorageError, @@ -33,12 +38,6 @@ use std::time::Duration; use time::OffsetDateTime; use tracing::warn; -use crate::{ - admin::{auth::validate_admin_request, router::Operation}, - auth::{check_key_valid, get_session_token}, -}; -use rustfs_ecstore::rebalance::RebalanceMeta; - #[derive(Debug, Clone, Deserialize, Serialize)] pub struct RebalanceResp { pub id: String, diff --git a/rustfs/src/admin/handlers/trace.rs b/rustfs/src/admin/handlers/trace.rs index 8b1e0b84..1b9577a1 100644 --- a/rustfs/src/admin/handlers/trace.rs +++ b/rustfs/src/admin/handlers/trace.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::admin::router::Operation; use http::StatusCode; use hyper::Uri; use matchit::Params; @@ -20,8 +21,6 @@ use rustfs_madmin::service_commands::ServiceTraceOpts; use s3s::{Body, S3Request, S3Response, S3Result, s3_error}; use tracing::warn; -use crate::admin::router::Operation; - #[allow(dead_code)] fn extract_trace_options(uri: &Uri) -> S3Result { let mut st_opts = ServiceTraceOpts::default(); diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 01d4942c..22f6a881 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -22,6 +22,7 @@ pub mod utils; #[cfg(test)] mod console_test; +use crate::server::{ADMIN_PREFIX, HEALTH_PREFIX, PROFILE_CPU_PATH, PROFILE_MEMORY_PATH}; use handlers::{ GetReplicationMetricsHandler, HealthCheckHandler, IsAdminHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler, bucket_meta, @@ -37,17 +38,21 @@ use router::{AdminOperation, S3Router}; use rpc::register_rpc_route; use s3s::route::S3Route; -const ADMIN_PREFIX: &str = "/rustfs/admin"; -// const ADMIN_PREFIX: &str = "/minio/admin"; - +/// Create admin router +/// +/// # Arguments +/// * `console_enabled` - Whether the console is enabled +/// +/// # Returns +/// An instance of S3Route for admin operations pub fn make_admin_route(console_enabled: bool) -> std::io::Result { let mut r: S3Router = S3Router::new(console_enabled); // Health check endpoint for monitoring and orchestration - r.insert(Method::GET, "/health", AdminOperation(&HealthCheckHandler {}))?; - r.insert(Method::HEAD, "/health", AdminOperation(&HealthCheckHandler {}))?; - r.insert(Method::GET, "/profile/cpu", AdminOperation(&TriggerProfileCPU {}))?; - r.insert(Method::GET, "/profile/memory", AdminOperation(&TriggerProfileMemory {}))?; + r.insert(Method::GET, HEALTH_PREFIX, AdminOperation(&HealthCheckHandler {}))?; + r.insert(Method::HEAD, HEALTH_PREFIX, AdminOperation(&HealthCheckHandler {}))?; + r.insert(Method::GET, PROFILE_CPU_PATH, AdminOperation(&TriggerProfileCPU {}))?; + r.insert(Method::GET, PROFILE_MEMORY_PATH, AdminOperation(&TriggerProfileMemory {}))?; // 1 r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?; diff --git a/rustfs/src/admin/router.rs b/rustfs/src/admin/router.rs index fd3c3306..09c390cf 100644 --- a/rustfs/src/admin/router.rs +++ b/rustfs/src/admin/router.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::admin::ADMIN_PREFIX; use crate::admin::console::is_console_path; use crate::admin::console::make_console_server; -use crate::admin::rpc::RPC_PREFIX; +use crate::server::{ADMIN_PREFIX, HEALTH_PREFIX, PROFILE_CPU_PATH, PROFILE_MEMORY_PATH, RPC_PREFIX}; use hyper::HeaderMap; use hyper::Method; use hyper::StatusCode; @@ -86,12 +85,12 @@ where fn is_match(&self, method: &Method, uri: &Uri, headers: &HeaderMap, _: &mut Extensions) -> bool { let path = uri.path(); // Profiling endpoints - if method == Method::GET && (path == "/profile/cpu" || path == "/profile/memory") { + if method == Method::GET && (path == PROFILE_CPU_PATH || path == PROFILE_MEMORY_PATH) { return true; } // Health check - if (method == Method::HEAD || method == Method::GET) && path == "/health" { + if (method == Method::HEAD || method == Method::GET) && path == HEALTH_PREFIX { return true; } @@ -117,12 +116,12 @@ where let path = req.uri.path(); // Profiling endpoints - if req.method == Method::GET && (path == "/profile/cpu" || path == "/profile/memory") { + if req.method == Method::GET && (path == PROFILE_CPU_PATH || path == PROFILE_MEMORY_PATH) { return Ok(()); } // Health check - if (req.method == Method::HEAD || req.method == Method::GET) && path == "/health" { + if (req.method == Method::HEAD || req.method == Method::GET) && path == HEALTH_PREFIX { return Ok(()); } diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs index 7df37404..8098236d 100644 --- a/rustfs/src/admin/rpc.rs +++ b/rustfs/src/admin/rpc.rs @@ -15,6 +15,7 @@ use super::router::AdminOperation; use super::router::Operation; use super::router::S3Router; +use crate::server::RPC_PREFIX; use futures::StreamExt; use http::StatusCode; use hyper::Method; @@ -36,8 +37,6 @@ use tokio::io::AsyncWriteExt; use tokio_util::io::ReaderStream; use tracing::warn; -pub const RPC_PREFIX: &str = "/rustfs/rpc"; - pub fn register_rpc_route(r: &mut S3Router) -> std::io::Result<()> { r.insert( Method::GET, diff --git a/rustfs/src/auth.rs b/rustfs/src/auth.rs index cc2d24c2..79cb2922 100644 --- a/rustfs/src/auth.rs +++ b/rustfs/src/auth.rs @@ -66,7 +66,7 @@ const SIGN_V2_ALGORITHM: &str = "AWS "; const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; const STREAMING_CONTENT_SHA256: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; const STREAMING_CONTENT_SHA256_TRAILER: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER"; -pub const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"; +pub(crate) const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"; const ACTION_HEADER: &str = "Action"; const AMZ_CREDENTIAL: &str = "X-Amz-Credential"; const AMZ_ACCESS_KEY_ID: &str = "AWSAccessKeyId"; diff --git a/rustfs/src/init.rs b/rustfs/src/init.rs index 397829ea..1db6eca7 100644 --- a/rustfs/src/init.rs +++ b/rustfs/src/init.rs @@ -13,7 +13,8 @@ // limitations under the License. use crate::storage::ecfs::{process_lambda_configurations, process_queue_configurations, process_topic_configurations}; -use crate::{admin, config}; +use crate::{admin, config, version}; +use chrono::Datelike; use rustfs_config::{DEFAULT_UPDATE_CHECK, ENV_UPDATE_CHECK}; use rustfs_ecstore::bucket::metadata_sys; use rustfs_notify::notifier_global; @@ -23,6 +24,21 @@ use std::env; use std::io::Error; use tracing::{debug, error, info, instrument, warn}; +#[instrument] +pub(crate) fn print_server_info() { + let current_year = chrono::Utc::now().year(); + // Use custom macros to print server information + info!("RustFS Object Storage Server"); + info!("Copyright: 2024-{} RustFS, Inc", current_year); + info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0"); + info!("Version: {}", version::get_version()); + info!("Docs: https://rustfs.com/docs/"); +} + +/// Initialize the asynchronous update check system. +/// This function checks if update checking is enabled via +/// environment variable or default configuration. If enabled, +/// it spawns an asynchronous task to check for updates with a timeout. pub(crate) fn init_update_check() { let update_check_enable = env::var(ENV_UPDATE_CHECK) .unwrap_or_else(|_| DEFAULT_UPDATE_CHECK.to_string()) @@ -70,6 +86,12 @@ pub(crate) fn init_update_check() { }); } +/// Add existing bucket notification configurations to the global notifier system. +/// This function retrieves notification configurations for each bucket +/// and registers the corresponding event rules with the notifier system. +/// It processes queue, topic, and lambda configurations and maps them to event rules. +/// # Arguments +/// * `buckets` - A vector of bucket names to process #[instrument(skip_all)] pub(crate) async fn add_bucket_notification_configuration(buckets: Vec) { let region_opt = rustfs_ecstore::global::get_global_region(); @@ -128,6 +150,15 @@ pub(crate) async fn add_bucket_notification_configuration(buckets: Vec) } /// Initialize KMS system and configure if enabled +/// +/// This function initializes the global KMS service manager. If KMS is enabled +/// via command line options, it configures and starts the service accordingly. +/// If not enabled, it attempts to load any persisted KMS configuration from +/// cluster storage and starts the service if found. +/// # Arguments +/// * `opt` - The application configuration options +/// +/// Returns `std::io::Result<()>` indicating success or failure #[instrument(skip(opt))] pub(crate) async fn init_kms_system(opt: &config::Opt) -> std::io::Result<()> { // Initialize global KMS service manager (starts in NotConfigured state) diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 2ab33bc0..38a85518 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -25,19 +25,20 @@ mod update; mod version; // Ensure the correct path for parse_license is imported -use crate::init::{add_bucket_notification_configuration, init_buffer_profile_system, init_kms_system, init_update_check}; +use crate::init::{ + add_bucket_notification_configuration, init_buffer_profile_system, init_kms_system, init_update_check, print_server_info, +}; use crate::server::{ SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_cert, init_event_notifier, shutdown_event_notifier, start_audit_system, start_http_server, stop_audit_system, wait_for_shutdown, }; -use chrono::Datelike; use clap::Parser; use license::init_license; use rustfs_ahm::{ Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager, scanner::data_scanner::ScannerConfig, shutdown_ahm_services, }; -use rustfs_common::globals::set_global_addr; +use rustfs_common::{GlobalReadiness, SystemStage, set_global_addr}; use rustfs_ecstore::{ StorageAPI, bucket::metadata_sys::init_bucket_metadata_sys, @@ -69,25 +70,6 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -const LOGO: &str = r#" - -░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀ -░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█ -░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀ - -"#; - -#[instrument] -fn print_server_info() { - let current_year = chrono::Utc::now().year(); - // Use custom macros to print server information - info!("RustFS Object Storage Server"); - info!("Copyright: 2024-{} RustFS, Inc", current_year); - info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0"); - info!("Version: {}", version::get_version()); - info!("Docs: https://rustfs.com/docs/"); -} - fn main() -> Result<()> { let runtime = server::get_tokio_runtime_builder() .build() @@ -120,7 +102,7 @@ async fn async_main() -> Result<()> { } // print startup logo - info!("{}", LOGO); + info!("{}", server::LOGO); // Initialize performance profiling if enabled profiling::init_from_env().await; @@ -143,6 +125,8 @@ async fn async_main() -> Result<()> { #[instrument(skip(opt))] async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); + // 1. Initialize global readiness tracker + let readiness = Arc::new(GlobalReadiness::new()); if let Some(region) = &opt.region { rustfs_ecstore::global::set_global_region(region.clone()); @@ -214,14 +198,14 @@ async fn run(opt: config::Opt) -> Result<()> { let s3_shutdown_tx = { let mut s3_opt = opt.clone(); s3_opt.console_enable = false; - let s3_shutdown_tx = start_http_server(&s3_opt, state_manager.clone()).await?; + let s3_shutdown_tx = start_http_server(&s3_opt, state_manager.clone(), readiness.clone()).await?; Some(s3_shutdown_tx) }; let console_shutdown_tx = if opt.console_enable && !opt.console_address.is_empty() { let mut console_opt = opt.clone(); console_opt.address = console_opt.console_address.clone(); - let console_shutdown_tx = start_http_server(&console_opt, state_manager.clone()).await?; + let console_shutdown_tx = start_http_server(&console_opt, state_manager.clone(), readiness.clone()).await?; Some(console_shutdown_tx) } else { None @@ -236,6 +220,7 @@ async fn run(opt: config::Opt) -> Result<()> { let ctx = CancellationToken::new(); // init store + // 2. Start Storage Engine (ECStore) let store = ECStore::new(server_addr, endpoint_pools.clone(), ctx.clone()) .await .inspect_err(|err| { @@ -243,9 +228,9 @@ async fn run(opt: config::Opt) -> Result<()> { })?; ecconfig::init(); - // config system configuration, wait for 1 second if failed - let mut retry_count = 0; + // // Initialize global configuration system + let mut retry_count = 0; while let Err(e) = GLOBAL_CONFIG_SYS.init(store.clone()).await { error!("GLOBAL_CONFIG_SYS.init failed {:?}", e); // TODO: check error type @@ -255,8 +240,8 @@ async fn run(opt: config::Opt) -> Result<()> { } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } - - // init replication_pool + readiness.mark_stage(SystemStage::StorageReady); + // init replication_pool init_background_replication(store.clone()).await; // Initialize KMS system if enabled init_kms_system(&opt).await?; @@ -289,7 +274,10 @@ async fn run(opt: config::Opt) -> Result<()> { init_bucket_metadata_sys(store.clone(), buckets.clone()).await; + // 3. Initialize IAM System (Blocking load) + // This ensures data is in memory before moving forward init_iam_sys(store.clone()).await.map_err(Error::other)?; + readiness.mark_stage(SystemStage::IamReady); add_bucket_notification_configuration(buckets.clone()).await; @@ -341,6 +329,15 @@ async fn run(opt: config::Opt) -> Result<()> { init_update_check(); + println!( + "RustFS server started successfully at {}, current time: {}", + &server_address, + chrono::offset::Utc::now().to_string() + ); + info!(target: "rustfs::main::run","server started successfully at {}", &server_address); + // 4. Mark as Full Ready now that critical components are warm + readiness.mark_stage(SystemStage::FullReady); + // Perform hibernation for 1 second tokio::time::sleep(SHUTDOWN_TIMEOUT).await; // listen to the shutdown signal diff --git a/rustfs/src/server/cert.rs b/rustfs/src/server/cert.rs index 6dba5c05..93013be0 100644 --- a/rustfs/src/server/cert.rs +++ b/rustfs/src/server/cert.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rustfs_common::globals::set_global_root_cert; +use rustfs_common::set_global_root_cert; use rustfs_config::{RUSTFS_CA_CERT, RUSTFS_PUBLIC_CERT, RUSTFS_TLS_CERT}; use tracing::{debug, info}; diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index 2b4484cb..53a03bca 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -17,7 +17,7 @@ use super::compress::{CompressionConfig, CompressionPredicate}; use crate::admin; use crate::auth::IAMAuth; use crate::config; -use crate::server::{ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer}; +use crate::server::{ReadinessGateLayer, ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer}; use crate::storage; use crate::storage::tonic_service::make_server; use bytes::Bytes; @@ -29,6 +29,7 @@ use hyper_util::{ service::TowerToHyperService, }; use metrics::{counter, histogram}; +use rustfs_common::GlobalReadiness; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_utils::net::parse_and_resolve_address; @@ -112,6 +113,7 @@ fn get_cors_allowed_origins() -> String { pub async fn start_http_server( opt: &config::Opt, worker_state_manager: ServiceStateManager, + readiness: Arc, ) -> Result> { let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?; let server_port = server_addr.port(); @@ -208,7 +210,7 @@ pub async fn start_http_server( println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html",); } else { info!(target: "rustfs::main::startup","RustFS API: {api_endpoints} {localhost_endpoint}"); - println!("RustFS API: {api_endpoints} {localhost_endpoint}"); + println!("RustFS Http API: {api_endpoints} {localhost_endpoint}"); println!("RustFS Start Time: {now_time}"); if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) { warn!( @@ -388,6 +390,7 @@ pub async fn start_http_server( cors_layer: cors_layer.clone(), compression_config: compression_config.clone(), is_console, + readiness: readiness.clone(), }; process_connection(socket, tls_acceptor.clone(), connection_ctx, graceful.clone()); @@ -490,6 +493,7 @@ struct ConnectionContext { cors_layer: CorsLayer, compression_config: CompressionConfig, is_console: bool, + readiness: Arc, } /// Process a single incoming TCP connection. @@ -513,6 +517,7 @@ fn process_connection( cors_layer, compression_config, is_console, + readiness, } = context; // Build services inside each connected task to avoid passing complex service types across tasks, @@ -523,6 +528,9 @@ fn process_connection( let hybrid_service = ServiceBuilder::new() .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid)) .layer(CatchPanicLayer::new()) + // CRITICAL: Insert ReadinessGateLayer before business logic + // This stops requests from hitting IAMAuth or Storage if they are not ready. + .layer(ReadinessGateLayer::new(readiness)) .layer( TraceLayer::new_for_http() .make_span_with(|request: &HttpRequest<_>| { diff --git a/rustfs/src/server/mod.rs b/rustfs/src/server/mod.rs index 630f6f94..28af0093 100644 --- a/rustfs/src/server/mod.rs +++ b/rustfs/src/server/mod.rs @@ -19,6 +19,8 @@ mod event; mod http; mod hybrid; mod layer; +mod prefix; +mod readiness; mod runtime; mod service_state; @@ -26,6 +28,8 @@ pub(crate) use audit::{start_audit_system, stop_audit_system}; pub(crate) use cert::init_cert; pub(crate) use event::{init_event_notifier, shutdown_event_notifier}; pub(crate) use http::start_http_server; +pub(crate) use prefix::*; +pub(crate) use readiness::ReadinessGateLayer; pub(crate) use runtime::get_tokio_runtime_builder; pub(crate) use service_state::SHUTDOWN_TIMEOUT; pub(crate) use service_state::ServiceState; diff --git a/rustfs/src/server/prefix.rs b/rustfs/src/server/prefix.rs new file mode 100644 index 00000000..bdb8216a --- /dev/null +++ b/rustfs/src/server/prefix.rs @@ -0,0 +1,55 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Predefined CPU profiling path for RustFS server. +/// This path is used to access CPU profiling data. +pub(crate) const PROFILE_CPU_PATH: &str = "/profile/cpu"; + +/// This path is used to access memory profiling data. +pub(crate) const PROFILE_MEMORY_PATH: &str = "/profile/memory"; + +/// Favicon path to handle browser requests for the favicon. +/// This path serves the favicon.ico file. +pub(crate) const FAVICON_PATH: &str = "/favicon.ico"; + +/// Predefined health check path for RustFS server. +/// This path is used to check the health status of the server. +pub(crate) const HEALTH_PREFIX: &str = "/health"; + +/// Predefined administrative prefix for RustFS server routes. +/// This prefix is used for endpoints that handle administrative tasks +/// such as configuration, monitoring, and management. +pub(crate) const ADMIN_PREFIX: &str = "/rustfs/admin"; + +/// Environment variable name for overriding the default +/// administrative prefix path. +pub(crate) const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; + +/// Predefined console prefix for RustFS server routes. +/// This prefix is used for endpoints that handle console-related tasks +/// such as user interface and management. +pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console"; + +/// Predefined RPC prefix for RustFS server routes. +/// This prefix is used for endpoints that handle remote procedure calls (RPC). +pub(crate) const RPC_PREFIX: &str = "/rustfs/rpc"; + +/// LOGO art for RustFS server. +pub(crate) const LOGO: &str = r#" + +░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀ +░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█ +░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀ + +"#; diff --git a/rustfs/src/server/readiness.rs b/rustfs/src/server/readiness.rs new file mode 100644 index 00000000..a79ad083 --- /dev/null +++ b/rustfs/src/server/readiness.rs @@ -0,0 +1,129 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use http::{Request as HttpRequest, Response, StatusCode}; +use http_body::Body; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; +use rustfs_common::GlobalReadiness; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +/// ReadinessGateLayer ensures that the system components (IAM, Storage) +/// are fully initialized before allowing any request to proceed. +#[derive(Clone)] +pub struct ReadinessGateLayer { + readiness: Arc, +} + +impl ReadinessGateLayer { + /// Create a new ReadinessGateLayer + /// # Arguments + /// * `readiness` - An Arc to the GlobalReadiness instance + /// + /// # Returns + /// A new instance of ReadinessGateLayer + pub fn new(readiness: Arc) -> Self { + Self { readiness } + } +} + +impl Layer for ReadinessGateLayer { + type Service = ReadinessGateService; + + /// Wrap the inner service with ReadinessGateService + /// # Arguments + /// * `inner` - The inner service to wrap + /// # Returns + /// An instance of ReadinessGateService + fn layer(&self, inner: S) -> Self::Service { + ReadinessGateService { + inner, + readiness: self.readiness.clone(), + } + } +} + +#[derive(Clone)] +pub struct ReadinessGateService { + inner: S, + readiness: Arc, +} + +type BoxError = Box; +type BoxBody = http_body_util::combinators::UnsyncBoxBody; +impl Service> for ReadinessGateService +where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Send + 'static, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, +{ + type Response = Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: HttpRequest) -> Self::Future { + let mut inner = self.inner.clone(); + let readiness = self.readiness.clone(); + Box::pin(async move { + let path = req.uri().path(); + // 1) Exact match: fixed probe/resource path + let is_exact_probe = matches!( + path, + crate::server::PROFILE_MEMORY_PATH + | crate::server::PROFILE_CPU_PATH + | crate::server::HEALTH_PREFIX + | crate::server::FAVICON_PATH + ); + + // 2) Prefix matching: the entire set of route prefixes (including their subpaths) + let is_prefix_probe = path.starts_with(crate::server::RUSTFS_ADMIN_PREFIX) + || path.starts_with(crate::server::CONSOLE_PREFIX) + || path.starts_with(crate::server::RPC_PREFIX) + || path.starts_with(crate::server::ADMIN_PREFIX); + + let is_probe = is_exact_probe || is_prefix_probe; + if !is_probe && !readiness.is_ready() { + let body: BoxBody = Full::new(Bytes::from_static(b"Service not ready")) + .map_err(|e| -> BoxError { Box::new(e) }) + .boxed_unsync(); + + let resp = Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .header(http::header::RETRY_AFTER, "5") + .header(http::header::CONTENT_TYPE, "text/plain; charset=utf-8") + .header(http::header::CACHE_CONTROL, "no-store") + .body(body) + .expect("failed to build not ready response"); + return Ok(resp); + } + let resp = inner.call(req).await?; + // System is ready, forward to the actual S3/RPC handlers + // Transparently converts any response body into a BoxBody, and then Trace/Cors/Compression continues to work + let (parts, body) = resp.into_parts(); + let body: BoxBody = body.map_err(Into::into).boxed_unsync(); + Ok(Response::from_parts(parts, body)) + }) + } +} diff --git a/rustfs/src/storage/tonic_service.rs b/rustfs/src/storage/tonic_service.rs index 5ca8ab22..c0f86cb4 100644 --- a/rustfs/src/storage/tonic_service.rs +++ b/rustfs/src/storage/tonic_service.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use futures::Stream; use futures_util::future::join_all; use rmp_serde::{Deserializer, Serializer}; -use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::HealOpts}; +use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, heal_channel::HealOpts}; use rustfs_ecstore::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys},