Fix/issue #1001 dead node detection (#1054)

Co-authored-by: weisd <im@weisd.in>
Co-authored-by: Jitterx69 <mohit@example.com>
This commit is contained in:
Jitter
2025-12-08 09:59:46 +05:30
committed by GitHub
parent 834025d9e3
commit 76d25d9a20
9 changed files with 369 additions and 79 deletions

2
Cargo.lock generated
View File

@@ -7157,6 +7157,7 @@ dependencies = [
"serde",
"tokio",
"tonic",
"tracing",
"uuid",
]
@@ -7484,6 +7485,7 @@ dependencies = [
"tonic",
"tonic-prost",
"tonic-prost-build",
"tracing",
]
[[package]]

View File

@@ -39,3 +39,4 @@ path-clean = { workspace = true }
rmp-serde = { workspace = true }
async-trait = { workspace = true }
s3s = { workspace = true }
tracing = { workspace = true }

View File

@@ -28,3 +28,28 @@ pub static GLOBAL_Conn_Map: LazyLock<RwLock<HashMap<String, Channel>>> = LazyLoc
pub async fn set_global_addr(addr: &str) {
*GLOBAL_Rustfs_Addr.write().await = addr.to_string();
}
/// Evict a stale/dead connection from the global connection cache.
/// This is critical for cluster recovery when a node dies unexpectedly (e.g., power-off).
/// By removing the cached connection, subsequent requests will establish a fresh connection.
pub async fn evict_connection(addr: &str) {
let removed = GLOBAL_Conn_Map.write().await.remove(addr);
if removed.is_some() {
tracing::warn!("Evicted stale connection from cache: {}", addr);
}
}
/// Check if a connection exists in the cache for the given address.
pub async fn has_cached_connection(addr: &str) -> bool {
GLOBAL_Conn_Map.read().await.contains_key(addr)
}
/// Clear all cached connections. Useful for full cluster reset/recovery.
pub async fn clear_all_connections() {
let mut map = GLOBAL_Conn_Map.write().await;
let count = map.len();
map.clear();
if count > 0 {
tracing::warn!("Cleared {} cached connections from global map", count);
}
}

View File

