Inject GlobalReadiness into HTTP server pipeline and gate traffic until FullReady (#1255)

This commit is contained in:
houseme
2025-12-25 00:19:03 +08:00
committed by GitHub
parent 7e75c9b1f5
commit 82a6e78845
36 changed files with 668 additions and 129 deletions

51
Cargo.lock generated
View File

@@ -216,9 +216,12 @@ dependencies = [
[[package]]
name = "arc-swap"
version = "1.7.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e"
dependencies = [
"rustversion",
]
[[package]]
name = "argon2"
@@ -691,9 +694,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.118.0"
version = "1.119.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3e6b7079f85d9ea9a70643c9f89f50db70f5ada868fa9cfe08c1ffdf51abc13"
checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -942,9 +945,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.9.6"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247"
checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -6851,22 +6854,19 @@ dependencies = [
[[package]]
name = "rmp"
version = "0.8.14"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.3.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155"
dependencies = [
"byteorder",
"rmp",
"serde",
]
@@ -7040,6 +7040,7 @@ dependencies = [
"hex-simd",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.8.1",
"hyper-util",
"jemalloc_pprof",
@@ -7753,9 +7754,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34"
dependencies = [
"bitflags 2.10.0",
"errno",
@@ -8123,15 +8124,15 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.146"
version = "1.0.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8"
checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
"serde_core",
"zmij",
]
[[package]]
@@ -8851,14 +8852,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.23.0"
version = "3.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c"
dependencies = [
"fastrand",
"getrandom 0.3.4",
"once_cell",
"rustix 1.1.2",
"rustix 1.1.3",
"windows-sys 0.61.2",
]
@@ -10248,7 +10249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156"
dependencies = [
"libc",
"rustix 1.1.2",
"rustix 1.1.3",
]
[[package]]
@@ -10429,6 +10430,12 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3"
[[package]]
name = "zmij"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e404bcd8afdaf006e529269d3e85a743f9480c3cef60034d77860d02964f3ba"
[[package]]
name = "zopfli"
version = "0.8.3"

View File

@@ -109,6 +109,7 @@ hyper-rustls = { version = "0.27.7", default-features = false, features = ["nati
hyper-util = { version = "0.1.19", features = ["tokio", "server-auto", "server-graceful"] }
http = "1.4.0"
http-body = "1.0.1"
http-body-util = "0.1.3"
reqwest = { version = "0.12.28", default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
socket2 = "0.6.1"
tokio = { version = "1.48.0", features = ["fs", "rt-multi-thread"] }
@@ -131,10 +132,10 @@ form_urlencoded = "1.2.2"
prost = "0.14.1"
quick-xml = "0.38.4"
rmcp = { version = "0.12.0" }
rmp = { version = "0.8.14" }
rmp-serde = { version = "1.3.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.146", features = ["raw_value"] }
serde_json = { version = "1.0.147", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
schemars = "1.1.0"
@@ -163,13 +164,13 @@ time = { version = "0.3.44", features = ["std", "parsing", "formatting", "macros
# Utilities and Tools
anyhow = "1.0.100"
arc-swap = "1.7.1"
arc-swap = "1.8.0"
astral-tokio-tar = "0.5.6"
atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.12" }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3 = { version = "1.117.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "rustls", "rt-tokio"] }
aws-smithy-types = { version = "1.3.5" }
base64 = "0.22.1"
base64-simd = "0.8.0"
@@ -235,7 +236,7 @@ strum = { version = "0.27.2", features = ["derive"] }
sysctl = "0.7.1"
sysinfo = "0.37.2"
temp-env = "0.3.6"
tempfile = "3.23.0"
tempfile = "3.24.0"
test-case = "3.3.1"
thiserror = "2.0.17"
tracing = { version = "0.1.44" }

View File

@@ -19,6 +19,10 @@ pub mod globals;
pub mod heal_channel;
pub mod last_minute;
pub mod metrics;
mod readiness;
pub use globals::*;
pub use readiness::{GlobalReadiness, SystemStage};
/// Default delimiter byte: the ASCII comma (`,`, decimal 44).
pub static DEFAULT_DELIMITER: u8 = b',';

View File

@@ -0,0 +1,136 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU8, Ordering};
/// Stages the system passes through during startup.
///
/// The discriminants are stored in an [`AtomicU8`] by [`GlobalReadiness`] and
/// advanced with `fetch_max`, so their numeric order is significant: a later
/// stage must have a larger value than every earlier one.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SystemStage {
    /// Initial stage; nothing has been marked ready yet.
    Booting = 0,
    /// Disks online, quorum met.
    StorageReady = 1,
    /// Users and policies loaded into cache.
    IamReady = 2,
    /// System ready to serve all traffic.
    FullReady = 3,
}

/// Global readiness tracker for the service.
///
/// Holds the highest [`SystemStage`] reached so far in a single atomic byte, so
/// it can be shared between threads by reference without additional locking.
#[derive(Debug)]
pub struct GlobalReadiness {
    status: AtomicU8,
}

impl Default for GlobalReadiness {
    /// Equivalent to [`GlobalReadiness::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl GlobalReadiness {
    /// Creates a tracker whose initial stage is [`SystemStage::Booting`].
    ///
    /// This is a `const fn`, so a tracker can be placed directly in a `static`.
    ///
    /// # Returns
    /// A new instance of `GlobalReadiness`.
    pub const fn new() -> Self {
        Self {
            status: AtomicU8::new(SystemStage::Booting as u8),
        }
    }

    /// Advances the tracker to `step` if it is later than the current stage.
    ///
    /// Uses `fetch_max`, so marking an earlier stage after a later one has
    /// already been reached is a no-op: the recorded stage never regresses.
    ///
    /// # Arguments
    /// * `step` - The stage to record as reached.
    pub fn mark_stage(&self, step: SystemStage) {
        self.status.fetch_max(step as u8, Ordering::SeqCst);
    }

    /// Reports whether the recorded stage is [`SystemStage::FullReady`].
    ///
    /// # Returns
    /// `true` only once `FullReady` has been marked, `false` for every earlier stage.
    pub fn is_ready(&self) -> bool {
        self.status.load(Ordering::SeqCst) == SystemStage::FullReady as u8
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Reads the raw stage byte currently stored in the tracker.
    fn raw_stage(tracker: &GlobalReadiness) -> u8 {
        tracker.status.load(Ordering::SeqCst)
    }

    /// A fresh tracker starts at `Booting` and is not ready.
    #[test]
    fn test_initial_state() {
        let tracker = GlobalReadiness::new();
        assert!(!tracker.is_ready());
        assert_eq!(raw_stage(&tracker), SystemStage::Booting as u8);
    }

    /// Each marked stage is recorded, and only `FullReady` flips `is_ready`.
    #[test]
    fn test_mark_stage_progression() {
        let tracker = GlobalReadiness::new();

        tracker.mark_stage(SystemStage::StorageReady);
        assert!(!tracker.is_ready());
        assert_eq!(raw_stage(&tracker), SystemStage::StorageReady as u8);

        tracker.mark_stage(SystemStage::IamReady);
        assert!(!tracker.is_ready());
        assert_eq!(raw_stage(&tracker), SystemStage::IamReady as u8);

        tracker.mark_stage(SystemStage::FullReady);
        assert!(tracker.is_ready());
    }

    /// Marking an earlier stage after a later one leaves the tracker unchanged.
    #[test]
    fn test_no_regression() {
        let tracker = GlobalReadiness::new();
        tracker.mark_stage(SystemStage::FullReady);
        tracker.mark_stage(SystemStage::IamReady); // Should not regress
        assert!(tracker.is_ready());
    }

    /// Ten threads marking every stage concurrently end in the ready state.
    #[test]
    fn test_concurrent_marking() {
        let shared = Arc::new(GlobalReadiness::new());

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let local = Arc::clone(&shared);
                thread::spawn(move || {
                    local.mark_stage(SystemStage::StorageReady);
                    local.mark_stage(SystemStage::IamReady);
                    local.mark_stage(SystemStage::FullReady);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert!(shared.is_ready());
    }

    /// `is_ready` stays false for every stage before `FullReady`.
    #[test]
    fn test_is_ready_only_at_full_ready() {
        let tracker = GlobalReadiness::new();
        assert!(!tracker.is_ready());

        for early_stage in [SystemStage::StorageReady, SystemStage::IamReady] {
            tracker.mark_stage(early_stage);
            assert!(!tracker.is_ready());
        }

        tracker.mark_stage(SystemStage::FullReady);
        assert!(tracker.is_ready());
    }
}

View File

@@ -23,7 +23,7 @@ use crate::{
};
use crate::data_usage::load_data_usage_cache;
use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState};
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState};
use rustfs_madmin::{
BackendDisks, Disk, ErasureSetInfo, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE, InfoMessage, ServerProperties,
};

View File

@@ -19,11 +19,7 @@ use crate::{
// utils::os::get_drive_stats,
};
use chrono::Utc;
use rustfs_common::{
globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR},
heal_channel::DriveState,
metrics::global_metrics,
};
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR, heal_channel::DriveState, metrics::global_metrics};
use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
use rustfs_utils::os::get_drive_stats;
use serde::{Deserialize, Serialize};

View File

@@ -40,7 +40,7 @@ use futures::future::join_all;
use http::HeaderMap;
use rustfs_common::heal_channel::HealOpts;
use rustfs_common::{
globals::GLOBAL_LOCAL_NODE_NAME,
GLOBAL_LOCAL_NODE_NAME,
heal_channel::{DriveState, HealItemType},
};
use rustfs_filemeta::FileInfo;

View File

@@ -55,8 +55,8 @@ use futures::future::join_all;
use http::HeaderMap;
use lazy_static::lazy_static;
use rand::Rng as _;
use rustfs_common::globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT};
use rustfs_common::heal_channel::{HealItemType, HealOpts};
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT};
use rustfs_filemeta::FileInfo;
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf};

View File

@@ -109,6 +109,9 @@ pub enum Error {
#[error("io error: {0}")]
Io(std::io::Error),
#[error("system already initialized")]
IamSysAlreadyInitialized,
}
impl PartialEq for Error {
@@ -162,6 +165,7 @@ impl Clone for Error {
Error::PolicyTooLarge => Error::PolicyTooLarge,
Error::ConfigNotFound => Error::ConfigNotFound,
Error::Io(e) => Error::Io(std::io::Error::new(e.kind(), e.to_string())),
Error::IamSysAlreadyInitialized => Error::IamSysAlreadyInitialized,
}
}
}
@@ -226,6 +230,7 @@ impl From<rustfs_policy::error::Error> for Error {
rustfs_policy::error::Error::StringError(s) => Error::StringError(s),
rustfs_policy::error::Error::CryptoError(e) => Error::CryptoError(e),
rustfs_policy::error::Error::ErrCredMalformed => Error::ErrCredMalformed,
rustfs_policy::error::Error::IamSysAlreadyInitialized => Error::IamSysAlreadyInitialized,
}
}
}

