Add Disk Timeout and Health Check Functionality (#1196)

Signed-off-by: weisd <im@weisd.in>
Co-authored-by: loverustfs <hello@rustfs.com>
weisd authored on 2025-12-22 17:15:19 +08:00, committed by GitHub
parent 08f1a31f3f
commit 80cfb4feab
17 changed files with 2017 additions and 649 deletions
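At its core, the change wraps every local-disk and remote-peer call in a timeout plus health bookkeeping (LocalDiskWrapper::track_disk_health and RemotePeerS3Client::execute_with_timeout in the diffs below). A minimal, simplified sketch of that pattern, not the actual implementation:

use std::time::Duration;

// Simplified illustration of the wrapping this commit introduces: run the
// operation under a timeout and turn an elapsed timer into an error.
async fn with_timeout<T, F, Fut>(op: F, timeout: Duration) -> Result<T, String>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<T, String>>,
{
    match tokio::time::timeout(timeout, op()).await {
        Ok(res) => res, // finished in time; the real code also logs success
        Err(_) => Err(format!("disk operation timeout after {timeout:?}")),
    }
}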

Cargo.lock (generated)
View File

@@ -7302,6 +7302,7 @@ dependencies = [
"tonic",
"tower",
"tracing",
"tracing-subscriber",
"url",
"urlencoding",
"uuid",

View File

@@ -468,14 +468,17 @@ impl HealManager {
let active_heals = self.active_heals.clone();
let cancel_token = self.cancel_token.clone();
let storage = self.storage.clone();
info!(
"start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}",
config.read().await.heal_interval
);
let mut duration = {
let config = config.read().await;
config.heal_interval
};
if duration < Duration::from_secs(1) {
duration = Duration::from_secs(1);
}
info!("start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}", duration);
tokio::spawn(async move {
let mut interval = interval(config.read().await.heal_interval);
let mut interval = interval(duration);
loop {
tokio::select! {

View File

@@ -30,7 +30,7 @@ use rustfs_ecstore::{
bucket::versioning::VersioningApi,
bucket::versioning_sys::BucketVersioningSys,
data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend},
disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
disk::{DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
set_disk::SetDisks,
store_api::ObjectInfo,
};
@@ -1977,7 +1977,7 @@ impl Scanner {
} else {
// Apply lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config {
if let Disk::Local(_local_disk) = &**disk {
if disk.is_local() {
let vcfg = BucketVersioningSys::get(bucket).await.ok();
let mut scanner_item = ScannerItem {

View File

@@ -21,10 +21,11 @@ use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetad
use rustfs_ecstore::endpoints::EndpointServerPools;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
use std::sync::Arc;
use std::sync::{Arc, Once};
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::Level;
/// Build a minimal single-node ECStore over a temp directory and populate objects.
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
@@ -74,8 +75,22 @@ async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECS
(temp_dir, store)
}
static INIT: Once = Once::new();
fn init_tracing(filter_level: Level) {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_max_level(filter_level)
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_thread_names(true)
.try_init();
});
}
#[tokio::test]
async fn fallback_builds_full_counts_over_100_objects() {
init_tracing(Level::ERROR);
let (_tmp, store) = create_store_with_objects(1000).await;
let scanner = Scanner::new(None, None);

View File

@@ -38,9 +38,13 @@ use walkdir::WalkDir;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
static INIT: Once = Once::new();
fn init_tracing() {
pub fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_thread_names(true)
.try_init();
});
}
@@ -356,7 +360,7 @@ mod serial_tests {
// Create heal manager with faster interval
let cfg = HealConfig {
heal_interval: Duration::from_secs(2),
heal_interval: Duration::from_secs(1),
..Default::default()
};
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));

View File

@@ -113,6 +113,7 @@ faster-hex = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
criterion = { workspace = true, features = ["html_reports"] }
temp-env = { workspace = true }
tracing-subscriber = { workspace = true }
[build-dependencies]
shadow-rs = { workspace = true, features = ["build", "metadata"] }

View File

@@ -16,7 +16,7 @@ use crate::disk::error::DiskError;
use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions};
use futures::future::join_all;
use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof};
use std::{future::Future, pin::Pin, sync::Arc};
use std::{future::Future, pin::Pin};
use tokio::spawn;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
@@ -71,14 +71,14 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
let mut jobs: Vec<tokio::task::JoinHandle<std::result::Result<(), DiskError>>> = Vec::new();
let mut readers = Vec::with_capacity(opts.disks.len());
let fds = Arc::new(opts.fallback_disks.clone());
let fds = opts.fallback_disks.iter().flatten().cloned().collect::<Vec<_>>();
let cancel_rx = CancellationToken::new();
for disk in opts.disks.iter() {
let opdisk = disk.clone();
let opts_clone = opts.clone();
let fds_clone = fds.clone();
let mut fds_clone = fds.clone();
let cancel_rx_clone = cancel_rx.clone();
let (rd, mut wr) = tokio::io::duplex(64);
readers.push(MetacacheReader::new(rd));
@@ -113,21 +113,20 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d
}
while need_fallback {
// warn!("list_path_raw: while need_fallback start");
let disk = match fds_clone.iter().find(|d| d.is_some()) {
Some(d) => {
if let Some(disk) = d.clone() {
disk
} else {
warn!("list_path_raw: fallback disk is none");
break;
}
}
None => {
warn!("list_path_raw: fallback disk is none2");
break;
let disk_op = {
if fds_clone.is_empty() {
None
} else {
let disk = fds_clone.remove(0);
if disk.is_online().await { Some(disk.clone()) } else { None }
}
};
let Some(disk) = disk_op else {
warn!("list_path_raw: fallback disk is none");
break;
};
match disk
.as_ref()
.walk_dir(

View File

@@ -0,0 +1,770 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskError, DiskInfo, DiskInfoOptions, DiskLocation, Endpoint, Error,
FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, Result, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions, local::LocalDisk,
};
use bytes::Bytes;
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_utils::string::parse_bool_with_default;
use std::{
path::PathBuf,
sync::{
Arc,
atomic::{AtomicI64, AtomicU32, Ordering},
},
time::Duration,
};
use tokio::{sync::RwLock, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Disk health status constants
const DISK_HEALTH_OK: u32 = 0;
const DISK_HEALTH_FAULTY: u32 = 1;
pub const ENV_RUSTFS_DRIVE_ACTIVE_MONITORING: &str = "RUSTFS_DRIVE_ACTIVE_MONITORING";
pub const ENV_RUSTFS_DRIVE_MAX_TIMEOUT_DURATION: &str = "RUSTFS_DRIVE_MAX_TIMEOUT_DURATION";
pub const CHECK_EVERY: Duration = Duration::from_secs(15);
pub const SKIP_IF_SUCCESS_BEFORE: Duration = Duration::from_secs(5);
pub const CHECK_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
lazy_static::lazy_static! {
static ref TEST_OBJ: String = format!("health-check-{}", Uuid::new_v4());
static ref TEST_DATA: Bytes = Bytes::from(vec![42u8; 2048]);
static ref TEST_BUCKET: String = ".rustfs.sys/tmp".to_string();
}
pub fn get_max_timeout_duration() -> Duration {
std::env::var(ENV_RUSTFS_DRIVE_MAX_TIMEOUT_DURATION)
.map(|v| Duration::from_secs(v.parse::<u64>().unwrap_or(30)))
.unwrap_or(Duration::from_secs(30))
}
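// Illustration (not part of this commit): the timeout knob is read as whole
// seconds, so RUSTFS_DRIVE_MAX_TIMEOUT_DURATION=45 yields a 45s cap, while an
// unset or unparsable value falls back to the 30s default.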
/// DiskHealthTracker tracks the health status of a disk.
/// Similar to Go's diskHealthTracker.
#[derive(Debug)]
pub struct DiskHealthTracker {
/// Atomic timestamp of last successful operation
pub last_success: AtomicI64,
/// Atomic timestamp of last operation start
pub last_started: AtomicI64,
/// Atomic disk status (OK or Faulty)
pub status: AtomicU32,
/// Atomic number of waiting operations
pub waiting: AtomicU32,
}
impl DiskHealthTracker {
/// Create a new disk health tracker
pub fn new() -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
Self {
last_success: AtomicI64::new(now),
last_started: AtomicI64::new(now),
status: AtomicU32::new(DISK_HEALTH_OK),
waiting: AtomicU32::new(0),
}
}
/// Log a successful operation
pub fn log_success(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
self.last_success.store(now, Ordering::Relaxed);
}
/// Check if disk is faulty
pub fn is_faulty(&self) -> bool {
self.status.load(Ordering::Relaxed) == DISK_HEALTH_FAULTY
}
/// Set disk as faulty
pub fn set_faulty(&self) {
self.status.store(DISK_HEALTH_FAULTY, Ordering::Relaxed);
}
/// Set disk as OK
pub fn set_ok(&self) {
self.status.store(DISK_HEALTH_OK, Ordering::Relaxed);
}
pub fn swap_ok_to_faulty(&self) -> bool {
self.status
.compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}
/// Increment waiting operations counter
pub fn increment_waiting(&self) {
self.waiting.fetch_add(1, Ordering::Relaxed);
}
/// Decrement waiting operations counter
pub fn decrement_waiting(&self) {
self.waiting.fetch_sub(1, Ordering::Relaxed);
}
/// Get waiting operations count
pub fn waiting_count(&self) -> u32 {
self.waiting.load(Ordering::Relaxed)
}
/// Get last success timestamp
pub fn last_success(&self) -> i64 {
self.last_success.load(Ordering::Relaxed)
}
}
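// Usage sketch (illustrative, not part of this commit): only the caller that
// wins the OK -> FAULTY compare-and-swap should spawn the recovery monitor.
//
//     let tracker = DiskHealthTracker::new();
//     assert!(!tracker.is_faulty());
//     if tracker.swap_ok_to_faulty() {
//         // the first failed probe lands here and spawns monitor_disk_status
//     }
//     assert!(tracker.is_faulty());
//     tracker.set_ok(); // the recovery path marks the disk healthy again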
impl Default for DiskHealthTracker {
fn default() -> Self {
Self::new()
}
}
/// Health check context key for tracking disk operations
#[derive(Debug, Clone)]
struct HealthDiskCtxKey;
#[derive(Debug)]
struct HealthDiskCtxValue {
last_success: Arc<AtomicI64>,
}
impl HealthDiskCtxValue {
fn log_success(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
self.last_success.store(now, Ordering::Relaxed);
}
}
/// LocalDiskWrapper wraps a DiskStore with health tracking capabilities.
/// This is similar to Go's xlStorageDiskIDCheck.
#[derive(Debug, Clone)]
pub struct LocalDiskWrapper {
/// The underlying disk store
disk: Arc<LocalDisk>,
/// Health tracker
health: Arc<DiskHealthTracker>,
/// Whether health checking is enabled
health_check: bool,
/// Cancellation token for monitoring tasks
cancel_token: CancellationToken,
/// Disk ID for stale checking
disk_id: Arc<RwLock<Option<Uuid>>>,
}
impl LocalDiskWrapper {
/// Create a new LocalDiskWrapper
pub fn new(disk: Arc<LocalDisk>, health_check: bool) -> Self {
// Check environment variable for health check override
// Default to true if not set, but only enable if both param and env are true
let env_health_check = std::env::var(ENV_RUSTFS_DRIVE_ACTIVE_MONITORING)
.map(|v| parse_bool_with_default(&v, true))
.unwrap_or(true);
let ret = Self {
disk,
health: Arc::new(DiskHealthTracker::new()),
health_check: health_check && env_health_check,
cancel_token: CancellationToken::new(),
disk_id: Arc::new(RwLock::new(None)),
};
ret.start_monitoring();
ret
}
pub fn get_disk(&self) -> Arc<LocalDisk> {
self.disk.clone()
}
/// Start the disk monitoring if health_check is enabled
pub fn start_monitoring(&self) {
if self.health_check {
let health = Arc::clone(&self.health);
let cancel_token = self.cancel_token.clone();
let disk = Arc::clone(&self.disk);
tokio::spawn(async move {
Self::monitor_disk_writable(disk, health, cancel_token).await;
});
}
}
/// Stop the disk monitoring
pub async fn stop_monitoring(&self) {
self.cancel_token.cancel();
}
/// Monitor disk writability periodically
async fn monitor_disk_writable(disk: Arc<LocalDisk>, health: Arc<DiskHealthTracker>, cancel_token: CancellationToken) {
// TODO: config interval
let mut interval = time::interval(CHECK_EVERY);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
return;
}
_ = interval.tick() => {
if cancel_token.is_cancelled() {
return;
}
if health.status.load(Ordering::Relaxed) != DISK_HEALTH_OK {
continue;
}
let last_success_nanos = health.last_success.load(Ordering::Relaxed);
let elapsed = Duration::from_nanos(
(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64 - last_success_nanos) as u64
);
if elapsed < SKIP_IF_SUCCESS_BEFORE {
continue;
}
tokio::time::sleep(Duration::from_secs(1)).await;
debug!("health check: performing health check");
if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() {
// Health check failed, disk is considered faulty
health.increment_waiting(); // Balance the increment from failed operation
let health_clone = Arc::clone(&health);
let disk_clone = disk.clone();
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
Self::monitor_disk_status(disk_clone, health_clone, cancel_clone).await;
});
}
}
}
}
}
/// Perform a health check by writing and reading a test file
async fn perform_health_check(
disk: Arc<LocalDisk>,
test_bucket: &str,
test_filename: &str,
test_data: &Bytes,
check_faulty_only: bool,
timeout_duration: Duration,
) -> Result<()> {
// Perform health check with timeout
let health_check_result = tokio::time::timeout(timeout_duration, async {
// Try to write test data
disk.write_all(test_bucket, test_filename, test_data.clone()).await?;
// Try to read back the data
let read_data = disk.read_all(test_bucket, test_filename).await?;
// Verify data integrity
if read_data.len() != test_data.len() {
warn!(
"health check: test file data length mismatch: expected {} bytes, got {}",
test_data.len(),
read_data.len()
);
if check_faulty_only {
return Ok(());
}
return Err(DiskError::FaultyDisk);
}
// Clean up
disk.delete(
test_bucket,
test_filename,
DeleteOptions {
recursive: false,
immediate: false,
undo_write: false,
old_data_dir: None,
},
)
.await?;
Ok(())
})
.await;
match health_check_result {
Ok(result) => match result {
Ok(()) => Ok(()),
Err(e) => {
debug!("health check: failed: {:?}", e);
if e == DiskError::FaultyDisk {
return Err(e);
}
if check_faulty_only { Ok(()) } else { Err(e) }
}
},
Err(_) => {
// Timeout occurred
warn!("health check: timeout after {:?}", timeout_duration);
Err(DiskError::FaultyDisk)
}
}
}
/// Monitor disk status and try to bring it back online
async fn monitor_disk_status(disk: Arc<LocalDisk>, health: Arc<DiskHealthTracker>, cancel_token: CancellationToken) {
const CHECK_EVERY: Duration = Duration::from_secs(5);
let mut interval = time::interval(CHECK_EVERY);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
return;
}
_ = interval.tick() => {
if cancel_token.is_cancelled() {
return;
}
match Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, false, CHECK_TIMEOUT_DURATION).await {
Ok(_) => {
info!("Disk {} is back online", disk.to_string());
health.set_ok();
health.decrement_waiting();
return;
}
Err(e) => {
warn!("Disk {} still faulty: {:?}", disk.to_string(), e);
}
}
}
}
}
}
async fn check_id(&self, want_id: Option<Uuid>) -> Result<()> {
if want_id.is_none() {
return Ok(());
}
let stored_disk_id = self.disk.get_disk_id().await?;
if stored_disk_id != want_id {
return Err(Error::other(format!("Disk ID mismatch wanted {:?}, got {:?}", want_id, stored_disk_id)));
}
Ok(())
}
/// Check if disk ID is stale
async fn check_disk_stale(&self) -> Result<()> {
let Some(current_disk_id) = *self.disk_id.read().await else {
return Ok(());
};
let stored_disk_id = match self.disk.get_disk_id().await? {
Some(id) => id,
None => return Ok(()), // Empty disk ID is allowed during initialization
};
if current_disk_id != stored_disk_id {
return Err(DiskError::DiskNotFound);
}
Ok(())
}
/// Set the disk ID
pub async fn set_disk_id_internal(&self, id: Option<Uuid>) -> Result<()> {
let mut disk_id = self.disk_id.write().await;
*disk_id = id;
Ok(())
}
/// Get the current disk ID
pub async fn get_current_disk_id(&self) -> Option<Uuid> {
*self.disk_id.read().await
}
/// Track disk health for an operation.
/// This method should wrap disk operations to ensure health checking.
pub async fn track_disk_health<T, F, Fut>(&self, operation: F, timeout_duration: Duration) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
// Check if disk is faulty
if self.health.is_faulty() {
warn!("disk {} health is faulty, returning error", self.to_string());
return Err(DiskError::FaultyDisk);
}
// Check if disk is stale
self.check_disk_stale().await?;
// Record operation start
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
self.health.last_started.store(now, Ordering::Relaxed);
self.health.increment_waiting();
if timeout_duration == Duration::ZERO {
let result = operation().await;
self.health.decrement_waiting();
if result.is_ok() {
self.health.log_success();
}
return result;
}
// Execute the operation with timeout
let result = tokio::time::timeout(timeout_duration, operation()).await;
match result {
Ok(operation_result) => {
// Log success and decrement waiting counter
if operation_result.is_ok() {
self.health.log_success();
}
self.health.decrement_waiting();
operation_result
}
Err(_) => {
// Timeout occurred, mark disk as potentially faulty and decrement waiting counter
self.health.decrement_waiting();
warn!("disk operation timeout after {:?}", timeout_duration);
Err(DiskError::other(format!("disk operation timeout after {:?}", timeout_duration)))
}
}
}
}
#[async_trait::async_trait]
impl DiskAPI for LocalDiskWrapper {
fn to_string(&self) -> String {
self.disk.to_string()
}
async fn is_online(&self) -> bool {
let Ok(Some(disk_id)) = self.disk.get_disk_id().await else {
return false;
};
let Some(current_disk_id) = *self.disk_id.read().await else {
return false;
};
current_disk_id == disk_id
}
fn is_local(&self) -> bool {
self.disk.is_local()
}
fn host_name(&self) -> String {
self.disk.host_name()
}
fn endpoint(&self) -> Endpoint {
self.disk.endpoint()
}
async fn close(&self) -> Result<()> {
self.stop_monitoring().await;
self.disk.close().await
}
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
self.disk.get_disk_id().await
}
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
self.set_disk_id_internal(id).await
}
fn path(&self) -> PathBuf {
self.disk.path()
}
fn get_disk_location(&self) -> DiskLocation {
self.disk.get_disk_location()
}
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
if opts.noop && opts.metrics {
let mut info = DiskInfo::default();
// Add health metrics
info.metrics.total_waiting = self.health.waiting_count();
if self.health.is_faulty() {
return Err(DiskError::FaultyDisk);
}
return Ok(info);
}
if self.health.is_faulty() {
return Err(DiskError::FaultyDisk);
}
let result = self.disk.disk_info(opts).await?;
if let Some(current_disk_id) = *self.disk_id.read().await
&& Some(current_disk_id) != result.id
{
return Err(DiskError::DiskNotFound);
};
Ok(result)
}
async fn make_volume(&self, volume: &str) -> Result<()> {
self.track_disk_health(|| async { self.disk.make_volume(volume).await }, get_max_timeout_duration())
.await
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
self.track_disk_health(|| async { self.disk.make_volumes(volumes).await }, get_max_timeout_duration())
.await
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
self.track_disk_health(|| async { self.disk.list_volumes().await }, Duration::ZERO)
.await
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
self.track_disk_health(|| async { self.disk.stat_volume(volume).await }, get_max_timeout_duration())
.await
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
self.track_disk_health(|| async { self.disk.delete_volume(volume).await }, Duration::ZERO)
.await
}
async fn walk_dir<W: tokio::io::AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
self.track_disk_health(|| async { self.disk.walk_dir(opts, wr).await }, Duration::ZERO)
.await
}
async fn delete_version(
&self,
volume: &str,
path: &str,
fi: FileInfo,
force_del_marker: bool,
opts: DeleteOptions,
) -> Result<()> {
self.track_disk_health(
|| async { self.disk.delete_version(volume, path, fi, force_del_marker, opts).await },
get_max_timeout_duration(),
)
.await
}
async fn delete_versions(&self, volume: &str, versions: Vec<FileInfoVersions>, opts: DeleteOptions) -> Vec<Option<Error>> {
// Check if disk is faulty before proceeding
if self.health.is_faulty() {
return vec![Some(DiskError::FaultyDisk); versions.len()];
}
// Check if disk is stale
if let Err(e) = self.check_disk_stale().await {
return vec![Some(e); versions.len()];
}
// Record operation start
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
self.health.last_started.store(now, Ordering::Relaxed);
self.health.increment_waiting();
// Execute the operation
let result = self.disk.delete_versions(volume, versions, opts).await;
self.health.decrement_waiting();
let has_err = result.iter().any(|e| e.is_some());
if !has_err {
// Log success and decrement waiting counter
self.health.log_success();
}
result
}
async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> {
self.track_disk_health(|| async { self.disk.delete_paths(volume, paths).await }, get_max_timeout_duration())
.await
}
async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
self.track_disk_health(
|| async { self.disk.write_metadata(org_volume, volume, path, fi).await },
get_max_timeout_duration(),
)
.await
}
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> {
self.track_disk_health(
|| async { self.disk.update_metadata(volume, path, fi, opts).await },
get_max_timeout_duration(),
)
.await
}
async fn read_version(
&self,
org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
self.track_disk_health(
|| async { self.disk.read_version(org_volume, volume, path, version_id, opts).await },
get_max_timeout_duration(),
)
.await
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
self.track_disk_health(|| async { self.disk.read_xl(volume, path, read_data).await }, get_max_timeout_duration())
.await
}
async fn rename_data(
&self,
src_volume: &str,
src_path: &str,
fi: FileInfo,
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
self.track_disk_health(
|| async { self.disk.rename_data(src_volume, src_path, fi, dst_volume, dst_path).await },
get_max_timeout_duration(),
)
.await
}
async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result<Vec<String>> {
self.track_disk_health(
|| async { self.disk.list_dir(origvolume, volume, dir_path, count).await },
get_max_timeout_duration(),
)
.await
}
async fn read_file(&self, volume: &str, path: &str) -> Result<crate::disk::FileReader> {
self.track_disk_health(|| async { self.disk.read_file(volume, path).await }, get_max_timeout_duration())
.await
}
async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<crate::disk::FileReader> {
self.track_disk_health(
|| async { self.disk.read_file_stream(volume, path, offset, length).await },
get_max_timeout_duration(),
)
.await
}
async fn append_file(&self, volume: &str, path: &str) -> Result<crate::disk::FileWriter> {
self.track_disk_health(|| async { self.disk.append_file(volume, path).await }, Duration::ZERO)
.await
}
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: i64) -> Result<crate::disk::FileWriter> {
self.track_disk_health(
|| async { self.disk.create_file(origvolume, volume, path, file_size).await },
Duration::ZERO,
)
.await
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
self.track_disk_health(
|| async { self.disk.rename_file(src_volume, src_path, dst_volume, dst_path).await },
get_max_timeout_duration(),
)
.await
}
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Bytes) -> Result<()> {
self.track_disk_health(
|| async { self.disk.rename_part(src_volume, src_path, dst_volume, dst_path, meta).await },
get_max_timeout_duration(),
)
.await
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
self.track_disk_health(|| async { self.disk.delete(volume, path, opt).await }, get_max_timeout_duration())
.await
}
async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
self.track_disk_health(|| async { self.disk.verify_file(volume, path, fi).await }, Duration::ZERO)
.await
}
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
self.track_disk_health(|| async { self.disk.check_parts(volume, path, fi).await }, Duration::ZERO)
.await
}
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>> {
self.track_disk_health(|| async { self.disk.read_parts(bucket, paths).await }, Duration::ZERO)
.await
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
self.track_disk_health(|| async { self.disk.read_multiple(req).await }, Duration::ZERO)
.await
}
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> {
self.track_disk_health(|| async { self.disk.write_all(volume, path, data).await }, get_max_timeout_duration())
.await
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
self.track_disk_health(|| async { self.disk.read_all(volume, path).await }, get_max_timeout_duration())
.await
}
}
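A hedged usage sketch of the wrapper, using only constructors and methods that appear in this diff (the production path is new_disk further down, which builds the wrapper automatically; `endpoint` here is an assumed local Endpoint):

// Sketch only, inside some async context with `endpoint` in scope.
let local = Arc::new(LocalDisk::new(&endpoint, false).await?);
let disk = LocalDiskWrapper::new(local, true); // second arg requests health checks
// Every DiskAPI call now runs under track_disk_health; once the tracker is
// faulty, calls short-circuit with DiskError::FaultyDisk until recovery succeeds.
disk.write_all(".rustfs.sys/tmp", "health-probe", Bytes::from_static(b"x")).await?;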

View File

@@ -69,7 +69,7 @@ use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FormatInfo {
pub id: Option<Uuid>,
pub data: Bytes,
@@ -77,16 +77,6 @@ pub struct FormatInfo {
pub last_check: Option<OffsetDateTime>,
}
impl FormatInfo {
pub fn last_check_valid(&self) -> bool {
let now = OffsetDateTime::now_utc();
self.file_info.is_some()
&& self.id.is_some()
&& self.last_check.is_some()
&& (now.unix_timestamp() - self.last_check.unwrap().unix_timestamp() <= 1)
}
}
/// A helper enum to handle internal buffer types for writing data.
pub enum InternalBuf<'a> {
Ref(&'a [u8]),
@@ -185,7 +175,7 @@ impl LocalDisk {
};
let root_clone = root.clone();
let update_fn: UpdateFn<DiskInfo> = Box::new(move || {
let disk_id = id.map_or("".to_string(), |id| id.to_string());
let disk_id = id;
let root = root_clone.clone();
Box::pin(async move {
match get_disk_info(root.clone()).await {
@@ -200,7 +190,7 @@ impl LocalDisk {
minor: info.minor,
fs_type: info.fstype,
root_disk: root,
id: disk_id.to_string(),
id: disk_id,
..Default::default()
};
// if root {
@@ -1295,7 +1285,7 @@ impl DiskAPI for LocalDisk {
}
#[tracing::instrument(skip(self))]
async fn is_online(&self) -> bool {
self.check_format_json().await.is_ok()
true
}
#[tracing::instrument(skip(self))]
@@ -1342,24 +1332,40 @@ impl DiskAPI for LocalDisk {
#[tracing::instrument(level = "debug", skip(self))]
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
let mut format_info = self.format_info.write().await;
let format_info = {
let format_info = self.format_info.read().await;
format_info.clone()
};
let id = format_info.id;
if format_info.last_check_valid() {
return Ok(id);
// if format_info.last_check_valid() {
// return Ok(id);
// }
if format_info.file_info.is_some() && id.is_some() {
// check last check time
if let Some(last_check) = format_info.last_check {
if last_check.unix_timestamp() + 1 < OffsetDateTime::now_utc().unix_timestamp() {
return Ok(id);
}
}
}
let file_meta = self.check_format_json().await?;
if let Some(file_info) = &format_info.file_info {
if super::fs::same_file(&file_meta, file_info) {
let mut format_info = self.format_info.write().await;
format_info.last_check = Some(OffsetDateTime::now_utc());
drop(format_info);
return Ok(id);
}
}
debug!("get_disk_id: read format.json");
let b = fs::read(&self.format_path).await.map_err(to_unformatted_disk_error)?;
let fm = FormatV3::try_from(b.as_slice()).map_err(|e| {
@@ -1375,20 +1381,19 @@ impl DiskAPI for LocalDisk {
return Err(DiskError::InconsistentDisk);
}
let mut format_info = self.format_info.write().await;
format_info.id = Some(disk_id);
format_info.file_info = Some(file_meta);
format_info.data = b.into();
format_info.last_check = Some(OffsetDateTime::now_utc());
drop(format_info);
Ok(Some(disk_id))
}
#[tracing::instrument(skip(self))]
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
// No setup is required locally
// TODO: add check_id_store
let mut format_info = self.format_info.write().await;
format_info.id = id;
Ok(())
}
@@ -2438,6 +2443,10 @@ impl DiskAPI for LocalDisk {
info.endpoint = self.endpoint.to_string();
info.scanning = self.scanning.load(Ordering::SeqCst) == 1;
if info.id.is_none() {
info.id = self.get_disk_id().await.unwrap_or(None);
}
Ok(info)
}
}
@@ -2705,39 +2714,6 @@ mod test {
}
}
#[tokio::test]
async fn test_format_info_last_check_valid() {
let now = OffsetDateTime::now_utc();
// Valid format info
let valid_format_info = FormatInfo {
id: Some(Uuid::new_v4()),
data: vec![1, 2, 3].into(),
file_info: Some(fs::metadata("../../../..").await.unwrap()),
last_check: Some(now),
};
assert!(valid_format_info.last_check_valid());
// Invalid format info (missing id)
let invalid_format_info = FormatInfo {
id: None,
data: vec![1, 2, 3].into(),
file_info: Some(fs::metadata("../../../..").await.unwrap()),
last_check: Some(now),
};
assert!(!invalid_format_info.last_check_valid());
// Invalid format info (old timestamp)
let old_time = OffsetDateTime::now_utc() - time::Duration::seconds(10);
let old_format_info = FormatInfo {
id: Some(Uuid::new_v4()),
data: vec![1, 2, 3].into(),
file_info: Some(fs::metadata("../../../..").await.unwrap()),
last_check: Some(old_time),
};
assert!(!old_format_info.last_check_valid());
}
#[tokio::test]
async fn test_read_file_exists() {
let test_file = "./test_read_exists.txt";

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod disk_store;
pub mod endpoint;
pub mod error;
pub mod error_conv;
@@ -30,6 +31,7 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub const STORAGE_FORMAT_FILE: &str = "xl.meta";
pub const STORAGE_FORMAT_FILE_BACKUP: &str = "xl.meta.bkp";
use crate::disk::disk_store::LocalDiskWrapper;
use crate::rpc::RemoteDisk;
use bytes::Bytes;
use endpoint::Endpoint;
@@ -51,7 +53,7 @@ pub type FileWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;
#[derive(Debug)]
pub enum Disk {
Local(Box<LocalDisk>),
Local(Box<LocalDiskWrapper>),
Remote(Box<RemoteDisk>),
}
@@ -398,7 +400,7 @@ impl DiskAPI for Disk {
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
if ep.is_local {
let s = LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Disk::Local(Box::new(s))))
Ok(Arc::new(Disk::Local(Box::new(LocalDiskWrapper::new(Arc::new(s), opt.health_check)))))
} else {
let remote_disk = RemoteDisk::new(ep, opt).await?;
Ok(Arc::new(Disk::Remote(Box::new(remote_disk))))
@@ -534,7 +536,7 @@ pub struct DiskInfo {
pub scanning: bool,
pub endpoint: String,
pub mount_path: String,
pub id: String,
pub id: Option<Uuid>,
pub rotational: bool,
pub metrics: DiskMetrics,
pub error: String,
@@ -1015,7 +1017,7 @@ mod tests {
let endpoint = Endpoint::try_from(test_dir).unwrap();
let local_disk = LocalDisk::new(&endpoint, false).await.unwrap();
let disk = Disk::Local(Box::new(local_disk));
let disk = Disk::Local(Box::new(LocalDiskWrapper::new(Arc::new(local_disk), false)));
// Test basic methods
assert!(disk.is_local());

View File

@@ -13,14 +13,18 @@
// limitations under the License.
use crate::bucket::metadata_sys;
use crate::disk::error::DiskError;
use crate::disk::error::{Error, Result};
use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs};
use crate::disk::{DiskAPI, DiskStore};
use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration};
use crate::global::GLOBAL_LOCAL_DISK_MAP;
use crate::store::all_local_disk;
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::{
disk::{self, VolumeInfo},
disk::{
self, VolumeInfo,
disk_store::{CHECK_EVERY, CHECK_TIMEOUT_DURATION, DiskHealthTracker},
},
endpoints::{EndpointServerPools, Node},
store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions},
};
@@ -32,10 +36,11 @@ use rustfs_protos::node_service_time_out_client;
use rustfs_protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::RwLock;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::{net::TcpStream, sync::RwLock, time};
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tracing::info;
use tracing::{debug, info, warn};
type Client = Arc<Box<dyn PeerS3Client>>;
@@ -559,16 +564,160 @@ pub struct RemotePeerS3Client {
pub node: Option<Node>,
pub pools: Option<Vec<usize>>,
addr: String,
/// Health tracker for connection monitoring
health: Arc<DiskHealthTracker>,
/// Cancellation token for monitoring tasks
cancel_token: CancellationToken,
}
impl RemotePeerS3Client {
pub fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string();
Self { node, pools, addr }
let client = Self {
node,
pools,
addr,
health: Arc::new(DiskHealthTracker::new()),
cancel_token: CancellationToken::new(),
};
// Start health monitoring
client.start_health_monitoring();
client
}
pub fn get_addr(&self) -> String {
self.addr.clone()
}
/// Start health monitoring for the remote peer
fn start_health_monitoring(&self) {
let health = Arc::clone(&self.health);
let cancel_token = self.cancel_token.clone();
let addr = self.addr.clone();
tokio::spawn(async move {
Self::monitor_remote_peer_health(addr, health, cancel_token).await;
});
}
/// Monitor remote peer health periodically
async fn monitor_remote_peer_health(addr: String, health: Arc<DiskHealthTracker>, cancel_token: CancellationToken) {
let mut interval = time::interval(CHECK_EVERY);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
debug!("Health monitoring cancelled for remote peer: {}", addr);
return;
}
_ = interval.tick() => {
if cancel_token.is_cancelled() {
return;
}
// Skip health check if peer is already marked as faulty
if health.is_faulty() {
continue;
}
// Perform basic connectivity check
if Self::perform_connectivity_check(&addr).await.is_err() && health.swap_ok_to_faulty() {
warn!("Remote peer health check failed for {}: marking as faulty", addr);
// Start recovery monitoring
let health_clone = Arc::clone(&health);
let addr_clone = addr.clone();
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
Self::monitor_remote_peer_recovery(addr_clone, health_clone, cancel_clone).await;
});
}
}
}
}
}
/// Monitor remote peer recovery and mark as healthy when recovered
async fn monitor_remote_peer_recovery(addr: String, health: Arc<DiskHealthTracker>, cancel_token: CancellationToken) {
let mut interval = time::interval(Duration::from_secs(5)); // Check every 5 seconds
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
return;
}
_ = interval.tick() => {
if Self::perform_connectivity_check(&addr).await.is_ok() {
info!("Remote peer recovered: {}", addr);
health.set_ok();
return;
}
}
}
}
}
/// Perform basic connectivity check for remote peer
async fn perform_connectivity_check(addr: &str) -> Result<()> {
use tokio::time::timeout;
let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?;
let Some(host) = url.host_str() else {
return Err(Error::other("No host in URL".to_string()));
};
let port = url.port_or_known_default().unwrap_or(80);
// Try to establish TCP connection
match timeout(CHECK_TIMEOUT_DURATION, TcpStream::connect((host, port))).await {
Ok(Ok(_)) => Ok(()),
_ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))),
}
}
/// Execute operation with timeout and health tracking
async fn execute_with_timeout<T, F, Fut>(&self, operation: F, timeout_duration: Duration) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
// Check if peer is faulty
if self.health.is_faulty() {
return Err(DiskError::FaultyDisk);
}
// Record operation start
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64;
self.health.last_started.store(now, std::sync::atomic::Ordering::Relaxed);
self.health.increment_waiting();
// Execute operation with timeout
let result = time::timeout(timeout_duration, operation()).await;
match result {
Ok(operation_result) => {
// Log success and decrement waiting counter
if operation_result.is_ok() {
self.health.log_success();
}
self.health.decrement_waiting();
operation_result
}
Err(_) => {
// Timeout occurred, mark peer as potentially faulty
self.health.decrement_waiting();
warn!("Remote peer operation timeout after {:?}", timeout_duration);
Err(Error::other(format!("Remote peer operation timeout after {:?}", timeout_duration)))
}
}
}
}
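// Sketch (illustrative, not code from this commit): every PeerS3Client RPC below
// is funneled through execute_with_timeout with the same env-driven cap, e.g.
//
//     self.execute_with_timeout(|| async { /* gRPC call */ Ok(()) }, get_max_timeout_duration()).await
//
// so a wedged peer fails fast with a timeout error instead of hanging the caller.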
#[async_trait]
@@ -578,115 +727,145 @@ impl PeerS3Client for RemotePeerS3Client {
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let options: String = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(HealBucketRequest {
bucket: bucket.to_string(),
options,
});
let response = client.heal_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
self.execute_with_timeout(
|| async {
let options: String = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(HealBucketRequest {
bucket: bucket.to_string(),
options,
});
let response = client.heal_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
Ok(HealResultItem {
heal_item_type: HealItemType::Bucket.to_string(),
bucket: bucket.to_string(),
set_count: 0,
..Default::default()
})
Ok(HealResultItem {
heal_item_type: HealItemType::Bucket.to_string(),
bucket: bucket.to_string(),
set_count: 0,
..Default::default()
})
},
get_max_timeout_duration(),
)
.await
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
let bucket_infos = response
.bucket_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<BucketInfo>(&json_str).ok())
.collect();
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
let bucket_infos = response
.bucket_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<BucketInfo>(&json_str).ok())
.collect();
Ok(bucket_infos)
Ok(bucket_infos)
},
get_max_timeout_duration(),
)
.await
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
});
let response = client.make_bucket(request).await?.into_inner();
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
});
let response = client.make_bucket(request).await?.into_inner();
// TODO: deal with error
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
// TODO: deal with error
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
Ok(())
Ok(())
},
get_max_timeout_duration(),
)
.await
}
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
});
let response = client.get_bucket_info(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
let bucket_info = serde_json::from_str::<BucketInfo>(&response.bucket_info)?;
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
});
let response = client.get_bucket_info(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
let bucket_info = serde_json::from_str::<BucketInfo>(&response.bucket_info)?;
Ok(bucket_info)
Ok(bucket_info)
},
get_max_timeout_duration(),
)
.await
}
async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});
let response = client.delete_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});
let response = client.delete_bucket(request).await?.into_inner();
if !response.success {
return if let Some(err) = response.error {
Err(err.into())
} else {
Err(Error::other(""))
};
}
Ok(())
Ok(())
},
get_max_timeout_duration(),
)
.await
}
}

File diff suppressed because it is too large.

View File

@@ -174,56 +174,56 @@ impl SetDisks {
})
}
async fn cached_disk_health(&self, index: usize) -> Option<bool> {
let cache = self.disk_health_cache.read().await;
cache
.get(index)
.and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
}
// async fn cached_disk_health(&self, index: usize) -> Option<bool> {
// let cache = self.disk_health_cache.read().await;
// cache
// .get(index)
// .and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
// }
async fn update_disk_health(&self, index: usize, online: bool) {
let mut cache = self.disk_health_cache.write().await;
if cache.len() <= index {
cache.resize(index + 1, None);
}
cache[index] = Some(DiskHealthEntry {
last_check: Instant::now(),
online,
});
}
// async fn update_disk_health(&self, index: usize, online: bool) {
// let mut cache = self.disk_health_cache.write().await;
// if cache.len() <= index {
// cache.resize(index + 1, None);
// }
// cache[index] = Some(DiskHealthEntry {
// last_check: Instant::now(),
// online,
// });
// }
async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
if let Some(online) = self.cached_disk_health(index).await {
return online;
}
// async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
// if let Some(online) = self.cached_disk_health(index).await {
// return online;
// }
let disk_clone = disk.clone();
let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
.await
.unwrap_or(false);
self.update_disk_health(index, online).await;
online
}
// let disk_clone = disk.clone();
// let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
// .await
// .unwrap_or(false);
// self.update_disk_health(index, online).await;
// online
// }
async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
let mut filtered = Vec::with_capacity(disks.len());
let mut online_count = 0;
// async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
// let mut filtered = Vec::with_capacity(disks.len());
// let mut online_count = 0;
for (idx, disk) in disks.into_iter().enumerate() {
if let Some(disk_store) = disk {
if self.is_disk_online_cached(idx, &disk_store).await {
filtered.push(Some(disk_store));
online_count += 1;
} else {
filtered.push(None);
}
} else {
filtered.push(None);
}
}
// for (idx, disk) in disks.into_iter().enumerate() {
// if let Some(disk_store) = disk {
// if self.is_disk_online_cached(idx, &disk_store).await {
// filtered.push(Some(disk_store));
// online_count += 1;
// } else {
// filtered.push(None);
// }
// } else {
// filtered.push(None);
// }
// }
(filtered, online_count)
}
// (filtered, online_count)
// }
fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
match err {
LockResult::Timeout => {
@@ -259,9 +259,28 @@ impl SetDisks {
}
async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
let disks = self.get_disks_internal().await;
let (filtered, _) = self.filter_online_disks(disks).await;
filtered.into_iter().filter(|disk| disk.is_some()).collect()
let mut disks = self.get_disks_internal().await;
// TODO: diskinfo filter online
let mut new_disk = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if let Some(d) = disk {
if d.is_online().await {
new_disk.push(disk.clone());
}
}
}
let mut rng = rand::rng();
disks.shuffle(&mut rng);
new_disk
// let disks = self.get_disks_internal().await;
// let (filtered, _) = self.filter_online_disks(disks).await;
// filtered.into_iter().filter(|disk| disk.is_some()).collect()
}
async fn get_online_local_disks(&self) -> Vec<Option<DiskStore>> {
let mut disks = self.get_online_disks().await;
@@ -1467,7 +1486,9 @@ impl SetDisks {
let object = object.clone();
let version_id = version_id.clone();
tokio::spawn(async move {
if let Some(disk) = disk {
if let Some(disk) = disk
&& disk.is_online().await
{
if version_id.is_empty() {
match disk.read_xl(&bucket, &object, read_data).await {
Ok(info) => {
@@ -1799,14 +1820,14 @@ impl SetDisks {
}
pub async fn renew_disk(&self, ep: &Endpoint) {
debug!("renew_disk start {:?}", ep);
debug!("renew_disk: start {:?}", ep);
let (new_disk, fm) = match Self::connect_endpoint(ep).await {
Ok(res) => res,
Err(e) => {
warn!("connect_endpoint err {:?}", &e);
warn!("renew_disk: connect_endpoint err {:?}", &e);
if ep.is_local && e == DiskError::UnformattedDisk {
info!("unformatteddisk will trigger heal_disk, {:?}", ep);
info!("renew_disk unformatteddisk will trigger heal_disk, {:?}", ep);
let set_disk_id = format!("pool_{}_set_{}", ep.pool_idx, ep.set_idx);
let _ = send_heal_disk(set_disk_id, Some(HealChannelPriority::Normal)).await;
}
@@ -1817,7 +1838,7 @@ impl SetDisks {
let (set_idx, disk_idx) = match self.find_disk_index(&fm) {
Ok(res) => res,
Err(e) => {
warn!("find_disk_index err {:?}", e);
warn!("renew_disk: find_disk_index err {:?}", e);
return;
}
};
@@ -1837,7 +1858,7 @@ impl SetDisks {
}
}
debug!("renew_disk update {:?}", fm.erasure.this);
debug!("renew_disk: update {:?}", fm.erasure.this);
let mut disk_lock = self.disks.write().await;
disk_lock[disk_idx] = Some(new_disk);
@@ -3051,7 +3072,7 @@ impl SetDisks {
for (index, disk) in latest_disks.iter().enumerate() {
if let Some(outdated_disk) = &out_dated_disks[index] {
info!(disk_index = index, "Creating writer for outdated disk");
let writer = create_bitrot_writer(
let writer = match create_bitrot_writer(
is_inline_buffer,
Some(outdated_disk),
RUSTFS_META_TMP_BUCKET,
@@ -3060,7 +3081,19 @@ impl SetDisks {
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
.await
{
Ok(writer) => writer,
Err(err) => {
warn!(
"create_bitrot_writer disk {}, err {:?}, skipping operation",
outdated_disk.to_string(),
err
);
writers.push(None);
continue;
}
};
writers.push(Some(writer));
} else {
info!(disk_index = index, "Skipping writer (disk not outdated)");
@@ -3790,8 +3823,8 @@ impl ObjectIO for SetDisks {
#[tracing::instrument(level = "debug", skip(self, data,))]
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
let disks = self.get_disks_internal().await;
// let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
// Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
let _object_lock_guard = if !opts.no_lock {
@@ -3832,13 +3865,13 @@ impl ObjectIO for SetDisks {
write_quorum += 1
}
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
// if filtered_online < write_quorum {
// warn!(
// "online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
// filtered_online, write_quorum, bucket, object
// );
// return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
// }
let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
@@ -3877,8 +3910,10 @@ impl ObjectIO for SetDisks {
let mut writers = Vec::with_capacity(shuffle_disks.len());
let mut errors = Vec::with_capacity(shuffle_disks.len());
for disk_op in shuffle_disks.iter() {
if let Some(disk) = disk_op {
let writer = create_bitrot_writer(
if let Some(disk) = disk_op
&& disk.is_online().await
{
let writer = match create_bitrot_writer(
is_inline_buffer,
Some(disk),
RUSTFS_META_TMP_BUCKET,
@@ -3887,29 +3922,16 @@ impl ObjectIO for SetDisks {
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
// let writer = if is_inline_buffer {
// BitrotWriter::new(
// Writer::from_cursor(Cursor::new(Vec::new())),
// erasure.shard_size(),
// HashAlgorithm::HighwayHash256,
// )
// } else {
// let f = match disk
// .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_object, erasure.shard_file_size(data.content_length))
// .await
// {
// Ok(f) => f,
// Err(e) => {
// errors.push(Some(e));
// writers.push(None);
// continue;
// }
// };
// BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
// };
.await
{
Ok(writer) => writer,
Err(err) => {
warn!("create_bitrot_writer disk {}, err {:?}, skipping operation", disk.to_string(), err);
errors.push(Some(err));
writers.push(None);
continue;
}
};
writers.push(Some(writer));
errors.push(None);
@@ -4072,7 +4094,7 @@ impl StorageAPI for SetDisks {
async fn local_storage_info(&self) -> rustfs_madmin::StorageInfo {
let disks = self.get_disks_internal().await;
let mut local_disks: Vec<Option<Arc<disk::Disk>>> = Vec::new();
let mut local_disks: Vec<Option<DiskStore>> = Vec::new();
let mut local_endpoints = Vec::new();
for (i, ep) in self.set_endpoints.iter().enumerate() {
@@ -4908,9 +4930,7 @@ impl StorageAPI for SetDisks {
for disk in disks.iter() {
if let Some(disk) = disk {
if disk.is_online().await {
continue;
}
continue;
}
let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err")).await;
break;
@@ -5129,16 +5149,16 @@ impl StorageAPI for SetDisks {
return Err(Error::other(format!("checksum mismatch: {checksum}")));
}
let disks_snapshot = self.get_disks_internal().await;
let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
let disks = self.get_disks_internal().await;
// let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
if filtered_online < write_quorum {
warn!(
"online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
filtered_online, write_quorum, bucket, object
);
return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
}
// if filtered_online < write_quorum {
// warn!(
// "online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
// filtered_online, write_quorum, bucket, object
// );
// return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
// }
let shuffle_disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
@@ -5152,7 +5172,7 @@ impl StorageAPI for SetDisks {
let mut errors = Vec::with_capacity(shuffle_disks.len());
for disk_op in shuffle_disks.iter() {
if let Some(disk) = disk_op {
let writer = create_bitrot_writer(
let writer = match create_bitrot_writer(
false,
Some(disk),
RUSTFS_META_TMP_BUCKET,
@@ -5161,23 +5181,16 @@ impl StorageAPI for SetDisks {
erasure.shard_size(),
HashAlgorithm::HighwayHash256,
)
.await?;
// let writer = {
// let f = match disk
// .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, erasure.shard_file_size(data.content_length))
// .await
// {
// Ok(f) => f,
// Err(e) => {
// errors.push(Some(e));
// writers.push(None);
// continue;
// }
// };
// BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
// };
.await
{
Ok(writer) => writer,
Err(err) => {
warn!("create_bitrot_writer disk {}, err {:?}, skipping operation", disk.to_string(), err);
errors.push(Some(err));
writers.push(None);
continue;
}
};
writers.push(Some(writer));
errors.push(None);
@@ -6769,7 +6782,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<ru
healing: res.healing,
scanning: res.scanning,
uuid: res.id.clone(),
uuid: res.id.map_or("".to_string(), |id| id.to_string()),
major: res.major as u32,
minor: res.minor as u32,
model: None,

View File

@@ -255,7 +255,7 @@ impl Sets {
self.connect_disks().await;
// TODO: config interval
let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
let mut interval = tokio::time::interval(Duration::from_secs(15));
loop {
tokio::select! {
_= interval.tick()=>{

View File

@@ -265,7 +265,10 @@ pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> disk::error::R
.map_err(|e| match e {
DiskError::FileNotFound => DiskError::UnformattedDisk,
DiskError::DiskNotFound => DiskError::UnformattedDisk,
_ => e,
_ => {
warn!("load_format_erasure err: {:?} {:?}", disk.to_string(), e);
e
}
})?;
let mut fm = FormatV3::try_from(data.as_ref())?;
@@ -312,17 +315,18 @@ async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<For
}
pub async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> disk::error::Result<()> {
if disk.is_none() {
let Some(disk) = disk else {
return Err(DiskError::DiskNotFound);
}
};
let format = format.as_ref().unwrap();
let Some(format) = format else {
return Err(DiskError::other("format is none"));
};
let json_data = format.to_json()?;
let tmpfile = Uuid::new_v4().to_string();
let disk = disk.as_ref().unwrap();
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data.into_bytes().into())
.await?;

View File

@@ -26,6 +26,11 @@ use tonic::{
};
use tracing::{debug, warn};
// Type alias for the complex client type
pub type NodeServiceClientType = NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>;
pub use generated::*;
// Default 100 MB

View File

@@ -48,6 +48,14 @@ pub fn parse_bool(str: &str) -> Result<bool> {
}
}
pub fn parse_bool_with_default(str: &str, default: bool) -> bool {
match str {
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => true,
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => false,
_ => default,
}
}
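// Illustration (not part of this commit): accepted spellings.
//
//     assert!(parse_bool_with_default("on", false));
//     assert!(!parse_bool_with_default("Off", true));
//     assert!(parse_bool_with_default("not-a-bool", true)); // unknown input -> default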
/// Matches a simple pattern against a name using wildcards.
///
/// # Arguments