@@ -190,16 +190,32 @@ impl NotificationSys {
pub async fn storage_info<S: StorageAPI>(&self, api: &S) -> rustfs_madmin::StorageInfo {
let mut futures = Vec::with_capacity(self.peer_clients.len());
let endpoints = get_global_endpoints();
let peer_timeout = Duration::from_secs(2); // Same timeout as server_info
for client in self.peer_clients.iter() {
let endpoints = endpoints.clone();
futures.push(async move {
if let Some(client) = client {
match client.local_storage_info().await {
Ok(info) => Some(info),
Err(_) => Some(rustfs_madmin::StorageInfo {
disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),
..Default::default()
}),
let host = client.host.to_string();
// Wrap in timeout to ensure we don't hang on dead peers
match timeout(peer_timeout, client.local_storage_info()).await {
Ok(Ok(info)) => Some(info),
Ok(Err(err)) => {
warn!("peer {} storage_info failed: {}", host, err);
Some(rustfs_madmin::StorageInfo {
disks: get_offline_disks(&host, &endpoints),
..Default::default()
})
}
Err(_) => {
warn!("peer {} storage_info timed out after {:?}", host, peer_timeout);
client.evict_connection().await;
Some(rustfs_madmin::StorageInfo {
disks: get_offline_disks(&host, &endpoints),
..Default::default()
})
}
}
} else {
None
@@ -230,13 +246,19 @@ impl NotificationSys {
futures.push(async move {
if let Some(client) = client {
let host = client.host.to_string();
call_peer_with_timeout(
peer_timeout,
&host,
|| client.server_info(),
|| offline_server_properties(&host, &endpoints),
)
.await
match timeout(peer_timeout, client.server_info()).await {
Ok(Ok(info)) => info,
Ok(Err(err)) => {
warn!("peer {} server_info failed: {}", host, err);
// client.server_info handles eviction internally on error, but fallback needed
offline_server_properties(&host, &endpoints)
}
Err(_) => {
warn!("peer {} server_info timed out after {:?}", host, peer_timeout);
client.evict_connection().await;
offline_server_properties(&host, &endpoints)
}
}
} else {
ServerProperties::default()
}

View File

@@ -26,7 +26,7 @@ use rustfs_madmin::{
net::NetInfo,
};
use rustfs_protos::{
node_service_time_out_client,
evict_failed_connection, node_service_time_out_client,
proto_gen::node_service::{
DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest,
GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest,
@@ -82,10 +82,25 @@ impl PeerRestClient {
(remote, all)
}
/// Evict the connection to this peer from the global cache.
/// This should be called when communication with this peer fails.
pub async fn evict_connection(&self) {
evict_failed_connection(&self.grid_host).await;
}
}
impl PeerRestClient {
pub async fn local_storage_info(&self) -> Result<rustfs_madmin::StorageInfo> {
let result = self.local_storage_info_inner().await;
if result.is_err() {
// Evict stale connection on any error for cluster recovery
self.evict_connection().await;
}
result
}
async fn local_storage_info_inner(&self) -> Result<rustfs_madmin::StorageInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
@@ -107,6 +122,15 @@ impl PeerRestClient {
}
pub async fn server_info(&self) -> Result<ServerProperties> {
let result = self.server_info_inner().await;
if result.is_err() {
// Evict stale connection on any error for cluster recovery
self.evict_connection().await;
}
result
}
async fn server_info_inner(&self) -> Result<ServerProperties> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
@@ -478,7 +502,11 @@ impl PeerRestClient {
access_key: access_key.to_string(),
});
let response = client.delete_user(request).await?.into_inner();
let result = client.delete_user(request).await;
if result.is_err() {
self.evict_connection().await;
}
let response = result?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));
@@ -496,7 +524,11 @@ impl PeerRestClient {
access_key: access_key.to_string(),
});
let response = client.delete_service_account(request).await?.into_inner();
let result = client.delete_service_account(request).await;
if result.is_err() {
self.evict_connection().await;
}
let response = result?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));
@@ -515,7 +547,11 @@ impl PeerRestClient {
temp,
});
let response = client.load_user(request).await?.into_inner();
let result = client.load_user(request).await;
if result.is_err() {
self.evict_connection().await;
}
let response = result?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));
@@ -533,7 +569,11 @@ impl PeerRestClient {
access_key: access_key.to_string(),
});
let response = client.load_service_account(request).await?.into_inner();
let result = client.load_service_account(request).await;
if result.is_err() {
self.evict_connection().await;
}
let response = result?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));
@@ -551,7 +591,11 @@ impl PeerRestClient {
group: group.to_string(),
});
let response = client.load_group(request).await?.into_inner();
let result = client.load_group(request).await;
if result.is_err() {
self.evict_connection().await;
}
let response = result?.into_inner();
if !response.success {
if let Some(msg) = response.error_info {
return Err(Error::other(msg));

View File

@@ -240,14 +240,19 @@ impl<T: Store> IamSys<T> {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_user(name, is_temp).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_user failed: {}", err);
// Fire-and-forget notification to peers - don't block auth operations
// This is critical for cluster recovery: login should not wait for dead peers
let name = name.to_string();
tokio::spawn(async move {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_user(&name, is_temp).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_user failed (non-blocking): {}", err);
}
}
}
}
});
}
async fn notify_for_service_account(&self, name: &str) {
@@ -255,14 +260,18 @@ impl<T: Store> IamSys<T> {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_service_account(name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_service_account failed: {}", err);
// Fire-and-forget notification to peers - don't block service account operations
let name = name.to_string();
tokio::spawn(async move {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_service_account(&name).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_service_account failed (non-blocking): {}", err);
}
}
}
}
});
}
pub async fn current_policies(&self, name: &str) -> String {
@@ -571,14 +580,18 @@ impl<T: Store> IamSys<T> {
return;
}
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_group(group).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_group failed: {}", err);
// Fire-and-forget notification to peers - don't block group operations
let group = group.to_string();
tokio::spawn(async move {
if let Some(notification_sys) = get_global_notification_sys() {
let resp = notification_sys.load_group(&group).await;
for r in resp {
if let Some(err) = r.err {
warn!("notify load_group failed (non-blocking): {}", err);
}
}
}
}
});
}
pub async fn create_user(&self, access_key: &str, args: &AddOrUpdateUserReq) -> Result<OffsetDateTime> {

View File

@@ -38,4 +38,5 @@ flatbuffers = { workspace = true }
prost = { workspace = true }
tonic = { workspace = true, features = ["transport"] }
tonic-prost = { workspace = true }
tonic-prost-build = { workspace = true }
tonic-prost-build = { workspace = true }
tracing = { workspace = true }

View File

@@ -19,17 +19,87 @@ use std::{error::Error, time::Duration};
pub use generated::*;
use proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_common::globals::GLOBAL_Conn_Map;
use rustfs_common::globals::{GLOBAL_Conn_Map, evict_connection};
use tonic::{
Request, Status,
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Channel, Endpoint},
};
use tracing::{debug, warn};
// Default 100 MB
pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;
/// Timeout for connection establishment - reduced for faster failure detection
const CONNECT_TIMEOUT_SECS: u64 = 3;
/// TCP keepalive interval - how often to probe the connection
const TCP_KEEPALIVE_SECS: u64 = 10;
/// HTTP/2 keepalive interval - application-layer heartbeat
const HTTP2_KEEPALIVE_INTERVAL_SECS: u64 = 5;
/// HTTP/2 keepalive timeout - how long to wait for PING ACK
const HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 3;
/// Overall RPC timeout - maximum time for any single RPC operation
const RPC_TIMEOUT_SECS: u64 = 30;
/// Creates a new gRPC channel with optimized keepalive settings for cluster resilience.
///
/// This function is designed to detect dead peers quickly:
/// - Fast connection timeout (3s instead of default 30s+)
/// - Aggressive TCP keepalive (10s)
/// - HTTP/2 PING every 5s, timeout at 3s
/// - Overall RPC timeout of 30s (reduced from 60s)
async fn create_new_channel(addr: &str) -> Result<Channel, Box<dyn Error>> {
debug!("Creating new gRPC channel to: {}", addr);
let connector = Endpoint::from_shared(addr.to_string())?
// Fast connection timeout for dead peer detection
.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
// TCP-level keepalive - OS will probe connection
.tcp_keepalive(Some(Duration::from_secs(TCP_KEEPALIVE_SECS)))
// HTTP/2 PING frames for application-layer health check
.http2_keep_alive_interval(Duration::from_secs(HTTP2_KEEPALIVE_INTERVAL_SECS))
// How long to wait for PING ACK before considering connection dead
.keep_alive_timeout(Duration::from_secs(HTTP2_KEEPALIVE_TIMEOUT_SECS))
// Send PINGs even when no active streams (critical for idle connections)
.keep_alive_while_idle(true)
// Overall timeout for any RPC - fail fast on unresponsive peers
.timeout(Duration::from_secs(RPC_TIMEOUT_SECS));
let channel = connector.connect().await?;
// Cache the new connection
{
GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone());
}
debug!("Successfully created and cached gRPC channel to: {}", addr);
Ok(channel)
}
/// Get a gRPC client for the NodeService with robust connection handling.
///
/// This function implements several resilience features:
/// 1. Connection caching for performance
/// 2. Automatic eviction of stale/dead connections on error
/// 3. Optimized keepalive settings for fast dead peer detection
/// 4. Reduced timeouts to fail fast when peers are unresponsive
///
/// # Connection Lifecycle
/// - Cached connections are reused for subsequent calls
/// - On any connection error, the cached connection is evicted
/// - Fresh connections are established with aggressive keepalive settings
///
/// # Cluster Power-Off Recovery
/// When a node experiences abrupt power-off:
/// 1. The cached connection will fail on next use
/// 2. The connection is automatically evicted from cache
/// 3. Subsequent calls will attempt fresh connections
/// 4. If node is still down, connection will fail fast (3s timeout)
pub async fn node_service_time_out_client(
addr: &String,
) -> Result<
@@ -40,25 +110,18 @@ pub async fn node_service_time_out_client(
> {
let token: MetadataValue<_> = "rustfs rpc".parse()?;
let channel = { GLOBAL_Conn_Map.read().await.get(addr).cloned() };
// Try to get cached channel
let cached_channel = { GLOBAL_Conn_Map.read().await.get(addr).cloned() };
let channel = match channel {
Some(channel) => channel,
None => {
let connector = Endpoint::from_shared(addr.to_string())?
.connect_timeout(Duration::from_secs(5))
.tcp_keepalive(Some(Duration::from_secs(10)))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(3))
.keep_alive_while_idle(true)
.timeout(Duration::from_secs(60));
let channel = connector.connect().await?;
{
GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone());
}
let channel = match cached_channel {
Some(channel) => {
debug!("Using cached gRPC channel for: {}", addr);
channel
}
None => {
// No cached connection, create new one
create_new_channel(addr).await?
}
};
Ok(NodeServiceClient::with_interceptor(
@@ -69,3 +132,31 @@ pub async fn node_service_time_out_client(
}),
))
}
/// Get a gRPC client with automatic connection eviction on failure.
///
/// This is the preferred method for cluster operations as it ensures
/// that failed connections are automatically cleaned up from the cache.
///
/// Returns the client and the address for later eviction if needed.
pub async fn node_service_client_with_eviction(
addr: &String,
) -> Result<
(
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
String,
),
Box<dyn Error>,
> {
let client = node_service_time_out_client(addr).await?;
Ok((client, addr.clone()))
}
/// Evict a connection from the cache after a failure.
/// This should be called when an RPC fails to ensure fresh connections are tried.
pub async fn evict_failed_connection(addr: &str) {
warn!("Evicting failed gRPC connection: {}", addr);
evict_connection(addr).await;
}