View File

@@ -18,30 +18,58 @@ use rustfs_ecstore::store::ECStore;
use std::sync::{Arc, OnceLock};
use store::object::ObjectStore;
use sys::IamSys;
use tracing::{debug, instrument};
use tracing::{error, info, instrument};
pub mod cache;
pub mod error;
pub mod manager;
pub mod store;
pub mod utils;
pub mod sys;
pub mod utils;
static IAM_SYS: OnceLock<Arc<IamSys<ObjectStore>>> = OnceLock::new();
/// Initializes the global IAM system singleton backed by `ecstore`.
///
/// If the singleton is already set, this logs and returns `Ok(())` without
/// doing any work. Otherwise it builds the storage adapter, the cache manager
/// and the `IamSys` wrapper, then publishes the result in `IAM_SYS`.
///
/// # Arguments
/// * `ecstore` - The erasure-coded store used as the IAM persistence backend.
///
/// # Errors
/// Returns `Error::IamSysAlreadyInitialized` when another caller set the
/// singleton between the initial check and the final `set`.
#[instrument(skip(ecstore))]
pub async fn init_iam_sys(ecstore: Arc<ECStore>) -> Result<()> {
    if IAM_SYS.get().is_some() {
        info!("IAM system already initialized, skipping.");
        return Ok(());
    }

    info!("Starting IAM system initialization sequence...");

    // 1. Create the persistent storage adapter
    let storage_adapter = ObjectStore::new(ecstore);

    // 2. Create the cache manager.
    // NOTE(review): the original comment states `new` performs a blocking
    // initial load from disk; its body is not visible here — confirm.
    let cache_manager = IamCache::new(storage_adapter).await;

    // 3. Construct the system interface
    let iam_instance = Arc::new(IamSys::new(cache_manager));

    // 4. Publish the singleton. `set` fails only if someone else published
    // first, which the early check above could not have observed.
    if IAM_SYS.set(iam_instance).is_err() {
        error!("Critical: Race condition detected during IAM initialization!");
        return Err(Error::IamSysAlreadyInitialized);
    }

    info!("IAM system initialization completed successfully.");
    Ok(())
}
#[inline]
pub fn get() -> Result<Arc<IamSys<ObjectStore>>> {
IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized)
let sys = IAM_SYS.get().map(Arc::clone).ok_or(Error::IamSysNotInitialized)?;
// Double-check the internal readiness state. The OnceLock is only set
// after initialization and data loading complete, so this is a defensive
// guard to ensure callers never operate on a partially initialized system.
if !sys.is_ready() {
return Err(Error::IamSysNotInitialized);
}
Ok(sys)
}
pub fn get_global_iam_sys() -> Option<Arc<IamSys<ObjectStore>>> {

View File

@@ -37,6 +37,7 @@ use rustfs_policy::{
use rustfs_utils::path::path_join_buf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::atomic::AtomicU8;
use std::{
collections::{HashMap, HashSet},
sync::{
@@ -76,9 +77,19 @@ fn get_iam_format_file_path() -> String {
path_join_buf(&[&IAM_CONFIG_PREFIX, IAM_FORMAT_FILE])
}
/// Lifecycle state of the IAM cache, stored as a `u8` in an atomic.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IamState {
    /// Value the cache is constructed with, before `init` runs.
    Uninitialized = 0,
    /// Set at the start of `init`, while the initial data is being loaded.
    Loading = 1,
    /// Set once the initial load has succeeded.
    Ready = 2,
    /// Set when the initial load failed on its final retry.
    Error = 3,
}
pub struct IamCache<T> {
pub cache: Cache,
pub api: T,
pub state: Arc<AtomicU8>,
pub loading: Arc<AtomicBool>,
pub roles: HashMap<ARN, Vec<String>>,
pub send_chan: Sender<i64>,
@@ -89,12 +100,19 @@ impl<T> IamCache<T>
where
T: Store,
{
/// Create a new IAM system instance
/// # Arguments
/// * `api` - The storage backend implementing the Store trait
///
/// # Returns
/// An Arc-wrapped instance of IamSystem
pub(crate) async fn new(api: T) -> Arc<Self> {
let (sender, receiver) = mpsc::channel::<i64>(100);
let sys = Arc::new(Self {
api,
cache: Cache::default(),
state: Arc::new(AtomicU8::new(IamState::Uninitialized as u8)),
loading: Arc::new(AtomicBool::new(false)),
send_chan: sender,
roles: HashMap::new(),
@@ -105,10 +123,32 @@ where
sys
}
/// Initialize the IAM system
async fn init(self: Arc<Self>, receiver: Receiver<i64>) -> Result<()> {
self.state.store(IamState::Loading as u8, Ordering::SeqCst);
// Ensure the IAM format file is persisted first
self.clone().save_iam_formatter().await?;
self.clone().load().await?;
// Critical: Load all existing users/policies into memory cache
const MAX_RETRIES: usize = 3;
for attempt in 0..MAX_RETRIES {
if let Err(e) = self.clone().load().await {
if attempt == MAX_RETRIES - 1 {
self.state.store(IamState::Error as u8, Ordering::SeqCst);
error!("IAM fail to load initial data after {} attempts: {:?}", MAX_RETRIES, e);
return Err(e);
} else {
warn!("IAM load failed, retrying... attempt {}", attempt + 1);
tokio::time::sleep(Duration::from_secs(1)).await;
}
} else {
break;
}
}
self.state.store(IamState::Ready as u8, Ordering::SeqCst);
info!("IAM System successfully initialized and marked as READY");
// Background ticker for synchronization
// Check if environment variable is set
let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK").is_ok();
@@ -152,6 +192,11 @@ where
Ok(())
}
/// Reports whether the stored state equals `IamState::Ready`.
///
/// # Returns
/// `true` when the atomic `state` currently holds `IamState::Ready`, `false` otherwise.
pub fn is_ready(&self) -> bool {
    let current = self.state.load(Ordering::SeqCst);
    current == IamState::Ready as u8
}
/// Sends the current UTC Unix timestamp on `send_chan`.
///
/// # Panics
/// Panics if the send returns an error (the result is unwrapped).
async fn _notify(&self) {
    let now_ts = OffsetDateTime::now_utc().unix_timestamp();
    self.send_chan.send(now_ts).await.unwrap();
}

View File

@@ -38,7 +38,7 @@ use std::sync::LazyLock;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::{self, Sender};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use tracing::{debug, error, info, warn};
pub static IAM_CONFIG_PREFIX: LazyLock<String> = LazyLock::new(|| format!("{RUSTFS_CONFIG_PREFIX}/iam"));
pub static IAM_CONFIG_USERS_PREFIX: LazyLock<String> = LazyLock::new(|| format!("{RUSTFS_CONFIG_PREFIX}/iam/users/"));
@@ -341,6 +341,27 @@ impl ObjectStore {
Ok(policies)
}
/// Checks if the underlying ECStore is ready for metadata operations.
/// This prevents silent failures during the storage boot-up phase.
///
/// Performs a lightweight probe by attempting to read a known configuration object.
/// If the object is not found, it indicates the storage metadata is not ready.
/// The upper-level caller should handle retries if needed.
async fn check_storage_readiness(&self) -> Result<()> {
// Probe path for a fixed object under the IAM root prefix.
// If it doesn't exist, the system bucket or metadata is not ready.
let probe_path = format!("{}/format.json", *IAM_CONFIG_PREFIX);
match read_config(self.object_api.clone(), &probe_path).await {
Ok(_) => Ok(()),
Err(rustfs_ecstore::error::StorageError::ConfigNotFound) => Err(Error::other(format!(
"Storage metadata not ready: probe object '{}' not found (expected IAM config to be initialized)",
probe_path
))),
Err(e) => Err(e.into()),
}
}
// async fn load_policy(&self, name: &str) -> Result<PolicyDoc> {
// let mut policy = self
// .load_iam_config::<PolicyDoc>(&format!("config/iam/policies/{name}/policy.json"))
@@ -398,13 +419,50 @@ impl Store for ObjectStore {
Ok(serde_json::from_slice(&data)?)
}
/// Saves IAM configuration with a retry mechanism on failure.
///
/// The item is serialized to JSON and encrypted once, then written with at
/// most five attempts in total. Each failed attempt except the last is followed
/// by an exponential backoff before the next one: 200ms, 400ms, 800ms, 1600ms.
///
/// # Arguments
///
/// * `item` - The IAM configuration item to save, must implement `Serialize` and `Send`.
/// * `path` - The path where the configuration will be saved.
///
/// # Returns
///
/// * `Result<()>` - `Ok(())` on success, or the converted error from the final
///   attempt if every attempt fails. Serialization and encryption errors are
///   returned immediately without retrying.
#[tracing::instrument(level = "debug", skip(self, item, path))]
async fn save_iam_config<Item: Serialize + Send>(&self, item: Item, path: impl AsRef<str> + Send) -> Result<()> {
    /// Total number of write attempts, including the first one.
    const MAX_ATTEMPTS: u32 = 5;
    /// Delay after the first failed attempt; doubled after each further failure.
    const BASE_BACKOFF_MS: u64 = 200;

    let serialized = serde_json::to_vec(&item)?;
    let data = Self::encrypt_data(&serialized)?;
    let path_ref = path.as_ref();

    // 1-based number of the attempt currently being made.
    let mut attempt: u32 = 1;
    loop {
        match save_config(self.object_api.clone(), path_ref, data.clone()).await {
            Ok(_) => {
                debug!("Successfully saved IAM config to {}", path_ref);
                return Ok(());
            }
            Err(e) if attempt < MAX_ATTEMPTS => {
                // Exponential backoff: 200ms, 400ms, 800ms, 1600ms.
                let wait_ms = BASE_BACKOFF_MS << (attempt - 1);
                warn!(
                    "Storage layer not ready for IAM write (attempt {}/{}). Retrying in {}ms. Path: {}, Error: {:?}",
                    attempt, MAX_ATTEMPTS, wait_ms, path_ref, e
                );
                tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await;
                attempt += 1;
            }
            Err(e) => {
                error!("Final failure saving IAM config to {}: {:?}", path_ref, e);
                return Err(e.into());
            }
        }
    }
}
async fn delete_iam_config(&self, path: impl AsRef<str> + Send) -> Result<()> {
delete_config(self.object_api.clone(), path.as_ref()).await?;
@@ -418,8 +476,16 @@ impl Store for ObjectStore {
user_identity: UserIdentity,
_ttl: Option<usize>,
) -> Result<()> {
self.save_iam_config(user_identity, get_user_identity_path(name, user_type))
.await
// Pre-check storage health
self.check_storage_readiness().await?;
let path = get_user_identity_path(name, user_type);
debug!("Saving IAM identity to path: {}", path);
self.save_iam_config(user_identity, path).await.map_err(|e| {
error!("ObjectStore save failure for {}: {:?}", name, e);
e
})
}
async fn delete_user_identity(&self, name: &str, user_type: UserType) -> Result<()> {
self.delete_iam_config(get_user_identity_path(name, user_type))

View File

@@ -67,6 +67,13 @@ pub struct IamSys<T> {
}
impl<T: Store> IamSys<T> {
/// Create a new IamSys instance with the given IamCache store
///
/// # Arguments
/// * `store` - An Arc to the IamCache instance
///
/// # Returns
/// A new instance of IamSys
pub fn new(store: Arc<IamCache<T>>) -> Self {
tokio::spawn(async move {
match opa::lookup_config().await {
@@ -87,6 +94,11 @@ impl<T: Store> IamSys<T> {
roles_map: HashMap::new(),
}
}
/// Reports whether the store's backend API has a watcher.
///
/// # Returns
/// Whatever `has_watcher()` on the underlying `store.api` returns.
pub fn has_watcher(&self) -> bool {
    let backend = &self.store.api;
    backend.has_watcher()
}
@@ -859,6 +871,11 @@ impl<T: Store> IamSys<T> {
self.get_combined_policy(&policies).await.is_allowed(args).await
}
/// Reports whether the underlying store is ready.
///
/// # Returns
/// The result of `is_ready()` on the wrapped store.
pub fn is_ready(&self) -> bool {
    let cache = &self.store;
    cache.is_ready()
}
}
fn is_allowed_by_session_policy(args: &Args<'_>) -> (bool, bool) {

View File

@@ -89,6 +89,7 @@ pub enum Error {
#[error("invalid access_key")]
InvalidAccessKey,
#[error("action not allowed")]
IAMActionNotAllowed,
@@ -106,6 +107,9 @@ pub enum Error {
#[error("io error: {0}")]
Io(std::io::Error),
#[error("system already initialized")]
IamSysAlreadyInitialized,
}
impl Error {

View File

@@ -16,7 +16,7 @@
mod generated;
use proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_common::globals::{GLOBAL_CONN_MAP, GLOBAL_ROOT_CERT, evict_connection};
use rustfs_common::{GLOBAL_CONN_MAP, GLOBAL_ROOT_CERT, evict_connection};
use std::{error::Error, time::Duration};
use tonic::{
Request, Status,

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use datafusion::{common::DataFusionError, sql::sqlparser::parser::ParserError};
use snafu::{Backtrace, Location, Snafu};
use std::fmt::Display;
pub mod object_store;
pub mod query;

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use s3s::dto::SelectObjectContentInput;
use std::sync::Arc;
pub mod analyzer;
pub mod ast;

View File

@@ -12,20 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::query::Context;
use crate::{QueryError, QueryResult, object_store::EcObjectStore};
use datafusion::{
execution::{SessionStateBuilder, context::SessionState, runtime_env::RuntimeEnvBuilder},
parquet::data_type::AsBytes,
prelude::SessionContext,
};
use object_store::{ObjectStore, memory::InMemory, path::Path};
use std::sync::Arc;
use tracing::error;
use crate::{QueryError, QueryResult, object_store::EcObjectStore};
use super::Context;
#[derive(Clone)]
pub struct SessionCtx {
_desc: Arc<SessionCtxDesc>,

View File

@@ -72,6 +72,7 @@ hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
http-body-util.workspace = true
reqwest = { workspace = true }
socket2 = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal", "process", "io-util"] }

View File

@@ -1,6 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::auth::get_condition_values;
use http::HeaderMap;
use rustfs_iam::store::object::ObjectStore;
use rustfs_iam::sys::IamSys;
@@ -9,8 +21,8 @@ use rustfs_policy::policy::Args;
use rustfs_policy::policy::action::Action;
use s3s::S3Result;
use s3s::s3_error;
use crate::auth::get_condition_values;
use std::collections::HashMap;
use std::sync::Arc;
pub async fn validate_admin_request(
headers: &HeaderMap,

View File

@@ -14,6 +14,7 @@
use crate::config::build;
use crate::license::get_license;
use crate::server::{CONSOLE_PREFIX, FAVICON_PATH, HEALTH_PREFIX, RUSTFS_ADMIN_PREFIX};
use axum::{
Router,
body::Body,
@@ -45,9 +46,6 @@ use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, instrument, warn};
pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console";
const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct StaticFiles;
@@ -457,7 +455,7 @@ fn get_console_config_from_env() -> (bool, u32, u64, String) {
/// # Returns:
/// - `true` if the path is for console access, `false` otherwise.
pub fn is_console_path(path: &str) -> bool {
    // The favicon is matched exactly via the shared constant rather than a
    // hard-coded literal; every other console path is matched by prefix.
    path == FAVICON_PATH || path.starts_with(CONSOLE_PREFIX)
}
/// Setup comprehensive middleware stack with tower-http features
@@ -477,11 +475,11 @@ fn setup_console_middleware_stack(
auth_timeout: u64,
) -> Router {
let mut app = Router::new()
.route("/favicon.ico", get(static_handler))
.route(FAVICON_PATH, get(static_handler))
.route(&format!("{CONSOLE_PREFIX}/license"), get(license_handler))
.route(&format!("{CONSOLE_PREFIX}/config.json"), get(config_handler))
.route(&format!("{CONSOLE_PREFIX}/version"), get(version_handler))
.route(&format!("{CONSOLE_PREFIX}/health"), get(health_check).head(health_check))
.route(&format!("{CONSOLE_PREFIX}{HEALTH_PREFIX}"), get(health_check).head(health_check))
.nest(CONSOLE_PREFIX, Router::new().fallback_service(get(static_handler)))
.fallback_service(get(static_handler));

View File

@@ -72,7 +72,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use tracing::{error, info, warn};
use url::Host;
// use url::UrlQuery;
pub mod bucket_meta;
pub mod event;

View File

@@ -12,8 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
admin::{auth::validate_admin_request, router::Operation},
auth::{check_key_valid, get_session_token},
};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_ecstore::rebalance::RebalanceMeta;
use rustfs_ecstore::{
StorageAPI,
error::StorageError,
@@ -33,12 +38,6 @@ use std::time::Duration;
use time::OffsetDateTime;
use tracing::warn;
use crate::{
admin::{auth::validate_admin_request, router::Operation},
auth::{check_key_valid, get_session_token},
};
use rustfs_ecstore::rebalance::RebalanceMeta;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RebalanceResp {
pub id: String,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::admin::router::Operation;
use http::StatusCode;
use hyper::Uri;
use matchit::Params;
@@ -20,8 +21,6 @@ use rustfs_madmin::service_commands::ServiceTraceOpts;
use s3s::{Body, S3Request, S3Response, S3Result, s3_error};
use tracing::warn;
use crate::admin::router::Operation;
#[allow(dead_code)]
fn extract_trace_options(uri: &Uri) -> S3Result<ServiceTraceOpts> {
let mut st_opts = ServiceTraceOpts::default();

View File

@@ -22,6 +22,7 @@ pub mod utils;
#[cfg(test)]
mod console_test;
use crate::server::{ADMIN_PREFIX, HEALTH_PREFIX, PROFILE_CPU_PATH, PROFILE_MEMORY_PATH};
use handlers::{
GetReplicationMetricsHandler, HealthCheckHandler, IsAdminHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler,
SetRemoteTargetHandler, bucket_meta,
@@ -37,17 +38,21 @@ use router::{AdminOperation, S3Router};
use rpc::register_rpc_route;
use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/rustfs/admin";
// const ADMIN_PREFIX: &str = "/minio/admin";
/// Create admin router
///
/// # Arguments
/// * `console_enabled` - Whether the console is enabled
///
/// # Returns
/// An instance of S3Route for admin operations
pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new(console_enabled);
// Health check endpoint for monitoring and orchestration
r.insert(Method::GET, "/health", AdminOperation(&HealthCheckHandler {}))?;
r.insert(Method::HEAD, "/health", AdminOperation(&HealthCheckHandler {}))?;
r.insert(Method::GET, "/profile/cpu", AdminOperation(&TriggerProfileCPU {}))?;
r.insert(Method::GET, "/profile/memory", AdminOperation(&TriggerProfileMemory {}))?;
r.insert(Method::GET, HEALTH_PREFIX, AdminOperation(&HealthCheckHandler {}))?;
r.insert(Method::HEAD, HEALTH_PREFIX, AdminOperation(&HealthCheckHandler {}))?;
r.insert(Method::GET, PROFILE_CPU_PATH, AdminOperation(&TriggerProfileCPU {}))?;
r.insert(Method::GET, PROFILE_MEMORY_PATH, AdminOperation(&TriggerProfileMemory {}))?;
// 1
r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?;

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::admin::ADMIN_PREFIX;
use crate::admin::console::is_console_path;
use crate::admin::console::make_console_server;
use crate::admin::rpc::RPC_PREFIX;
use crate::server::{ADMIN_PREFIX, HEALTH_PREFIX, PROFILE_CPU_PATH, PROFILE_MEMORY_PATH, RPC_PREFIX};
use hyper::HeaderMap;
use hyper::Method;
use hyper::StatusCode;
@@ -86,12 +85,12 @@ where
fn is_match(&self, method: &Method, uri: &Uri, headers: &HeaderMap, _: &mut Extensions) -> bool {
let path = uri.path();
// Profiling endpoints
if method == Method::GET && (path == "/profile/cpu" || path == "/profile/memory") {
if method == Method::GET && (path == PROFILE_CPU_PATH || path == PROFILE_MEMORY_PATH) {
return true;
}
// Health check
if (method == Method::HEAD || method == Method::GET) && path == "/health" {
if (method == Method::HEAD || method == Method::GET) && path == HEALTH_PREFIX {
return true;
}
@@ -117,12 +116,12 @@ where
let path = req.uri.path();
// Profiling endpoints
if req.method == Method::GET && (path == "/profile/cpu" || path == "/profile/memory") {
if req.method == Method::GET && (path == PROFILE_CPU_PATH || path == PROFILE_MEMORY_PATH) {
return Ok(());
}
// Health check
if (req.method == Method::HEAD || req.method == Method::GET) && path == "/health" {
if (req.method == Method::HEAD || req.method == Method::GET) && path == HEALTH_PREFIX {
return Ok(());
}

View File

@@ -15,6 +15,7 @@
use super::router::AdminOperation;
use super::router::Operation;
use super::router::S3Router;
use crate::server::RPC_PREFIX;
use futures::StreamExt;
use http::StatusCode;
use hyper::Method;
@@ -36,8 +37,6 @@ use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tracing::warn;
pub const RPC_PREFIX: &str = "/rustfs/rpc";
pub fn register_rpc_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
r.insert(
Method::GET,

View File

@@ -66,7 +66,7 @@ const SIGN_V2_ALGORITHM: &str = "AWS ";
const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const STREAMING_CONTENT_SHA256: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
const STREAMING_CONTENT_SHA256_TRAILER: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
pub const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER";
pub(crate) const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER";
const ACTION_HEADER: &str = "Action";
const AMZ_CREDENTIAL: &str = "X-Amz-Credential";
const AMZ_ACCESS_KEY_ID: &str = "AWSAccessKeyId";

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use crate::storage::ecfs::{process_lambda_configurations, process_queue_configurations, process_topic_configurations};
use crate::{admin, config};
use crate::{admin, config, version};
use chrono::Datelike;
use rustfs_config::{DEFAULT_UPDATE_CHECK, ENV_UPDATE_CHECK};
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_notify::notifier_global;
@@ -23,6 +24,21 @@ use std::env;
use std::io::Error;
use tracing::{debug, error, info, instrument, warn};
/// Logs the server banner lines: product name, copyright range, license,
/// version and documentation URL.
///
/// The closing year of the copyright range is read from the current UTC date
/// each time the function is called.
#[instrument]
pub(crate) fn print_server_info() {
    // Resolve the year up front so the copyright line reflects the running date.
    let now = chrono::Utc::now();
    let year = now.year();

    // Each line is emitted as a separate `info!` event.
    info!("RustFS Object Storage Server");
    info!("Copyright: 2024-{} RustFS, Inc", year);
    info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0");
    info!("Version: {}", version::get_version());
    info!("Docs: https://rustfs.com/docs/");
}
/// Initialize the asynchronous update check system.
/// This function checks if update checking is enabled via
/// environment variable or default configuration. If enabled,
/// it spawns an asynchronous task to check for updates with a timeout.
pub(crate) fn init_update_check() {
let update_check_enable = env::var(ENV_UPDATE_CHECK)
.unwrap_or_else(|_| DEFAULT_UPDATE_CHECK.to_string())
@@ -70,6 +86,12 @@ pub(crate) fn init_update_check() {
});
}
/// Add existing bucket notification configurations to the global notifier system.
/// This function retrieves notification configurations for each bucket
/// and registers the corresponding event rules with the notifier system.
/// It processes queue, topic, and lambda configurations and maps them to event rules.
/// # Arguments
/// * `buckets` - A vector of bucket names to process
#[instrument(skip_all)]
pub(crate) async fn add_bucket_notification_configuration(buckets: Vec<String>) {
let region_opt = rustfs_ecstore::global::get_global_region();
@@ -128,6 +150,15 @@ pub(crate) async fn add_bucket_notification_configuration(buckets: Vec<String>)
}
/// Initialize KMS system and configure if enabled
///
/// This function initializes the global KMS service manager. If KMS is enabled
/// via command line options, it configures and starts the service accordingly.
/// If not enabled, it attempts to load any persisted KMS configuration from
/// cluster storage and starts the service if found.
/// # Arguments
/// * `opt` - The application configuration options
///
/// Returns `std::io::Result<()>` indicating success or failure
#[instrument(skip(opt))]
pub(crate) async fn init_kms_system(opt: &config::Opt) -> std::io::Result<()> {
// Initialize global KMS service manager (starts in NotConfigured state)

View File

@@ -25,19 +25,20 @@ mod update;
mod version;
// Ensure the correct path for parse_license is imported
use crate::init::{add_bucket_notification_configuration, init_buffer_profile_system, init_kms_system, init_update_check};
use crate::init::{
add_bucket_notification_configuration, init_buffer_profile_system, init_kms_system, init_update_check, print_server_info,
};
use crate::server::{
SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_cert, init_event_notifier, shutdown_event_notifier,
start_audit_system, start_http_server, stop_audit_system, wait_for_shutdown,
};
use chrono::Datelike;
use clap::Parser;
use license::init_license;
use rustfs_ahm::{
Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager,
scanner::data_scanner::ScannerConfig, shutdown_ahm_services,
};
use rustfs_common::globals::set_global_addr;
use rustfs_common::{GlobalReadiness, SystemStage, set_global_addr};
use rustfs_ecstore::{
StorageAPI,
bucket::metadata_sys::init_bucket_metadata_sys,
@@ -69,25 +70,6 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
/// ASCII-art banner that `async_main` logs once under its "print startup logo" step.
const LOGO: &str = r#"
░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀
░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█
░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀
"#;
/// Logs the server banner lines: product name, copyright range, license,
/// version and documentation URL.
#[instrument]
fn print_server_info() {
// The copyright range ends at the current UTC year.
let current_year = chrono::Utc::now().year();
// Emit each banner line as an `info!` event.
info!("RustFS Object Storage Server");
info!("Copyright: 2024-{} RustFS, Inc", current_year);
info!("License: Apache-2.0 https://www.apache.org/licenses/LICENSE-2.0");
info!("Version: {}", version::get_version());
info!("Docs: https://rustfs.com/docs/");
}
fn main() -> Result<()> {
let runtime = server::get_tokio_runtime_builder()
.build()
@@ -120,7 +102,7 @@ async fn async_main() -> Result<()> {
}
// print startup logo
info!("{}", LOGO);
info!("{}", server::LOGO);
// Initialize performance profiling if enabled
profiling::init_from_env().await;
@@ -143,6 +125,8 @@ async fn async_main() -> Result<()> {
#[instrument(skip(opt))]
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// 1. Initialize global readiness tracker
let readiness = Arc::new(GlobalReadiness::new());
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
@@ -214,14 +198,14 @@ async fn run(opt: config::Opt) -> Result<()> {
let s3_shutdown_tx = {
let mut s3_opt = opt.clone();
s3_opt.console_enable = false;
let s3_shutdown_tx = start_http_server(&s3_opt, state_manager.clone()).await?;
let s3_shutdown_tx = start_http_server(&s3_opt, state_manager.clone(), readiness.clone()).await?;
Some(s3_shutdown_tx)
};
let console_shutdown_tx = if opt.console_enable && !opt.console_address.is_empty() {
let mut console_opt = opt.clone();
console_opt.address = console_opt.console_address.clone();
let console_shutdown_tx = start_http_server(&console_opt, state_manager.clone()).await?;
let console_shutdown_tx = start_http_server(&console_opt, state_manager.clone(), readiness.clone()).await?;
Some(console_shutdown_tx)
} else {
None
@@ -236,6 +220,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let ctx = CancellationToken::new();
// init store
// 2. Start Storage Engine (ECStore)
let store = ECStore::new(server_addr, endpoint_pools.clone(), ctx.clone())
.await
.inspect_err(|err| {
@@ -243,9 +228,9 @@ async fn run(opt: config::Opt) -> Result<()> {
})?;
ecconfig::init();
// config system configuration, wait for 1 second if failed
let mut retry_count = 0;
// Initialize global configuration system
let mut retry_count = 0;
while let Err(e) = GLOBAL_CONFIG_SYS.init(store.clone()).await {
error!("GLOBAL_CONFIG_SYS.init failed {:?}", e);
// TODO: check error type
@@ -255,8 +240,8 @@ async fn run(opt: config::Opt) -> Result<()> {
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
// init replication_pool
readiness.mark_stage(SystemStage::StorageReady);
// init replication_pool
init_background_replication(store.clone()).await;
// Initialize KMS system if enabled
init_kms_system(&opt).await?;
@@ -289,7 +274,10 @@ async fn run(opt: config::Opt) -> Result<()> {
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
// 3. Initialize IAM System (Blocking load)
// This ensures data is in memory before moving forward
init_iam_sys(store.clone()).await.map_err(Error::other)?;
readiness.mark_stage(SystemStage::IamReady);
add_bucket_notification_configuration(buckets.clone()).await;
@@ -341,6 +329,15 @@ async fn run(opt: config::Opt) -> Result<()> {
init_update_check();
println!(
"RustFS server started successfully at {}, current time: {}",
&server_address,
chrono::offset::Utc::now().to_string()
);
info!(target: "rustfs::main::run","server started successfully at {}", &server_address);
// 4. Mark as Full Ready now that critical components are warm
readiness.mark_stage(SystemStage::FullReady);
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_common::globals::set_global_root_cert;
use rustfs_common::set_global_root_cert;
use rustfs_config::{RUSTFS_CA_CERT, RUSTFS_PUBLIC_CERT, RUSTFS_TLS_CERT};
use tracing::{debug, info};

View File

@@ -17,7 +17,7 @@ use super::compress::{CompressionConfig, CompressionPredicate};
use crate::admin;
use crate::auth::IAMAuth;
use crate::config;
use crate::server::{ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer};
use crate::server::{ReadinessGateLayer, ServiceState, ServiceStateManager, hybrid::hybrid, layer::RedirectLayer};
use crate::storage;
use crate::storage::tonic_service::make_server;
use bytes::Bytes;
@@ -29,6 +29,7 @@ use hyper_util::{
service::TowerToHyperService,
};
use metrics::{counter, histogram};
use rustfs_common::GlobalReadiness;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_utils::net::parse_and_resolve_address;
@@ -112,6 +113,7 @@ fn get_cors_allowed_origins() -> String {
pub async fn start_http_server(
opt: &config::Opt,
worker_state_manager: ServiceStateManager,
readiness: Arc<GlobalReadiness>,
) -> Result<tokio::sync::broadcast::Sender<()>> {
let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?;
let server_port = server_addr.port();
@@ -208,7 +210,7 @@ pub async fn start_http_server(
println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html",);
} else {
info!(target: "rustfs::main::startup","RustFS API: {api_endpoints} {localhost_endpoint}");
println!("RustFS API: {api_endpoints} {localhost_endpoint}");
println!("RustFS Http API: {api_endpoints} {localhost_endpoint}");
println!("RustFS Start Time: {now_time}");
if DEFAULT_ACCESS_KEY.eq(&opt.access_key) && DEFAULT_SECRET_KEY.eq(&opt.secret_key) {
warn!(
@@ -388,6 +390,7 @@ pub async fn start_http_server(
cors_layer: cors_layer.clone(),
compression_config: compression_config.clone(),
is_console,
readiness: readiness.clone(),
};
process_connection(socket, tls_acceptor.clone(), connection_ctx, graceful.clone());
@@ -490,6 +493,7 @@ struct ConnectionContext {
cors_layer: CorsLayer,
compression_config: CompressionConfig,
is_console: bool,
readiness: Arc<GlobalReadiness>,
}
/// Process a single incoming TCP connection.
@@ -513,6 +517,7 @@ fn process_connection(
cors_layer,
compression_config,
is_console,
readiness,
} = context;
// Build services inside each connected task to avoid passing complex service types across tasks,
@@ -523,6 +528,9 @@ fn process_connection(
let hybrid_service = ServiceBuilder::new()
.layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
.layer(CatchPanicLayer::new())
// CRITICAL: Insert ReadinessGateLayer before business logic
// This stops requests from hitting IAMAuth or Storage if they are not ready.
.layer(ReadinessGateLayer::new(readiness))
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {

View File

@@ -19,6 +19,8 @@ mod event;
mod http;
mod hybrid;
mod layer;
mod prefix;
mod readiness;
mod runtime;
mod service_state;
@@ -26,6 +28,8 @@ pub(crate) use audit::{start_audit_system, stop_audit_system};
pub(crate) use cert::init_cert;
pub(crate) use event::{init_event_notifier, shutdown_event_notifier};
pub(crate) use http::start_http_server;
pub(crate) use prefix::*;
pub(crate) use readiness::ReadinessGateLayer;
pub(crate) use runtime::get_tokio_runtime_builder;
pub(crate) use service_state::SHUTDOWN_TIMEOUT;
pub(crate) use service_state::ServiceState;

View File

@@ -0,0 +1,55 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Predefined CPU profiling path for RustFS server.
/// This path is used to access CPU profiling data.
pub(crate) const PROFILE_CPU_PATH: &str = "/profile/cpu";
/// Predefined memory profiling path for RustFS server.
/// This path is used to access memory profiling data.
pub(crate) const PROFILE_MEMORY_PATH: &str = "/profile/memory";
/// Favicon path requested by browsers.
/// The readiness gate lets this exact path through even before the server is ready.
pub(crate) const FAVICON_PATH: &str = "/favicon.ico";
/// Predefined health check path for RustFS server.
/// This path is used to check the health status of the server.
/// Despite the `_PREFIX` suffix, the router and the readiness gate compare
/// the request path against it for exact equality, not as a prefix.
pub(crate) const HEALTH_PREFIX: &str = "/health";
/// Predefined administrative prefix for RustFS server routes.
/// This prefix is used for endpoints that handle administrative tasks
/// such as configuration, monitoring, and management.
pub(crate) const ADMIN_PREFIX: &str = "/rustfs/admin";
/// Administrative prefix with the `/v3` version segment appended
/// (`ADMIN_PREFIX` + `/v3`).
/// NOTE(review): despite the `RUSTFS_` naming this is a URL path literal,
/// not the name of an environment variable.
pub(crate) const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
/// Predefined console prefix for RustFS server routes.
/// This prefix is used for endpoints that handle console-related tasks
/// such as user interface and management.
pub(crate) const CONSOLE_PREFIX: &str = "/rustfs/console";
/// Predefined RPC prefix for RustFS server routes.
/// This prefix is used for endpoints that handle remote procedure calls (RPC).
pub(crate) const RPC_PREFIX: &str = "/rustfs/rpc";
/// ASCII-art logo for the RustFS server, logged once at startup.
pub(crate) const LOGO: &str = r#"
░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀
░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█
░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀
"#;

View File

@@ -0,0 +1,129 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use http::{Request as HttpRequest, Response, StatusCode};
use http_body::Body;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use rustfs_common::GlobalReadiness;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};
/// Tower layer that gates HTTP traffic on the shared [`GlobalReadiness`] state.
///
/// Services produced by this layer answer `503 Service Unavailable` while the
/// readiness tracker reports "not ready". Health, profiling and favicon paths,
/// and everything under the admin, console and RPC prefixes, are exempt and
/// always forwarded to the inner service.
#[derive(Clone)]
pub struct ReadinessGateLayer {
// Shared readiness tracker; cloned into every service this layer creates.
readiness: Arc<GlobalReadiness>,
}
impl ReadinessGateLayer {
/// Create a new ReadinessGateLayer
/// # Arguments
/// * `readiness` - An Arc to the GlobalReadiness instance
///
/// # Returns
/// A new instance of ReadinessGateLayer
pub fn new(readiness: Arc<GlobalReadiness>) -> Self {
Self { readiness }
}
}
impl<S> Layer<S> for ReadinessGateLayer {
type Service = ReadinessGateService<S>;
/// Wrap the inner service with ReadinessGateService
/// # Arguments
/// * `inner` - The inner service to wrap
/// # Returns
/// An instance of ReadinessGateService
fn layer(&self, inner: S) -> Self::Service {
ReadinessGateService {
inner,
readiness: self.readiness.clone(),
}
}
}
/// Service created by [`ReadinessGateLayer`].
///
/// Holds the wrapped downstream service together with the shared readiness
/// handle that is consulted on every request.
#[derive(Clone)]
pub struct ReadinessGateService<S> {
// Downstream service that receives requests once they pass the gate.
inner: S,
// Shared readiness tracker queried via `is_ready()`.
readiness: Arc<GlobalReadiness>,
}
/// Boxed, `Send + Sync` error used as the error type of gated response bodies.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Type-erased response body; lets the synthetic 503 body and the inner
/// service's body share a single response type.
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, BoxError>;
/// Gate implementation: rejects non-exempt requests with `503` until the
/// shared readiness tracker reports ready, otherwise forwards to `inner`.
///
/// The response body is always converted to [`BoxBody`] so that the synthetic
/// "not ready" response and the inner service's response share one type.
impl<S, B> Service<HttpRequest<Incoming>> for ReadinessGateService<S>
where
    S: Service<HttpRequest<Incoming>, Response = Response<B>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Send + 'static,
    B: Body<Data = Bytes> + Send + 'static,
    B::Error: Into<BoxError> + Send + 'static,
{
    type Response = Response<BoxBody>;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Readiness (in the tower sense) is delegated to the inner service; the
    /// application-level readiness gate is evaluated per request in `call`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Handles one request.
    ///
    /// Exempt paths (exact: memory/CPU profiling, health, favicon; prefix:
    /// admin, console, RPC) are always forwarded. Any other request receives
    /// `503 Service Unavailable` with `Retry-After: 5` while
    /// `readiness.is_ready()` is false.
    ///
    /// # Errors
    /// Propagates any error returned by the inner service.
    fn call(&mut self, req: HttpRequest<Incoming>) -> Self::Future {
        // `poll_ready` was driven on `self.inner`, so that exact instance must be
        // the one that gets called. A fresh clone is not guaranteed to be ready,
        // so swap the ready instance out and leave the clone behind.
        let fresh = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, fresh);
        let readiness = self.readiness.clone();

        Box::pin(async move {
            let path = req.uri().path();

            // 1) Exact match: fixed probe/resource paths.
            let is_exact_probe = matches!(
                path,
                crate::server::PROFILE_MEMORY_PATH
                    | crate::server::PROFILE_CPU_PATH
                    | crate::server::HEALTH_PREFIX
                    | crate::server::FAVICON_PATH
            );

            // 2) Prefix match: whole route families, including their subpaths.
            //    `RUSTFS_ADMIN_PREFIX` currently lies under `ADMIN_PREFIX`, so the
            //    first check is subsumed by the last; it is kept explicit so the
            //    gate stays correct if the two constants ever diverge.
            let is_prefix_probe = path.starts_with(crate::server::RUSTFS_ADMIN_PREFIX)
                || path.starts_with(crate::server::CONSOLE_PREFIX)
                || path.starts_with(crate::server::RPC_PREFIX)
                || path.starts_with(crate::server::ADMIN_PREFIX);

            let is_probe = is_exact_probe || is_prefix_probe;

            if !is_probe && !readiness.is_ready() {
                // `Full`'s error type is uninhabited; the mapping only aligns the types.
                let body: BoxBody = Full::new(Bytes::from_static(b"Service not ready"))
                    .map_err(|e| -> BoxError { Box::new(e) })
                    .boxed_unsync();

                // All header names and values are static and valid, so building
                // the response cannot fail at runtime.
                let resp = Response::builder()
                    .status(StatusCode::SERVICE_UNAVAILABLE)
                    .header(http::header::RETRY_AFTER, "5")
                    .header(http::header::CONTENT_TYPE, "text/plain; charset=utf-8")
                    .header(http::header::CACHE_CONTROL, "no-store")
                    .body(body)
                    .expect("failed to build not ready response");
                return Ok(resp);
            }

            // Gate passed (or path is exempt): forward to the actual handlers.
            let resp = inner.call(req).await?;

            // Transparently convert the response body into a `BoxBody` so that the
            // outer Trace/Cors/Compression layers see a single body type.
            let (parts, body) = resp.into_parts();
            let body: BoxBody = body.map_err(Into::into).boxed_unsync();
            Ok(Response::from_parts(parts, body))
        })
    }
}

View File

@@ -16,7 +16,7 @@ use bytes::Bytes;
use futures::Stream;
use futures_util::future::join_all;
use rmp_serde::{Deserializer, Serializer};
use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::HealOpts};
use rustfs_common::{GLOBAL_LOCAL_NODE_NAME, heal_channel::HealOpts};
use rustfs_ecstore::{
admin_server_info::get_local_server_property,
bucket::{metadata::load_bucket_metadata, metadata_sys},