View File

@@ -5,25 +5,30 @@
**Symptoms**:
- The application became unable to upload files.
- The Console Web UI became unresponsive across the cluster.
- The `rustfsadmin` user was unable to log in after a server power-off.
- The performance page displayed 0 storage, 0 objects, and 0 servers online/offline.
- The system "hung" indefinitely, unlike the immediate recovery observed during a graceful process termination (`kill`).
**Root Cause**:
The standard TCP protocol does not immediately detect a silent peer disappearance (power loss) because no `FIN` or `RST` packets are sent. Without active application-layer heartbeats, the surviving nodes kept connections implementation in an `ESTABLISHED` state, waiting indefinitely for responses that would never arrive.
**Root Cause (Multi-Layered)**:
1. **TCP Connection Issue**: The standard TCP protocol does not immediately detect a silent peer disappearance (power loss) because no `FIN` or `RST` packets are sent.
2. **Stale Connection Cache**: Cached gRPC connections in `GLOBAL_Conn_Map` were reused even when the peer was dead, causing blocking on every RPC call.
3. **Blocking IAM Notifications**: Login operations blocked waiting for ALL peers to acknowledge user/policy changes.
4. **No Per-Peer Timeouts**: Console aggregation calls like `server_info()` and `storage_info()` could hang waiting for dead peers.
---
## 2. Technical Approach
To resolve this, we needed to transform the passive failure detection (waiting for TCP timeout) into an active detection mechanism.
To resolve this, we implemented a comprehensive multi-layered resilience strategy.
### Key Objectives:
1. **Fail Fast**: Detect dead peers in seconds, not minutes.
2. **Accuracy**: Distinguish between network congestion and actual node failure.
3. **Safety**: Ensure no thread or task blocks forever on a remote procedure call (RPC).
2. **Evict Stale Connections**: Automatically remove dead connections from cache to force reconnection.
3. **Non-Blocking Operations**: Auth and IAM operations should not wait for dead peers.
4. **Graceful Degradation**: Console should show partial data from healthy nodes, not hang.
---
## 3. Implemented Solution
We modified the internal gRPC client configuration in `crates/protos/src/lib.rs` to implement a multi-layered health check strategy.
### Solution Overview
The fix implements a multi-layered detection strategy covering both Control Plane (RPC) and Data Plane (Streaming):
@@ -43,23 +48,109 @@ The fix implements a multi-layered detection strategy covering both Control Plan
### Configuration Changes
```rust
let connector = Endpoint::from_shared(addr.to_string())?
.connect_timeout(Duration::from_secs(5))
// 1. App-Layer Heartbeats (Primary Detection)
// Sends a hidden HTTP/2 PING frame every 5 seconds.
.http2_keep_alive_interval(Duration::from_secs(5))
// If PING is not acknowledged within 3 seconds, closes connection.
.keep_alive_timeout(Duration::from_secs(3))
// Ensures PINGs are sent even when no active requests are in flight.
.keep_alive_while_idle(true)
// 2. Transport-Layer Keepalive (OS Backup)
.tcp_keepalive(Some(Duration::from_secs(10)))
// 3. Global Safety Net
// Hard deadline for any RPC operation.
.timeout(Duration::from_secs(60));
pub async fn storage_info<S: StorageAPI>(&self, api: &S) -> rustfs_madmin::StorageInfo {
let peer_timeout = Duration::from_secs(2);
for client in self.peer_clients.iter() {
futures.push(async move {
if let Some(client) = client {
match timeout(peer_timeout, client.local_storage_info()).await {
Ok(Ok(info)) => Some(info),
Ok(Err(_)) | Err(_) => {
// Return offline status for dead peer
Some(rustfs_madmin::StorageInfo {
disks: get_offline_disks(&host, &endpoints),
..Default::default()
})
}
}
}
});
}
// Rest continues even if some peers are down
}
```
### Outcome
- **Detection Time**: Reduced from ~15+ minutes (OS default) to **~8 seconds** (5s interval + 3s timeout).
- **Behavior**: When a node loses power, surviving peers now detect the lost connection almost immediately, throwing a protocol error that triggers standard cluster recovery/failover logic.
- **Result**: The cluster now handles power-offs with the same resilience as graceful shutdowns.
### Fix 4: Enhanced gRPC Client Configuration
**File Modified**: `crates/protos/src/lib.rs`
**Configuration**:
```rust
const CONNECT_TIMEOUT_SECS: u64 = 3; // Reduced from 5s
const TCP_KEEPALIVE_SECS: u64 = 10; // OS-level keepalive
const HTTP2_KEEPALIVE_INTERVAL_SECS: u64 = 5; // HTTP/2 PING interval
const HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 3; // PING ACK timeout
const RPC_TIMEOUT_SECS: u64 = 30; // Reduced from 60s
let connector = Endpoint::from_shared(addr.to_string())?
.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
.tcp_keepalive(Some(Duration::from_secs(TCP_KEEPALIVE_SECS)))
.http2_keep_alive_interval(Duration::from_secs(HTTP2_KEEPALIVE_INTERVAL_SECS))
.keep_alive_timeout(Duration::from_secs(HTTP2_KEEPALIVE_TIMEOUT_SECS))
.keep_alive_while_idle(true)
.timeout(Duration::from_secs(RPC_TIMEOUT_SECS));
```
---
## 4. Files Changed Summary
| File | Change |
|------|--------|
| `crates/common/src/globals.rs` | Added `evict_connection()`, `has_cached_connection()`, `clear_all_connections()` |
| `crates/common/Cargo.toml` | Added `tracing` dependency |
| `crates/protos/src/lib.rs` | Refactored to use constants, added `evict_failed_connection()`, improved documentation |
| `crates/protos/Cargo.toml` | Added `tracing` dependency |
| `crates/ecstore/src/rpc/peer_rest_client.rs` | Added auto-eviction on RPC failure for `server_info()` and `local_storage_info()` |
| `crates/ecstore/src/notification_sys.rs` | Added per-peer timeout to `storage_info()` |
| `crates/iam/src/sys.rs` | Made `notify_for_user()`, `notify_for_service_account()`, `notify_for_group()` non-blocking |
---
## 5. Test Results
All 299 tests pass:
```
test result: ok. 299 passed; 0 failed; 0 ignored
```
---
## 6. Expected Behavior After Fix
| Scenario | Before | After |
|----------|--------|-------|
| Node power-off | Cluster hangs indefinitely | Cluster recovers in ~8 seconds |
| Login during node failure | Login hangs | Login succeeds immediately |
| Console during node failure | Shows 0/0/0 | Shows partial data from healthy nodes |
| Upload during node failure | Upload stops | Upload fails fast, can be retried |
| Stale cached connection | Blocks forever | Auto-evicted, fresh connection attempted |
---
## 7. Verification Steps
1. **Start a 3+ node RustFS cluster**
2. **Test Console Recovery**:
- Access console dashboard
- Forcefully kill one node (e.g., `kill -9`)
- Verify dashboard updates within 10 seconds showing offline status
3. **Test Login Recovery**:
- Kill a node while logged out
- Attempt login with `rustfsadmin`
- Verify login succeeds within 5 seconds
4. **Test Upload Recovery**:
- Start a large file upload
- Kill the target node mid-upload
- Verify upload fails fast (not hangs) and can be retried
---
## 8. Related Issues
- Issue #1001: Cluster Recovery from Abrupt Power-Off
- PR #1035: fix(net): resolve 1GB upload hang and macos build
## 9. Contributors
- Initial keepalive fix: Original PR #1035
- Deep-rooted reliability fix: This update