diff --git a/Cargo.lock b/Cargo.lock index 08641555..ef5e6041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7302,6 +7302,7 @@ dependencies = [ "tonic", "tower", "tracing", + "tracing-subscriber", "url", "urlencoding", "uuid", diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index 39c5f8fd..4e287e38 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -468,14 +468,17 @@ impl HealManager { let active_heals = self.active_heals.clone(); let cancel_token = self.cancel_token.clone(); let storage = self.storage.clone(); - - info!( - "start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}", - config.read().await.heal_interval - ); + let mut duration = { + let config = config.read().await; + config.heal_interval + }; + if duration < Duration::from_secs(1) { + duration = Duration::from_secs(1); + } + info!("start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}", duration); tokio::spawn(async move { - let mut interval = interval(config.read().await.heal_interval); + let mut interval = interval(duration); loop { tokio::select! { diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 93ea5fec..eaf85255 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -30,7 +30,7 @@ use rustfs_ecstore::{ bucket::versioning::VersioningApi, bucket::versioning_sys::BucketVersioningSys, data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend}, - disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions}, + disk::{DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions}, set_disk::SetDisks, store_api::ObjectInfo, }; @@ -1977,7 +1977,7 @@ impl Scanner { } else { // Apply lifecycle actions if let Some(lifecycle_config) = &lifecycle_config { - if let Disk::Local(_local_disk) = &**disk { + if disk.is_local() { let vcfg = BucketVersioningSys::get(bucket).await.ok(); let mut scanner_item = ScannerItem { diff --git a/crates/ahm/tests/data_usage_fallback_test.rs b/crates/ahm/tests/data_usage_fallback_test.rs index 48fd5457..03a7cfe5 100644 --- a/crates/ahm/tests/data_usage_fallback_test.rs +++ b/crates/ahm/tests/data_usage_fallback_test.rs @@ -21,10 +21,11 @@ use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetad use rustfs_ecstore::endpoints::EndpointServerPools; use rustfs_ecstore::store::ECStore; use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI}; -use std::sync::Arc; +use std::sync::{Arc, Once}; use tempfile::TempDir; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; +use tracing::Level; /// Build a minimal single-node ECStore over a temp directory and populate objects. 
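Review note on the `start_auto_disk_scanner` hunk above: `tokio::time::interval` panics when handed a zero period, so clamping the configured `heal_interval` to at least one second is load-bearing, not cosmetic. A minimal standalone sketch of the same guard (the helper name `effective_heal_interval` is invented for illustration):

```rust
use std::time::Duration;
use tokio::time::interval;

// Hypothetical helper mirroring the clamp in manager.rs above:
// `tokio::time::interval` panics if the period is zero, so the configured
// heal interval is floored at one second before the ticker is created.
fn effective_heal_interval(configured: Duration) -> Duration {
    configured.max(Duration::from_secs(1))
}

#[tokio::main]
async fn main() {
    // A zero configured value no longer panics the ticker.
    let mut ticker = interval(effective_heal_interval(Duration::ZERO));
    ticker.tick().await; // the first tick completes immediately
    println!("scanner tick");
}
```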
async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc) { @@ -74,8 +75,22 @@ async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc, Arc, Arc)> = OnceLock::new(); static INIT: Once = Once::new(); -fn init_tracing() { +pub fn init_tracing() { INIT.call_once(|| { - let _ = tracing_subscriber::fmt::try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()) + .with_thread_names(true) + .try_init(); }); } @@ -356,7 +360,7 @@ mod serial_tests { // Create heal manager with faster interval let cfg = HealConfig { - heal_interval: Duration::from_secs(2), + heal_interval: Duration::from_secs(1), ..Default::default() }; let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); diff --git a/crates/ecstore/Cargo.toml b/crates/ecstore/Cargo.toml index b2cfda4d..bd021c19 100644 --- a/crates/ecstore/Cargo.toml +++ b/crates/ecstore/Cargo.toml @@ -113,6 +113,7 @@ faster-hex = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } criterion = { workspace = true, features = ["html_reports"] } temp-env = { workspace = true } +tracing-subscriber = { workspace = true } [build-dependencies] shadow-rs = { workspace = true, features = ["build", "metadata"] } diff --git a/crates/ecstore/src/cache_value/metacache_set.rs b/crates/ecstore/src/cache_value/metacache_set.rs index 621ffea7..b71b2c30 100644 --- a/crates/ecstore/src/cache_value/metacache_set.rs +++ b/crates/ecstore/src/cache_value/metacache_set.rs @@ -16,7 +16,7 @@ use crate::disk::error::DiskError; use crate::disk::{self, DiskAPI, DiskStore, WalkDirOptions}; use futures::future::join_all; use rustfs_filemeta::{MetaCacheEntries, MetaCacheEntry, MetacacheReader, is_io_eof}; -use std::{future::Future, pin::Pin, sync::Arc}; +use std::{future::Future, pin::Pin}; use tokio::spawn; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -71,14 +71,14 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d let mut jobs: Vec>> = Vec::new(); let mut readers = Vec::with_capacity(opts.disks.len()); - let fds = Arc::new(opts.fallback_disks.clone()); + let fds = opts.fallback_disks.iter().flatten().cloned().collect::>(); let cancel_rx = CancellationToken::new(); for disk in opts.disks.iter() { let opdisk = disk.clone(); let opts_clone = opts.clone(); - let fds_clone = fds.clone(); + let mut fds_clone = fds.clone(); let cancel_rx_clone = cancel_rx.clone(); let (rd, mut wr) = tokio::io::duplex(64); readers.push(MetacacheReader::new(rd)); @@ -113,21 +113,20 @@ pub async fn list_path_raw(rx: CancellationToken, opts: ListPathRawOptions) -> d } while need_fallback { - // warn!("list_path_raw: while need_fallback start"); - let disk = match fds_clone.iter().find(|d| d.is_some()) { - Some(d) => { - if let Some(disk) = d.clone() { - disk - } else { - warn!("list_path_raw: fallback disk is none"); - break; - } - } - None => { - warn!("list_path_raw: fallback disk is none2"); - break; + let disk_op = { + if fds_clone.is_empty() { + None + } else { + let disk = fds_clone.remove(0); + if disk.is_online().await { Some(disk.clone()) } else { None } } }; + + let Some(disk) = disk_op else { + warn!("list_path_raw: fallback disk is none"); + break; + }; + match disk .as_ref() .walk_dir( diff --git a/crates/ecstore/src/disk/disk_store.rs b/crates/ecstore/src/disk/disk_store.rs new file mode 100644 index 00000000..3ccd8c7d --- 
/dev/null
+++ b/crates/ecstore/src/disk/disk_store.rs
@@ -0,0 +1,770 @@
+// Copyright 2024 RustFS Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::disk::{
+    CheckPartsResp, DeleteOptions, DiskAPI, DiskError, DiskInfo, DiskInfoOptions, DiskLocation, Endpoint, Error,
+    FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, Result, UpdateMetadataOpts, VolumeInfo,
+    WalkDirOptions, local::LocalDisk,
+};
+use bytes::Bytes;
+use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
+use rustfs_utils::string::parse_bool_with_default;
+use std::{
+    path::PathBuf,
+    sync::{
+        Arc,
+        atomic::{AtomicI64, AtomicU32, Ordering},
+    },
+    time::Duration,
+};
+use tokio::{sync::RwLock, time};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, warn};
+use uuid::Uuid;
+
+/// Disk health status constants
+const DISK_HEALTH_OK: u32 = 0;
+const DISK_HEALTH_FAULTY: u32 = 1;
+
+pub const ENV_RUSTFS_DRIVE_ACTIVE_MONITORING: &str = "RUSTFS_DRIVE_ACTIVE_MONITORING";
+pub const ENV_RUSTFS_DRIVE_MAX_TIMEOUT_DURATION: &str = "RUSTFS_DRIVE_MAX_TIMEOUT_DURATION";
+pub const CHECK_EVERY: Duration = Duration::from_secs(15);
+pub const SKIP_IF_SUCCESS_BEFORE: Duration = Duration::from_secs(5);
+pub const CHECK_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
+
+lazy_static::lazy_static! {
+    static ref TEST_OBJ: String = format!("health-check-{}", Uuid::new_v4());
+    static ref TEST_DATA: Bytes = Bytes::from(vec![42u8; 2048]);
+    static ref TEST_BUCKET: String = ".rustfs.sys/tmp".to_string();
+}
+
+pub fn get_max_timeout_duration() -> Duration {
+    std::env::var(ENV_RUSTFS_DRIVE_MAX_TIMEOUT_DURATION)
+        .map(|v| Duration::from_secs(v.parse::<u64>().unwrap_or(30)))
+        .unwrap_or(Duration::from_secs(30))
+}
+
+/// DiskHealthTracker tracks the health status of a disk.
+/// Similar to Go's diskHealthTracker.
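Review note: the `swap_ok_to_faulty` helper defined below uses `compare_exchange` rather than a plain `store` so that, when several operations fail concurrently, exactly one caller observes the OK -> FAULTY transition and becomes responsible for spawning the recovery monitor. A self-contained sketch of that single-winner property (constants and thread bodies are illustrative only):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

const OK: u32 = 0;
const FAULTY: u32 = 1;

fn main() {
    let status = Arc::new(AtomicU32::new(OK));
    let mut handles = Vec::new();
    for i in 0..4 {
        let status = status.clone();
        handles.push(thread::spawn(move || {
            // Only the thread that performs the OK -> FAULTY transition
            // "wins" and should spawn the (single) recovery monitor.
            let won = status
                .compare_exchange(OK, FAULTY, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
            if won {
                println!("thread {i} marked the disk faulty and owns recovery");
            }
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}
```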
+#[derive(Debug)] +pub struct DiskHealthTracker { + /// Atomic timestamp of last successful operation + pub last_success: AtomicI64, + /// Atomic timestamp of last operation start + pub last_started: AtomicI64, + /// Atomic disk status (OK or Faulty) + pub status: AtomicU32, + /// Atomic number of waiting operations + pub waiting: AtomicU32, +} + +impl DiskHealthTracker { + /// Create a new disk health tracker + pub fn new() -> Self { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + + Self { + last_success: AtomicI64::new(now), + last_started: AtomicI64::new(now), + status: AtomicU32::new(DISK_HEALTH_OK), + waiting: AtomicU32::new(0), + } + } + + /// Log a successful operation + pub fn log_success(&self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + self.last_success.store(now, Ordering::Relaxed); + } + + /// Check if disk is faulty + pub fn is_faulty(&self) -> bool { + self.status.load(Ordering::Relaxed) == DISK_HEALTH_FAULTY + } + + /// Set disk as faulty + pub fn set_faulty(&self) { + self.status.store(DISK_HEALTH_FAULTY, Ordering::Relaxed); + } + + /// Set disk as OK + pub fn set_ok(&self) { + self.status.store(DISK_HEALTH_OK, Ordering::Relaxed); + } + + pub fn swap_ok_to_faulty(&self) -> bool { + self.status + .compare_exchange(DISK_HEALTH_OK, DISK_HEALTH_FAULTY, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + /// Increment waiting operations counter + pub fn increment_waiting(&self) { + self.waiting.fetch_add(1, Ordering::Relaxed); + } + + /// Decrement waiting operations counter + pub fn decrement_waiting(&self) { + self.waiting.fetch_sub(1, Ordering::Relaxed); + } + + /// Get waiting operations count + pub fn waiting_count(&self) -> u32 { + self.waiting.load(Ordering::Relaxed) + } + + /// Get last success timestamp + pub fn last_success(&self) -> i64 { + self.last_success.load(Ordering::Relaxed) + } +} + +impl Default for DiskHealthTracker { + fn default() -> Self { + Self::new() + } +} + +/// Health check context key for tracking disk operations +#[derive(Debug, Clone)] +struct HealthDiskCtxKey; + +#[derive(Debug)] +struct HealthDiskCtxValue { + last_success: Arc, +} + +impl HealthDiskCtxValue { + fn log_success(&self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + self.last_success.store(now, Ordering::Relaxed); + } +} + +/// LocalDiskWrapper wraps a DiskStore with health tracking capabilities. +/// This is similar to Go's xlStorageDiskIDCheck. 
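Review note: the wrapper below drives its background checks with the usual tokio shape, a `tokio::select!` loop racing a `CancellationToken` against an interval tick, which is what lets `close()` stop the monitor promptly via `stop_monitoring`. A minimal sketch of that loop skeleton, assuming the same `tokio`/`tokio-util` crates this diff already depends on:

```rust
use std::time::Duration;
use tokio::time;
use tokio_util::sync::CancellationToken;

// Minimal shape of the monitoring loop used by `monitor_disk_writable`:
// a ticker drives periodic checks, and cancelling the token ends the
// task promptly instead of waiting out the next tick.
async fn monitor(cancel: CancellationToken) {
    let mut ticker = time::interval(Duration::from_secs(15));
    loop {
        tokio::select! {
            _ = cancel.cancelled() => return,
            _ = ticker.tick() => {
                // the periodic health check would run here
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(monitor(cancel.clone()));
    cancel.cancel();
    task.await.unwrap();
}
```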
+#[derive(Debug, Clone)] +pub struct LocalDiskWrapper { + /// The underlying disk store + disk: Arc, + /// Health tracker + health: Arc, + /// Whether health checking is enabled + health_check: bool, + /// Cancellation token for monitoring tasks + cancel_token: CancellationToken, + /// Disk ID for stale checking + disk_id: Arc>>, +} + +impl LocalDiskWrapper { + /// Create a new LocalDiskWrapper + pub fn new(disk: Arc, health_check: bool) -> Self { + // Check environment variable for health check override + // Default to true if not set, but only enable if both param and env are true + let env_health_check = std::env::var(ENV_RUSTFS_DRIVE_ACTIVE_MONITORING) + .map(|v| parse_bool_with_default(&v, true)) + .unwrap_or(true); + + let ret = Self { + disk, + health: Arc::new(DiskHealthTracker::new()), + health_check: health_check && env_health_check, + cancel_token: CancellationToken::new(), + disk_id: Arc::new(RwLock::new(None)), + }; + + ret.start_monitoring(); + + ret + } + + pub fn get_disk(&self) -> Arc { + self.disk.clone() + } + + /// Start the disk monitoring if health_check is enabled + pub fn start_monitoring(&self) { + if self.health_check { + let health = Arc::clone(&self.health); + let cancel_token = self.cancel_token.clone(); + let disk = Arc::clone(&self.disk); + + tokio::spawn(async move { + Self::monitor_disk_writable(disk, health, cancel_token).await; + }); + } + } + + /// Stop the disk monitoring + pub async fn stop_monitoring(&self) { + self.cancel_token.cancel(); + } + + /// Monitor disk writability periodically + async fn monitor_disk_writable(disk: Arc, health: Arc, cancel_token: CancellationToken) { + // TODO: config interval + + let mut interval = time::interval(CHECK_EVERY); + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + return; + } + _ = interval.tick() => { + if cancel_token.is_cancelled() { + return; + } + + if health.status.load(Ordering::Relaxed) != DISK_HEALTH_OK { + continue; + } + + let last_success_nanos = health.last_success.load(Ordering::Relaxed); + let elapsed = Duration::from_nanos( + (std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64 - last_success_nanos) as u64 + ); + + if elapsed < SKIP_IF_SUCCESS_BEFORE { + continue; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + + + debug!("health check: performing health check"); + if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() { + // Health check failed, disk is considered faulty + + health.increment_waiting(); // Balance the increment from failed operation + + let health_clone = Arc::clone(&health); + let disk_clone = disk.clone(); + let cancel_clone = cancel_token.clone(); + + tokio::spawn(async move { + Self::monitor_disk_status(disk_clone, health_clone, cancel_clone).await; + }); + } + } + } + } + } + + /// Perform a health check by writing and reading a test file + async fn perform_health_check( + disk: Arc, + test_bucket: &str, + test_filename: &str, + test_data: &Bytes, + check_faulty_only: bool, + timeout_duration: Duration, + ) -> Result<()> { + // Perform health check with timeout + let health_check_result = tokio::time::timeout(timeout_duration, async { + // Try to write test data + disk.write_all(test_bucket, test_filename, test_data.clone()).await?; + + // Try to read back the data + let read_data = disk.read_all(test_bucket, test_filename).await?; + + // Verify data integrity + if read_data.len() != 
test_data.len() { + warn!( + "health check: test file data length mismatch: expected {} bytes, got {}", + test_data.len(), + read_data.len() + ); + if check_faulty_only { + return Ok(()); + } + return Err(DiskError::FaultyDisk); + } + + // Clean up + disk.delete( + test_bucket, + test_filename, + DeleteOptions { + recursive: false, + immediate: false, + undo_write: false, + old_data_dir: None, + }, + ) + .await?; + + Ok(()) + }) + .await; + + match health_check_result { + Ok(result) => match result { + Ok(()) => Ok(()), + Err(e) => { + debug!("health check: failed: {:?}", e); + + if e == DiskError::FaultyDisk { + return Err(e); + } + + if check_faulty_only { Ok(()) } else { Err(e) } + } + }, + Err(_) => { + // Timeout occurred + warn!("health check: timeout after {:?}", timeout_duration); + Err(DiskError::FaultyDisk) + } + } + } + + /// Monitor disk status and try to bring it back online + async fn monitor_disk_status(disk: Arc, health: Arc, cancel_token: CancellationToken) { + const CHECK_EVERY: Duration = Duration::from_secs(5); + + let mut interval = time::interval(CHECK_EVERY); + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + return; + } + _ = interval.tick() => { + if cancel_token.is_cancelled() { + return; + } + + match Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, false, CHECK_TIMEOUT_DURATION).await { + Ok(_) => { + info!("Disk {} is back online", disk.to_string()); + health.set_ok(); + health.decrement_waiting(); + return; + } + Err(e) => { + warn!("Disk {} still faulty: {:?}", disk.to_string(), e); + } + } + } + } + } + } + + async fn check_id(&self, want_id: Option) -> Result<()> { + if want_id.is_none() { + return Ok(()); + } + + let stored_disk_id = self.disk.get_disk_id().await?; + + if stored_disk_id != want_id { + return Err(Error::other(format!("Disk ID mismatch wanted {:?}, got {:?}", want_id, stored_disk_id))); + } + + Ok(()) + } + + /// Check if disk ID is stale + async fn check_disk_stale(&self) -> Result<()> { + let Some(current_disk_id) = *self.disk_id.read().await else { + return Ok(()); + }; + + let stored_disk_id = match self.disk.get_disk_id().await? { + Some(id) => id, + None => return Ok(()), // Empty disk ID is allowed during initialization + }; + + if current_disk_id != stored_disk_id { + return Err(DiskError::DiskNotFound); + } + + Ok(()) + } + + /// Set the disk ID + pub async fn set_disk_id_internal(&self, id: Option) -> Result<()> { + let mut disk_id = self.disk_id.write().await; + *disk_id = id; + Ok(()) + } + + /// Get the current disk ID + pub async fn get_current_disk_id(&self) -> Option { + *self.disk_id.read().await + } + + /// Track disk health for an operation. + /// This method should wrap disk operations to ensure health checking. 
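Review note: `track_disk_health` below treats `Duration::ZERO` as a sentinel for "no timeout" and otherwise wraps the operation in `tokio::time::timeout`. A stripped-down sketch of that contract, using a plain `String` error in place of `DiskError` to stay self-contained:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Sketch of the wrapper contract: a zero duration means "run to
// completion", anything else is enforced with tokio's timeout.
async fn with_optional_timeout<F, Fut, T>(op: F, limit: Duration) -> Result<T, String>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<T, String>>,
{
    if limit == Duration::ZERO {
        return op().await;
    }
    match timeout(limit, op()).await {
        Ok(res) => res,
        Err(_) => Err(format!("operation timed out after {limit:?}")),
    }
}

#[tokio::main]
async fn main() {
    let ok = with_optional_timeout(|| async { Ok::<_, String>(42) }, Duration::from_secs(1)).await;
    assert_eq!(ok, Ok(42));
}
```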
+    pub async fn track_disk_health<F, Fut, T>(&self, operation: F, timeout_duration: Duration) -> Result<T>
+    where
+        F: FnOnce() -> Fut,
+        Fut: std::future::Future<Output = Result<T>>,
+    {
+        // Check if disk is faulty
+        if self.health.is_faulty() {
+            warn!("disk {} health is faulty, returning error", self.to_string());
+            return Err(DiskError::FaultyDisk);
+        }
+
+        // Check if disk is stale
+        self.check_disk_stale().await?;
+
+        // Record operation start
+        let now = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_nanos() as i64;
+        self.health.last_started.store(now, Ordering::Relaxed);
+        self.health.increment_waiting();
+
+        if timeout_duration == Duration::ZERO {
+            let result = operation().await;
+            self.health.decrement_waiting();
+            if result.is_ok() {
+                self.health.log_success();
+            }
+            return result;
+        }
+        // Execute the operation with timeout
+        let result = tokio::time::timeout(timeout_duration, operation()).await;
+
+        match result {
+            Ok(operation_result) => {
+                // Log success and decrement waiting counter
+                if operation_result.is_ok() {
+                    self.health.log_success();
+                }
+                self.health.decrement_waiting();
+                operation_result
+            }
+            Err(_) => {
+                // Timeout occurred, mark disk as potentially faulty and decrement waiting counter
+                self.health.decrement_waiting();
+                warn!("disk operation timeout after {:?}", timeout_duration);
+                Err(DiskError::other(format!("disk operation timeout after {:?}", timeout_duration)))
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl DiskAPI for LocalDiskWrapper {
+    fn to_string(&self) -> String {
+        self.disk.to_string()
+    }
+
+    async fn is_online(&self) -> bool {
+        let Ok(Some(disk_id)) = self.disk.get_disk_id().await else {
+            return false;
+        };
+
+        let Some(current_disk_id) = *self.disk_id.read().await else {
+            return false;
+        };
+
+        current_disk_id == disk_id
+    }
+
+    fn is_local(&self) -> bool {
+        self.disk.is_local()
+    }
+
+    fn host_name(&self) -> String {
+        self.disk.host_name()
+    }
+
+    fn endpoint(&self) -> Endpoint {
+        self.disk.endpoint()
+    }
+
+    async fn close(&self) -> Result<()> {
+        self.stop_monitoring().await;
+        self.disk.close().await
+    }
+
+    async fn get_disk_id(&self) -> Result<Option<Uuid>> {
+        self.disk.get_disk_id().await
+    }
+
+    async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
+        self.set_disk_id_internal(id).await
+    }
+
+    fn path(&self) -> PathBuf {
+        self.disk.path()
+    }
+
+    fn get_disk_location(&self) -> DiskLocation {
+        self.disk.get_disk_location()
+    }
+
+    async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
+        if opts.noop && opts.metrics {
+            let mut info = DiskInfo::default();
+            // Add health metrics
+            info.metrics.total_waiting = self.health.waiting_count();
+            if self.health.is_faulty() {
+                return Err(DiskError::FaultyDisk);
+            }
+            return Ok(info);
+        }
+
+        if self.health.is_faulty() {
+            return Err(DiskError::FaultyDisk);
+        }
+
+        let result = self.disk.disk_info(opts).await?;
+
+        if let Some(current_disk_id) = *self.disk_id.read().await
+            && Some(current_disk_id) != result.id
+        {
+            return Err(DiskError::DiskNotFound);
+        };
+
+        Ok(result)
+    }
+
+    async fn make_volume(&self, volume: &str) -> Result<()> {
+        self.track_disk_health(|| async { self.disk.make_volume(volume).await }, get_max_timeout_duration())
+            .await
+    }
+
+    async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
+        self.track_disk_health(|| async { self.disk.make_volumes(volumes).await }, get_max_timeout_duration())
+            .await
+    }
+
+    async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
+        self.track_disk_health(|| async { self.disk.list_volumes().await }, Duration::ZERO)
.await + } + + async fn stat_volume(&self, volume: &str) -> Result { + self.track_disk_health(|| async { self.disk.stat_volume(volume).await }, get_max_timeout_duration()) + .await + } + + async fn delete_volume(&self, volume: &str) -> Result<()> { + self.track_disk_health(|| async { self.disk.delete_volume(volume).await }, Duration::ZERO) + .await + } + + async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { + self.track_disk_health(|| async { self.disk.walk_dir(opts, wr).await }, Duration::ZERO) + .await + } + + async fn delete_version( + &self, + volume: &str, + path: &str, + fi: FileInfo, + force_del_marker: bool, + opts: DeleteOptions, + ) -> Result<()> { + self.track_disk_health( + || async { self.disk.delete_version(volume, path, fi, force_del_marker, opts).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn delete_versions(&self, volume: &str, versions: Vec, opts: DeleteOptions) -> Vec> { + // Check if disk is faulty before proceeding + if self.health.is_faulty() { + return vec![Some(DiskError::FaultyDisk); versions.len()]; + } + + // Check if disk is stale + if let Err(e) = self.check_disk_stale().await { + return vec![Some(e); versions.len()]; + } + + // Record operation start + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + self.health.last_started.store(now, Ordering::Relaxed); + self.health.increment_waiting(); + + // Execute the operation + let result = self.disk.delete_versions(volume, versions, opts).await; + + self.health.decrement_waiting(); + let has_err = result.iter().any(|e| e.is_some()); + if !has_err { + // Log success and decrement waiting counter + self.health.log_success(); + } + + result + } + + async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> { + self.track_disk_health(|| async { self.disk.delete_paths(volume, paths).await }, get_max_timeout_duration()) + .await + } + + async fn write_metadata(&self, org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { + self.track_disk_health( + || async { self.disk.write_metadata(org_volume, volume, path, fi).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> { + self.track_disk_health( + || async { self.disk.update_metadata(volume, path, fi, opts).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn read_version( + &self, + org_volume: &str, + volume: &str, + path: &str, + version_id: &str, + opts: &ReadOptions, + ) -> Result { + self.track_disk_health( + || async { self.disk.read_version(org_volume, volume, path, version_id, opts).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { + self.track_disk_health(|| async { self.disk.read_xl(volume, path, read_data).await }, get_max_timeout_duration()) + .await + } + + async fn rename_data( + &self, + src_volume: &str, + src_path: &str, + fi: FileInfo, + dst_volume: &str, + dst_path: &str, + ) -> Result { + self.track_disk_health( + || async { self.disk.rename_data(src_volume, src_path, fi, dst_volume, dst_path).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result> { + self.track_disk_health( + || async { self.disk.list_dir(origvolume, volume, dir_path, count).await }, + get_max_timeout_duration(), + ) + 
.await + } + + async fn read_file(&self, volume: &str, path: &str) -> Result { + self.track_disk_health(|| async { self.disk.read_file(volume, path).await }, get_max_timeout_duration()) + .await + } + + async fn read_file_stream(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { + self.track_disk_health( + || async { self.disk.read_file_stream(volume, path, offset, length).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn append_file(&self, volume: &str, path: &str) -> Result { + self.track_disk_health(|| async { self.disk.append_file(volume, path).await }, Duration::ZERO) + .await + } + + async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: i64) -> Result { + self.track_disk_health( + || async { self.disk.create_file(origvolume, volume, path, file_size).await }, + Duration::ZERO, + ) + .await + } + + async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { + self.track_disk_health( + || async { self.disk.rename_file(src_volume, src_path, dst_volume, dst_path).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Bytes) -> Result<()> { + self.track_disk_health( + || async { self.disk.rename_part(src_volume, src_path, dst_volume, dst_path, meta).await }, + get_max_timeout_duration(), + ) + .await + } + + async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { + self.track_disk_health(|| async { self.disk.delete(volume, path, opt).await }, get_max_timeout_duration()) + .await + } + + async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { + self.track_disk_health(|| async { self.disk.verify_file(volume, path, fi).await }, Duration::ZERO) + .await + } + + async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { + self.track_disk_health(|| async { self.disk.check_parts(volume, path, fi).await }, Duration::ZERO) + .await + } + + async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { + self.track_disk_health(|| async { self.disk.read_parts(bucket, paths).await }, Duration::ZERO) + .await + } + + async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { + self.track_disk_health(|| async { self.disk.read_multiple(req).await }, Duration::ZERO) + .await + } + + async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> { + self.track_disk_health(|| async { self.disk.write_all(volume, path, data).await }, get_max_timeout_duration()) + .await + } + + async fn read_all(&self, volume: &str, path: &str) -> Result { + self.track_disk_health(|| async { self.disk.read_all(volume, path).await }, get_max_timeout_duration()) + .await + } +} diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index 5ed851e6..42444f7e 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -69,7 +69,7 @@ use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use uuid::Uuid; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FormatInfo { pub id: Option, pub data: Bytes, @@ -77,16 +77,6 @@ pub struct FormatInfo { pub last_check: Option, } -impl FormatInfo { - pub fn last_check_valid(&self) -> bool { - let now = OffsetDateTime::now_utc(); - self.file_info.is_some() - && self.id.is_some() - && self.last_check.is_some() - && (now.unix_timestamp() - self.last_check.unwrap().unix_timestamp() <= 1) - } -} - /// 
A helper enum to handle internal buffer types for writing data.
 pub enum InternalBuf<'a> {
     Ref(&'a [u8]),
@@ -185,7 +175,7 @@ impl LocalDisk {
         };
         let root_clone = root.clone();
         let update_fn: UpdateFn = Box::new(move || {
-            let disk_id = id.map_or("".to_string(), |id| id.to_string());
+            let disk_id = id;
             let root = root_clone.clone();
             Box::pin(async move {
                 match get_disk_info(root.clone()).await {
@@ -200,7 +190,7 @@
                         minor: info.minor,
                         fs_type: info.fstype,
                         root_disk: root,
-                        id: disk_id.to_string(),
+                        id: disk_id,
                         ..Default::default()
                     };
                     // if root {
@@ -1295,7 +1285,7 @@ impl DiskAPI for LocalDisk {
     }
     #[tracing::instrument(skip(self))]
     async fn is_online(&self) -> bool {
-        self.check_format_json().await.is_ok()
+        true
     }

     #[tracing::instrument(skip(self))]
@@ -1342,24 +1332,40 @@
     #[tracing::instrument(level = "debug", skip(self))]
     async fn get_disk_id(&self) -> Result<Option<Uuid>> {
-        let mut format_info = self.format_info.write().await;
+        let format_info = {
+            let format_info = self.format_info.read().await;
+            format_info.clone()
+        };

         let id = format_info.id;

-        if format_info.last_check_valid() {
-            return Ok(id);
+        if format_info.file_info.is_some() && id.is_some() {
+            // Serve the cached id if it was validated within the last second.
+            if let Some(last_check) = format_info.last_check {
+                if OffsetDateTime::now_utc().unix_timestamp() - last_check.unix_timestamp() <= 1 {
+                    return Ok(id);
+                }
+            }
         }

         let file_meta = self.check_format_json().await?;

         if let Some(file_info) = &format_info.file_info {
             if super::fs::same_file(&file_meta, file_info) {
+                let mut format_info = self.format_info.write().await;
                 format_info.last_check = Some(OffsetDateTime::now_utc());
+                drop(format_info);
                 return Ok(id);
             }
         }

+        debug!("get_disk_id: read format.json");
+
         let b = fs::read(&self.format_path).await.map_err(to_unformatted_disk_error)?;

         let fm = FormatV3::try_from(b.as_slice()).map_err(|e| {
@@ -1375,20 +1381,19 @@
             return Err(DiskError::InconsistentDisk);
         }

+        let mut format_info = self.format_info.write().await;
         format_info.id = Some(disk_id);
         format_info.file_info = Some(file_meta);
         format_info.data = b.into();
         format_info.last_check = Some(OffsetDateTime::now_utc());
+        drop(format_info);

         Ok(Some(disk_id))
     }

     #[tracing::instrument(skip(self))]
-    async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
+    async fn set_disk_id(&self, _id: Option<Uuid>) -> Result<()> {
         // No setup is required locally
-        // TODO: add check_id_store
-        let mut format_info = self.format_info.write().await;
-        format_info.id = id;
         Ok(())
     }
@@ -2438,6 +2443,10 @@
         info.endpoint = self.endpoint.to_string();
         info.scanning = self.scanning.load(Ordering::SeqCst) == 1;

+        if info.id.is_none() {
+            info.id = self.get_disk_id().await.unwrap_or(None);
+        }
+
         Ok(info)
     }
 }
@@ -2705,39 +2714,6 @@ mod test {
     }
 }

-    #[tokio::test]
-    async fn test_format_info_last_check_valid() {
-        let now = OffsetDateTime::now_utc();
-
-        // Valid format info
-        let valid_format_info = FormatInfo {
-            id: Some(Uuid::new_v4()),
-            data: vec![1, 2, 3].into(),
-            file_info: Some(fs::metadata("../../../..").await.unwrap()),
-            last_check: Some(now),
-        };
-        assert!(valid_format_info.last_check_valid());
-
-        // Invalid format info (missing id)
-        let invalid_format_info = FormatInfo {
-            id: None,
-            data: vec![1, 2, 3].into(),
-            file_info: Some(fs::metadata("../../../..").await.unwrap()),
-            last_check: Some(now),
-        };
-        assert!(!invalid_format_info.last_check_valid());
-
-        //
Invalid format info (old timestamp) - let old_time = OffsetDateTime::now_utc() - time::Duration::seconds(10); - let old_format_info = FormatInfo { - id: Some(Uuid::new_v4()), - data: vec![1, 2, 3].into(), - file_info: Some(fs::metadata("../../../..").await.unwrap()), - last_check: Some(old_time), - }; - assert!(!old_format_info.last_check_valid()); - } - #[tokio::test] async fn test_read_file_exists() { let test_file = "./test_read_exists.txt"; diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs index 5f419380..fdba21c9 100644 --- a/crates/ecstore/src/disk/mod.rs +++ b/crates/ecstore/src/disk/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod disk_store; pub mod endpoint; pub mod error; pub mod error_conv; @@ -30,6 +31,7 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json"; pub const STORAGE_FORMAT_FILE: &str = "xl.meta"; pub const STORAGE_FORMAT_FILE_BACKUP: &str = "xl.meta.bkp"; +use crate::disk::disk_store::LocalDiskWrapper; use crate::rpc::RemoteDisk; use bytes::Bytes; use endpoint::Endpoint; @@ -51,7 +53,7 @@ pub type FileWriter = Box; #[derive(Debug)] pub enum Disk { - Local(Box), + Local(Box), Remote(Box), } @@ -398,7 +400,7 @@ impl DiskAPI for Disk { pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { if ep.is_local { let s = LocalDisk::new(ep, opt.cleanup).await?; - Ok(Arc::new(Disk::Local(Box::new(s)))) + Ok(Arc::new(Disk::Local(Box::new(LocalDiskWrapper::new(Arc::new(s), opt.health_check))))) } else { let remote_disk = RemoteDisk::new(ep, opt).await?; Ok(Arc::new(Disk::Remote(Box::new(remote_disk)))) @@ -534,7 +536,7 @@ pub struct DiskInfo { pub scanning: bool, pub endpoint: String, pub mount_path: String, - pub id: String, + pub id: Option, pub rotational: bool, pub metrics: DiskMetrics, pub error: String, @@ -1015,7 +1017,7 @@ mod tests { let endpoint = Endpoint::try_from(test_dir).unwrap(); let local_disk = LocalDisk::new(&endpoint, false).await.unwrap(); - let disk = Disk::Local(Box::new(local_disk)); + let disk = Disk::Local(Box::new(LocalDiskWrapper::new(Arc::new(local_disk), false))); // Test basic methods assert!(disk.is_local()); diff --git a/crates/ecstore/src/rpc/peer_s3_client.rs b/crates/ecstore/src/rpc/peer_s3_client.rs index ac0a035c..fe251a3e 100644 --- a/crates/ecstore/src/rpc/peer_s3_client.rs +++ b/crates/ecstore/src/rpc/peer_s3_client.rs @@ -13,14 +13,18 @@ // limitations under the License. 
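Review note: `RemotePeerS3Client` below probes peers with a bare TCP connect instead of a full RPC round-trip. A sketch of that connectivity check, mirroring `perform_connectivity_check` (URL parsing via the `url` crate, which this diff already uses; the function name `reachable` is invented):

```rust
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;

// Parse the peer URL, fall back to the scheme's default port, and treat
// a timed-out or refused TCP connect as "unreachable".
async fn reachable(addr: &str, limit: Duration) -> Result<(), String> {
    let url = url::Url::parse(addr).map_err(|e| format!("invalid URL: {e}"))?;
    let host = url.host_str().ok_or_else(|| "no host in URL".to_string())?;
    let port = url.port_or_known_default().unwrap_or(80);
    match timeout(limit, TcpStream::connect((host, port))).await {
        Ok(Ok(_stream)) => Ok(()), // connection established, then dropped
        _ => Err(format!("cannot connect to {host}:{port}")),
    }
}

#[tokio::main]
async fn main() {
    let res = reachable("http://127.0.0.1:9000", Duration::from_secs(5)).await;
    println!("{res:?}");
}
```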
use crate::bucket::metadata_sys; +use crate::disk::error::DiskError; use crate::disk::error::{Error, Result}; use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs}; -use crate::disk::{DiskAPI, DiskStore}; +use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration}; use crate::global::GLOBAL_LOCAL_DISK_MAP; use crate::store::all_local_disk; use crate::store_utils::is_reserved_or_invalid_bucket; use crate::{ - disk::{self, VolumeInfo}, + disk::{ + self, VolumeInfo, + disk_store::{CHECK_EVERY, CHECK_TIMEOUT_DURATION, DiskHealthTracker}, + }, endpoints::{EndpointServerPools, Node}, store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions}, }; @@ -32,10 +36,11 @@ use rustfs_protos::node_service_time_out_client; use rustfs_protos::proto_gen::node_service::{ DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest, }; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; -use tokio::sync::RwLock; +use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; +use tokio::{net::TcpStream, sync::RwLock, time}; +use tokio_util::sync::CancellationToken; use tonic::Request; -use tracing::info; +use tracing::{debug, info, warn}; type Client = Arc>; @@ -559,16 +564,160 @@ pub struct RemotePeerS3Client { pub node: Option, pub pools: Option>, addr: String, + /// Health tracker for connection monitoring + health: Arc, + /// Cancellation token for monitoring tasks + cancel_token: CancellationToken, } impl RemotePeerS3Client { pub fn new(node: Option, pools: Option>) -> Self { let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string(); - Self { node, pools, addr } + let client = Self { + node, + pools, + addr, + health: Arc::new(DiskHealthTracker::new()), + cancel_token: CancellationToken::new(), + }; + + // Start health monitoring + client.start_health_monitoring(); + + client } + pub fn get_addr(&self) -> String { self.addr.clone() } + + /// Start health monitoring for the remote peer + fn start_health_monitoring(&self) { + let health = Arc::clone(&self.health); + let cancel_token = self.cancel_token.clone(); + let addr = self.addr.clone(); + + tokio::spawn(async move { + Self::monitor_remote_peer_health(addr, health, cancel_token).await; + }); + } + + /// Monitor remote peer health periodically + async fn monitor_remote_peer_health(addr: String, health: Arc, cancel_token: CancellationToken) { + let mut interval = time::interval(CHECK_EVERY); + + loop { + tokio::select! 
{ + _ = cancel_token.cancelled() => { + debug!("Health monitoring cancelled for remote peer: {}", addr); + return; + } + _ = interval.tick() => { + if cancel_token.is_cancelled() { + return; + } + + // Skip health check if peer is already marked as faulty + if health.is_faulty() { + continue; + } + + // Perform basic connectivity check + if Self::perform_connectivity_check(&addr).await.is_err() && health.swap_ok_to_faulty() { + warn!("Remote peer health check failed for {}: marking as faulty", addr); + + // Start recovery monitoring + let health_clone = Arc::clone(&health); + let addr_clone = addr.clone(); + let cancel_clone = cancel_token.clone(); + + tokio::spawn(async move { + Self::monitor_remote_peer_recovery(addr_clone, health_clone, cancel_clone).await; + }); + } + } + } + } + } + + /// Monitor remote peer recovery and mark as healthy when recovered + async fn monitor_remote_peer_recovery(addr: String, health: Arc, cancel_token: CancellationToken) { + let mut interval = time::interval(Duration::from_secs(5)); // Check every 5 seconds + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + return; + } + _ = interval.tick() => { + if Self::perform_connectivity_check(&addr).await.is_ok() { + info!("Remote peer recovered: {}", addr); + health.set_ok(); + return; + } + } + } + } + } + + /// Perform basic connectivity check for remote peer + async fn perform_connectivity_check(addr: &str) -> Result<()> { + use tokio::time::timeout; + + let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?; + + let Some(host) = url.host_str() else { + return Err(Error::other("No host in URL".to_string())); + }; + + let port = url.port_or_known_default().unwrap_or(80); + + // Try to establish TCP connection + match timeout(CHECK_TIMEOUT_DURATION, TcpStream::connect((host, port))).await { + Ok(Ok(_)) => Ok(()), + _ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))), + } + } + + /// Execute operation with timeout and health tracking + async fn execute_with_timeout(&self, operation: F, timeout_duration: Duration) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + // Check if peer is faulty + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + + // Record operation start + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + self.health.last_started.store(now, std::sync::atomic::Ordering::Relaxed); + self.health.increment_waiting(); + + // Execute operation with timeout + let result = time::timeout(timeout_duration, operation()).await; + + match result { + Ok(operation_result) => { + // Log success and decrement waiting counter + if operation_result.is_ok() { + self.health.log_success(); + } + self.health.decrement_waiting(); + operation_result + } + Err(_) => { + // Timeout occurred, mark peer as potentially faulty + self.health.decrement_waiting(); + warn!("Remote peer operation timeout after {:?}", timeout_duration); + Err(Error::other(format!("Remote peer operation timeout after {:?}", timeout_duration))) + } + } + } } #[async_trait] @@ -578,115 +727,145 @@ impl PeerS3Client for RemotePeerS3Client { } async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result { - let options: String = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(HealBucketRequest { - bucket: 
bucket.to_string(), - options, - }); - let response = client.heal_bucket(request).await?.into_inner(); - if !response.success { - return if let Some(err) = response.error { - Err(err.into()) - } else { - Err(Error::other("")) - }; - } + self.execute_with_timeout( + || async { + let options: String = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(HealBucketRequest { + bucket: bucket.to_string(), + options, + }); + let response = client.heal_bucket(request).await?.into_inner(); + if !response.success { + return if let Some(err) = response.error { + Err(err.into()) + } else { + Err(Error::other("")) + }; + } - Ok(HealResultItem { - heal_item_type: HealItemType::Bucket.to_string(), - bucket: bucket.to_string(), - set_count: 0, - ..Default::default() - }) + Ok(HealResultItem { + heal_item_type: HealItemType::Bucket.to_string(), + bucket: bucket.to_string(), + set_count: 0, + ..Default::default() + }) + }, + get_max_timeout_duration(), + ) + .await } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { - let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ListBucketRequest { options }); - let response = client.list_bucket(request).await?.into_inner(); - if !response.success { - return if let Some(err) = response.error { - Err(err.into()) - } else { - Err(Error::other("")) - }; - } - let bucket_infos = response - .bucket_infos - .into_iter() - .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) - .collect(); + self.execute_with_timeout( + || async { + let options = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ListBucketRequest { options }); + let response = client.list_bucket(request).await?.into_inner(); + if !response.success { + return if let Some(err) = response.error { + Err(err.into()) + } else { + Err(Error::other("")) + }; + } + let bucket_infos = response + .bucket_infos + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); - Ok(bucket_infos) + Ok(bucket_infos) + }, + get_max_timeout_duration(), + ) + .await } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { - let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(MakeBucketRequest { - name: bucket.to_string(), - options, - }); - let response = client.make_bucket(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let options = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(MakeBucketRequest { + name: bucket.to_string(), + options, + }); + let response = client.make_bucket(request).await?.into_inner(); - // TODO: deal with error - if !response.success { - return if let Some(err) = response.error { - Err(err.into()) - } else { - Err(Error::other("")) - }; - } + // TODO: deal with error + if !response.success { + return if let Some(err) 
= response.error { + Err(err.into()) + } else { + Err(Error::other("")) + }; + } - Ok(()) + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { - let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(GetBucketInfoRequest { - bucket: bucket.to_string(), - options, - }); - let response = client.get_bucket_info(request).await?.into_inner(); - if !response.success { - return if let Some(err) = response.error { - Err(err.into()) - } else { - Err(Error::other("")) - }; - } - let bucket_info = serde_json::from_str::(&response.bucket_info)?; + self.execute_with_timeout( + || async { + let options = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(GetBucketInfoRequest { + bucket: bucket.to_string(), + options, + }); + let response = client.get_bucket_info(request).await?.into_inner(); + if !response.success { + return if let Some(err) = response.error { + Err(err.into()) + } else { + Err(Error::other("")) + }; + } + let bucket_info = serde_json::from_str::(&response.bucket_info)?; - Ok(bucket_info) + Ok(bucket_info) + }, + get_max_timeout_duration(), + ) + .await } async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(DeleteBucketRequest { - bucket: bucket.to_string(), - }); - let response = client.delete_bucket(request).await?.into_inner(); - if !response.success { - return if let Some(err) = response.error { - Err(err.into()) - } else { - Err(Error::other("")) - }; - } + let request = Request::new(DeleteBucketRequest { + bucket: bucket.to_string(), + }); + let response = client.delete_bucket(request).await?.into_inner(); + if !response.success { + return if let Some(err) = response.error { + Err(err.into()) + } else { + Err(Error::other("")) + }; + } - Ok(()) + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } } diff --git a/crates/ecstore/src/rpc/remote_disk.rs b/crates/ecstore/src/rpc/remote_disk.rs index 5e024f0b..175ad3bf 100644 --- a/crates/ecstore/src/rpc/remote_disk.rs +++ b/crates/ecstore/src/rpc/remote_disk.rs @@ -12,7 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
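Review note: `RemoteDisk` below follows the same mark-faulty-then-recover lifecycle as the local wrapper: a failed probe flips the status, a dedicated task re-probes until the endpoint answers, then flips it back and exits. A compact sketch of that recovery loop with an injected `probe` closure standing in for the real TCP check:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::time;

const OK: u32 = 0;
const FAULTY: u32 = 1;

// Shape of `monitor_remote_disk_recovery`: once the status is FAULTY, a
// dedicated task re-probes on an interval and flips the status back to OK
// on the first success, then exits.
async fn recover(status: Arc<AtomicU32>, mut probe: impl FnMut() -> bool) {
    let mut ticker = time::interval(Duration::from_millis(10));
    loop {
        ticker.tick().await;
        if probe() {
            status.store(OK, Ordering::Relaxed);
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let status = Arc::new(AtomicU32::new(FAULTY));
    let mut attempts = 0;
    recover(status.clone(), move || {
        attempts += 1;
        attempts >= 3 // succeed on the third probe
    })
    .await;
    assert_eq!(status.load(Ordering::Relaxed), OK);
}
```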
-use std::{path::PathBuf, time::Duration}; +use std::{ + path::PathBuf, + sync::{Arc, atomic::Ordering}, + time::Duration, +}; use bytes::Bytes; use futures::lock::Mutex; @@ -26,13 +30,21 @@ use rustfs_protos::{ StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, }, }; +use rustfs_utils::string::parse_bool_with_default; +use tokio::time; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; use crate::disk::{ CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, + disk_store::{ + CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, get_max_timeout_duration, + }, endpoint::Endpoint, }; use crate::disk::{FileReader, FileWriter}; +use crate::disk::{disk_store::DiskHealthTracker, error::DiskError}; use crate::{ disk::error::{Error, Result}, rpc::build_auth_headers, @@ -42,7 +54,6 @@ use rustfs_protos::proto_gen::node_service::RenamePartRequest; use rustfs_rio::{HttpReader, HttpWriter}; use tokio::{io::AsyncWrite, net::TcpStream, time::timeout}; use tonic::Request; -use tracing::{debug, info}; use uuid::Uuid; #[derive(Debug)] @@ -52,12 +63,16 @@ pub struct RemoteDisk { pub url: url::Url, pub root: PathBuf, endpoint: Endpoint, + /// Whether health checking is enabled + health_check: bool, + /// Health tracker for connection monitoring + health: Arc, + /// Cancellation token for monitoring tasks + cancel_token: CancellationToken, } -const REMOTE_DISK_ONLINE_PROBE_TIMEOUT: Duration = Duration::from_millis(750); - impl RemoteDisk { - pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result { + pub async fn new(ep: &Endpoint, opt: &DiskOption) -> Result { // let root = fs::canonicalize(ep.url.path()).await?; let root = PathBuf::from(ep.get_file_path()); let addr = if let Some(port) = ep.url.port() { @@ -65,13 +80,184 @@ impl RemoteDisk { } else { format!("{}://{}", ep.url.scheme(), ep.url.host_str().unwrap()) }; - Ok(Self { + + let env_health_check = std::env::var(ENV_RUSTFS_DRIVE_ACTIVE_MONITORING) + .map(|v| parse_bool_with_default(&v, true)) + .unwrap_or(true); + + let disk = Self { id: Mutex::new(None), - addr, + addr: addr.clone(), url: ep.url.clone(), root, endpoint: ep.clone(), - }) + health_check: opt.health_check && env_health_check, + health: Arc::new(DiskHealthTracker::new()), + cancel_token: CancellationToken::new(), + }; + + // Start health monitoring + disk.start_health_monitoring(); + + Ok(disk) + } + + /// Start health monitoring for the remote disk + fn start_health_monitoring(&self) { + if self.health_check { + let health = Arc::clone(&self.health); + let cancel_token = self.cancel_token.clone(); + let addr = self.addr.clone(); + + tokio::spawn(async move { + Self::monitor_remote_disk_health(addr, health, cancel_token).await; + }); + } + } + + /// Monitor remote disk health periodically + async fn monitor_remote_disk_health(addr: String, health: Arc, cancel_token: CancellationToken) { + let mut interval = time::interval(CHECK_EVERY); + + // Perform basic connectivity check + if Self::perform_connectivity_check(&addr).await.is_err() && health.swap_ok_to_faulty() { + warn!("Remote disk health check failed for {}: marking as faulty", addr); + + // Start recovery monitoring + let health_clone = Arc::clone(&health); + let addr_clone = addr.clone(); + let cancel_clone = cancel_token.clone(); + + 
tokio::spawn(async move { + Self::monitor_remote_disk_recovery(addr_clone, health_clone, cancel_clone).await; + }); + } + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + debug!("Health monitoring cancelled for remote disk: {}", addr); + return; + } + _ = interval.tick() => { + if cancel_token.is_cancelled() { + return; + } + + // Skip health check if disk is already marked as faulty + if health.is_faulty() { + continue; + } + + let last_success_nanos = health.last_success.load(Ordering::Relaxed); + let elapsed = Duration::from_nanos( + (std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64 - last_success_nanos) as u64 + ); + + if elapsed < SKIP_IF_SUCCESS_BEFORE { + continue; + } + + // Perform basic connectivity check + if Self::perform_connectivity_check(&addr).await.is_err() && health.swap_ok_to_faulty() { + warn!("Remote disk health check failed for {}: marking as faulty", addr); + + // Start recovery monitoring + let health_clone = Arc::clone(&health); + let addr_clone = addr.clone(); + let cancel_clone = cancel_token.clone(); + + tokio::spawn(async move { + Self::monitor_remote_disk_recovery(addr_clone, health_clone, cancel_clone).await; + }); + } + } + } + } + } + + /// Monitor remote disk recovery and mark as healthy when recovered + async fn monitor_remote_disk_recovery(addr: String, health: Arc, cancel_token: CancellationToken) { + let mut interval = time::interval(CHECK_EVERY); + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + return; + } + _ = interval.tick() => { + if Self::perform_connectivity_check(&addr).await.is_ok() { + info!("Remote disk recovered: {}", addr); + health.set_ok(); + return; + } + } + } + } + } + + /// Perform basic connectivity check for remote disk + async fn perform_connectivity_check(addr: &str) -> Result<()> { + let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?; + + let Some(host) = url.host_str() else { + return Err(Error::other("No host in URL".to_string())); + }; + + let port = url.port_or_known_default().unwrap_or(80); + + // Try to establish TCP connection + match timeout(CHECK_TIMEOUT_DURATION, TcpStream::connect((host, port))).await { + Ok(Ok(stream)) => { + drop(stream); + Ok(()) + } + _ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))), + } + } + + /// Execute operation with timeout and health tracking + async fn execute_with_timeout(&self, operation: F, timeout_duration: Duration) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + // Check if disk is faulty + if self.health.is_faulty() { + warn!("disk {} health is faulty, returning error", self.to_string()); + return Err(DiskError::FaultyDisk); + } + + // Record operation start + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + self.health.last_started.store(now, std::sync::atomic::Ordering::Relaxed); + self.health.increment_waiting(); + + // Execute operation with timeout + let result = time::timeout(timeout_duration, operation()).await; + + match result { + Ok(operation_result) => { + // Log success and decrement waiting counter + if operation_result.is_ok() { + self.health.log_success(); + } + self.health.decrement_waiting(); + operation_result + } + Err(_) => { + // Timeout occurred, mark disk as potentially faulty + self.health.decrement_waiting(); + warn!("Remote disk operation timeout after {:?}", timeout_duration); + Err(Error::other(format!("Remote disk 
operation timeout after {:?}", timeout_duration))) + } + } } } @@ -85,19 +271,8 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(skip(self))] async fn is_online(&self) -> bool { - let Some(host) = self.endpoint.url.host_str().map(|host| host.to_string()) else { - return false; - }; - - let port = self.endpoint.url.port_or_known_default().unwrap_or(80); - - match timeout(REMOTE_DISK_ONLINE_PROBE_TIMEOUT, TcpStream::connect((host, port))).await { - Ok(Ok(stream)) => { - drop(stream); - true - } - _ => false, - } + // If disk is marked as faulty, consider it offline + !self.health.is_faulty() } #[tracing::instrument(skip(self))] @@ -114,6 +289,7 @@ impl DiskAPI for RemoteDisk { } #[tracing::instrument(skip(self))] async fn close(&self) -> Result<()> { + self.cancel_token.cancel(); Ok(()) } #[tracing::instrument(skip(self))] @@ -164,108 +340,143 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(skip(self))] async fn make_volume(&self, volume: &str) -> Result<()> { info!("make_volume"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(MakeVolumeRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - }); - let response = client.make_volume(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(MakeVolumeRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.make_volume(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { info!("make_volumes"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(MakeVolumesRequest { - disk: self.endpoint.to_string(), - volumes: volumes.iter().map(|s| (*s).to_string()).collect(), - }); - let response = client.make_volumes(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(MakeVolumesRequest { + disk: self.endpoint.to_string(), + volumes: volumes.iter().map(|s| (*s).to_string()).collect(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.make_volumes(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn list_volumes(&self) -> Result> { info!("list_volumes"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ListVolumesRequest { - disk: self.endpoint.to_string(), - }); - let response = client.list_volumes(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client 
= node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ListVolumesRequest { + disk: self.endpoint.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.list_volumes(request).await?.into_inner(); - let infos = response - .volume_infos - .into_iter() - .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) - .collect(); + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(infos) + let infos = response + .volume_infos + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(infos) + }, + Duration::ZERO, + ) + .await } #[tracing::instrument(skip(self))] async fn stat_volume(&self, volume: &str) -> Result { info!("stat_volume"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(StatVolumeRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - }); - let response = client.stat_volume(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(StatVolumeRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.stat_volume(request).await?.into_inner(); - let volume_info = serde_json::from_str::(&response.volume_info)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(volume_info) + let volume_info = serde_json::from_str::(&response.volume_info)?; + + Ok(volume_info) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn delete_volume(&self, volume: &str) -> Result<()> { info!("delete_volume {}/{}", self.endpoint.to_string(), volume); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(DeleteVolumeRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - }); - let response = client.delete_volume(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(DeleteVolumeRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.delete_volume(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + Duration::ZERO, + ) + .await } // // FIXME: TODO: use writer @@ -328,36 +539,47 @@ impl DiskAPI for RemoteDisk { opts: DeleteOptions, ) -> Result<()> { info!("delete_version"); - let file_info = serde_json::to_string(&fi)?; - let opts = serde_json::to_string(&opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(DeleteVersionRequest { - disk: 
self.endpoint.to_string(),
-            volume: volume.to_string(),
-            path: path.to_string(),
-            file_info,
-            force_del_marker,
-            opts,
-        });
+        self.execute_with_timeout(
+            || async {
+                let file_info = serde_json::to_string(&fi)?;
+                let opts = serde_json::to_string(&opts)?;
 
-        let response = client.delete_version(request).await?.into_inner();
+                let mut client = node_service_time_out_client(&self.addr)
+                    .await
+                    .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
+                let request = Request::new(DeleteVersionRequest {
+                    disk: self.endpoint.to_string(),
+                    volume: volume.to_string(),
+                    path: path.to_string(),
+                    file_info,
+                    force_del_marker,
+                    opts,
+                });
 
-        if !response.success {
-            return Err(response.error.unwrap_or_default().into());
-        }
+                let response = client.delete_version(request).await?.into_inner();
 
-        // let raw_file_info = serde_json::from_str::<RawFileInfo>(&response.raw_file_info)?;
+                if !response.success {
+                    return Err(response.error.unwrap_or_default().into());
+                }
 
-        Ok(())
+                // let raw_file_info = serde_json::from_str::<RawFileInfo>(&response.raw_file_info)?;
+
+                Ok(())
+            },
+            get_max_timeout_duration(),
+        )
+        .await
     }
 
     #[tracing::instrument(skip(self))]
     async fn delete_versions(&self, volume: &str, versions: Vec<FileInfoVersions>, opts: DeleteOptions) -> Vec<Option<Error>> {
         info!("delete_versions");
 
+        if self.health.is_faulty() {
+            return vec![Some(DiskError::FaultyDisk); versions.len()];
+        }
+
         let opts = match serde_json::to_string(&opts) {
             Ok(opts) => opts,
             Err(err) => {
@@ -401,12 +623,24 @@ impl DiskAPI for RemoteDisk {
 
         // TODO: use Error not string
-        let response = match client.delete_versions(request).await {
+        let result = self
+            .execute_with_timeout(
+                || async {
+                    client
+                        .delete_versions(request)
+                        .await
+                        .map_err(|err| Error::other(format!("delete_versions failed: {err}")))
+                },
+                get_max_timeout_duration(),
+            )
+            .await;
+
+        let response = match result {
             Ok(response) => response,
             Err(err) => {
                 let mut errors = Vec::with_capacity(versions.len());
                 for _ in 0..versions.len() {
-                    errors.push(Some(Error::other(err.to_string())));
+                    errors.push(Some(err.clone()));
                 }
                 return errors;
             }
         };
@@ -437,71 +671,91 @@ impl DiskAPI for RemoteDisk {
     async fn delete_paths(&self, volume: &str, paths: &[String]) -> Result<()> {
         info!("delete_paths");
         let paths = paths.to_owned();
-        let mut client = node_service_time_out_client(&self.addr)
-            .await
-            .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
-        let request = Request::new(DeletePathsRequest {
-            disk: self.endpoint.to_string(),
-            volume: volume.to_string(),
-            paths,
-        });
-        let response = client.delete_paths(request).await?.into_inner();
+        self.execute_with_timeout(
+            || async {
+                let mut client = node_service_time_out_client(&self.addr)
+                    .await
+                    .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
+                let request = Request::new(DeletePathsRequest {
+                    disk: self.endpoint.to_string(),
+                    volume: volume.to_string(),
+                    paths: paths.clone(),
+                });
 
-        if !response.success {
-            return Err(response.error.unwrap_or_default().into());
-        }
+                let response = client.delete_paths(request).await?.into_inner();
 
-        Ok(())
+                if !response.success {
+                    return Err(response.error.unwrap_or_default().into());
+                }
+
+                Ok(())
+            },
+            get_max_timeout_duration(),
+        )
+        .await
     }
 
     #[tracing::instrument(skip(self))]
     async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
         info!("write_metadata {}/{}", volume, path);
         let file_info = serde_json::to_string(&fi)?;
-        let mut client = node_service_time_out_client(&self.addr)
-            .await
- .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(WriteMetadataRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - file_info, - }); - let response = client.write_metadata(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(WriteMetadataRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info: file_info.clone(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.write_metadata(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> { info!("update_metadata"); let file_info = serde_json::to_string(&fi)?; - let opts = serde_json::to_string(&opts)?; + let opts_str = serde_json::to_string(&opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(UpdateMetadataRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - file_info, - opts, - }); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(UpdateMetadataRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info: file_info.clone(), + opts: opts_str.clone(), + }); - let response = client.update_metadata(request).await?.into_inner(); + let response = client.update_metadata(request).await?.into_inner(); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(()) + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] @@ -514,51 +768,65 @@ impl DiskAPI for RemoteDisk { opts: &ReadOptions, ) -> Result { info!("read_version"); - let opts = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ReadVersionRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - version_id: version_id.to_string(), - opts, - }); + let opts_str = serde_json::to_string(opts)?; - let response = client.read_version(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadVersionRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + version_id: version_id.to_string(), + opts: opts_str.clone(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = 
client.read_version(request).await?.into_inner(); - let file_info = serde_json::from_str::(&response.file_info)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(file_info) + let file_info = serde_json::from_str::(&response.file_info)?; + + Ok(file_info) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(level = "debug", skip(self))] async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ReadXlRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - read_data, - }); - let response = client.read_xl(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadXlRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + read_data, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.read_xl(request).await?.into_inner(); - let raw_file_info = serde_json::from_str::(&response.raw_file_info)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(raw_file_info) + let raw_file_info = serde_json::from_str::(&response.raw_file_info)?; + + Ok(raw_file_info) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] @@ -571,33 +839,45 @@ impl DiskAPI for RemoteDisk { dst_path: &str, ) -> Result { info!("rename_data {}/{}/{}/{}", self.addr, self.endpoint.to_string(), dst_volume, dst_path); - let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(RenameDataRequest { - disk: self.endpoint.to_string(), - src_volume: src_volume.to_string(), - src_path: src_path.to_string(), - file_info, - dst_volume: dst_volume.to_string(), - dst_path: dst_path.to_string(), - }); - let response = client.rename_data(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let file_info = serde_json::to_string(&fi)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(RenameDataRequest { + disk: self.endpoint.to_string(), + src_volume: src_volume.to_string(), + src_path: src_path.to_string(), + file_info, + dst_volume: dst_volume.to_string(), + dst_path: dst_path.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.rename_data(request).await?.into_inner(); - let rename_data_resp = serde_json::from_str::(&response.rename_data_resp)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(rename_data_resp) + let rename_data_resp = serde_json::from_str::(&response.rename_data_resp)?; + + Ok(rename_data_resp) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn list_dir(&self, _origvolume: &str, volume: &str, dir_path: &str, count: i32) -> Result> { 
debug!("list_dir {}/{}", volume, dir_path); + + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let mut client = node_service_time_out_client(&self.addr) .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; @@ -621,6 +901,10 @@ impl DiskAPI for RemoteDisk { async fn walk_dir(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> { info!("walk_dir {}", self.endpoint.to_string()); + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let url = format!( "{}/rustfs/rpc/walk_dir?disk={}", self.endpoint.grid_host(), @@ -644,6 +928,10 @@ impl DiskAPI for RemoteDisk { async fn read_file(&self, volume: &str, path: &str) -> Result { info!("read_file {}/{}", volume, path); + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let url = format!( "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", self.endpoint.grid_host(), @@ -670,6 +958,11 @@ impl DiskAPI for RemoteDisk { // offset, // length // ); + + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let url = format!( "{}/rustfs/rpc/read_file_stream?disk={}&volume={}&path={}&offset={}&length={}", self.endpoint.grid_host(), @@ -690,6 +983,10 @@ impl DiskAPI for RemoteDisk { async fn append_file(&self, volume: &str, path: &str) -> Result { info!("append_file {}/{}", volume, path); + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let url = format!( "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}", self.endpoint.grid_host(), @@ -716,6 +1013,10 @@ impl DiskAPI for RemoteDisk { // file_size // ); + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let url = format!( "{}/rustfs/rpc/put_file_stream?disk={}&volume={}&path={}&append={}&size={}", self.endpoint.grid_host(), @@ -735,216 +1036,282 @@ impl DiskAPI for RemoteDisk { #[tracing::instrument(level = "debug", skip(self))] async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { info!("rename_file"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(RenameFileRequest { - disk: self.endpoint.to_string(), - src_volume: src_volume.to_string(), - src_path: src_path.to_string(), - dst_volume: dst_volume.to_string(), - dst_path: dst_path.to_string(), - }); - let response = client.rename_file(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(RenameFileRequest { + disk: self.endpoint.to_string(), + src_volume: src_volume.to_string(), + src_path: src_path.to_string(), + dst_volume: dst_volume.to_string(), + dst_path: dst_path.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.rename_file(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Bytes) -> Result<()> { info!("rename_part {}/{}", src_volume, src_path); - let mut client = node_service_time_out_client(&self.addr) - .await - 
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(RenamePartRequest { - disk: self.endpoint.to_string(), - src_volume: src_volume.to_string(), - src_path: src_path.to_string(), - dst_volume: dst_volume.to_string(), - dst_path: dst_path.to_string(), - meta, - }); - let response = client.rename_part(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(RenamePartRequest { + disk: self.endpoint.to_string(), + src_volume: src_volume.to_string(), + src_path: src_path.to_string(), + dst_volume: dst_volume.to_string(), + dst_path: dst_path.to_string(), + meta, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.rename_part(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { info!("delete {}/{}/{}", self.endpoint.to_string(), volume, path); - let options = serde_json::to_string(&opt)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(DeleteRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - options, - }); - let response = client.delete(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let options = serde_json::to_string(&opt)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(DeleteRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + options, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.delete(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { info!("verify_file"); - let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(VerifyFileRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - file_info, - }); - let response = client.verify_file(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let file_info = serde_json::to_string(&fi)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(VerifyFileRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.verify_file(request).await?.into_inner(); - let check_parts_resp = 
serde_json::from_str::(&response.check_parts_resp)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(check_parts_resp) + let check_parts_resp = serde_json::from_str::(&response.check_parts_resp)?; + + Ok(check_parts_resp) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ReadPartsRequest { - disk: self.endpoint.to_string(), - bucket: bucket.to_string(), - paths: paths.to_vec(), - }); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadPartsRequest { + disk: self.endpoint.to_string(), + bucket: bucket.to_string(), + paths: paths.to_vec(), + }); - let response = client.read_parts(request).await?.into_inner(); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.read_parts(request).await?.into_inner(); + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - let read_parts_resp = rmp_serde::from_slice::>(&response.object_part_infos)?; + let read_parts_resp = rmp_serde::from_slice::>(&response.object_part_infos)?; - Ok(read_parts_resp) + Ok(read_parts_resp) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result { info!("check_parts"); - let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(CheckPartsRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - file_info, - }); - let response = client.check_parts(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let file_info = serde_json::to_string(&fi)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(CheckPartsRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.check_parts(request).await?.into_inner(); - let check_parts_resp = serde_json::from_str::(&response.check_parts_resp)?; + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(check_parts_resp) + let check_parts_resp = serde_json::from_str::(&response.check_parts_resp)?; + + Ok(check_parts_resp) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { info!("read_multiple {}/{}/{}", self.endpoint.to_string(), req.bucket, req.prefix); - let read_multiple_req = serde_json::to_string(&req)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ReadMultipleRequest { - disk: self.endpoint.to_string(), - read_multiple_req, - }); - let response = 
client.read_multiple(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let read_multiple_req = serde_json::to_string(&req)?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadMultipleRequest { + disk: self.endpoint.to_string(), + read_multiple_req, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.read_multiple(request).await?.into_inner(); - let read_multiple_resps = response - .read_multiple_resps - .into_iter() - .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) - .collect(); + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } - Ok(read_multiple_resps) + let read_multiple_resps = response + .read_multiple_resps + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(read_multiple_resps) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()> { info!("write_all"); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(WriteAllRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - data, - }); - let response = client.write_all(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(WriteAllRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + data, + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.write_all(request).await?.into_inner(); - Ok(()) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(()) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn read_all(&self, volume: &str, path: &str) -> Result { info!("read_all {}/{}", volume, path); - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - let request = Request::new(ReadAllRequest { - disk: self.endpoint.to_string(), - volume: volume.to_string(), - path: path.to_string(), - }); - let response = client.read_all(request).await?.into_inner(); + self.execute_with_timeout( + || async { + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let request = Request::new(ReadAllRequest { + disk: self.endpoint.to_string(), + volume: volume.to_string(), + path: path.to_string(), + }); - if !response.success { - return Err(response.error.unwrap_or_default().into()); - } + let response = client.read_all(request).await?.into_inner(); - Ok(response.data) + if !response.success { + return Err(response.error.unwrap_or_default().into()); + } + + Ok(response.data) + }, + get_max_timeout_duration(), + ) + .await } #[tracing::instrument(skip(self))] async fn disk_info(&self, opts: &DiskInfoOptions) -> Result { + if self.health.is_faulty() { + return Err(DiskError::FaultyDisk); + } + let opts = 
serde_json::to_string(&opts)?;
         let mut client = node_service_time_out_client(&self.addr)
             .await
@@ -969,9 +1336,24 @@ impl DiskAPI for RemoteDisk {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::sync::Once;
     use tokio::net::TcpListener;
+    use tracing::Level;
     use uuid::Uuid;
 
+    static INIT: Once = Once::new();
+
+    fn init_tracing(filter_level: Level) {
+        INIT.call_once(|| {
+            let _ = tracing_subscriber::fmt()
+                .with_env_filter(tracing_subscriber::EnvFilter::from_default_env().add_directive(filter_level.into()))
+                .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
+                .with_thread_names(true)
+                .try_init();
+        });
+    }
+
     #[tokio::test]
     async fn test_remote_disk_creation() {
         let url = url::Url::parse("http://example.com:9000/path").unwrap();
@@ -1080,6 +1462,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_remote_disk_is_online_detects_missing_listener() {
+        init_tracing(Level::ERROR);
+
         let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
         let addr = listener.local_addr().unwrap();
         let ip = addr.ip();
@@ -1098,10 +1482,14 @@ mod tests {
         let disk_option = DiskOption {
             cleanup: false,
-            health_check: false,
+            health_check: true,
         };
 
         let remote_disk = RemoteDisk::new(&endpoint, &disk_option).await.unwrap();
+
+        // wait for the health check to observe the connect timeout
+        tokio::time::sleep(Duration::from_secs(6)).await;
+
         assert!(!remote_disk.is_online().await);
     }
 
diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs
index 054934e6..d823f784 100644
--- a/crates/ecstore/src/set_disk.rs
+++ b/crates/ecstore/src/set_disk.rs
@@ -174,56 +174,56 @@ impl SetDisks {
         })
     }
 
-    async fn cached_disk_health(&self, index: usize) -> Option<bool> {
-        let cache = self.disk_health_cache.read().await;
-        cache
-            .get(index)
-            .and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
-    }
+    // async fn cached_disk_health(&self, index: usize) -> Option<bool> {
+    //     let cache = self.disk_health_cache.read().await;
+    //     cache
+    //         .get(index)
+    //         .and_then(|entry| entry.as_ref().and_then(|state| state.cached_value()))
+    // }
 
-    async fn update_disk_health(&self, index: usize, online: bool) {
-        let mut cache = self.disk_health_cache.write().await;
-        if cache.len() <= index {
-            cache.resize(index + 1, None);
-        }
-        cache[index] = Some(DiskHealthEntry {
-            last_check: Instant::now(),
-            online,
-        });
-    }
+    // async fn update_disk_health(&self, index: usize, online: bool) {
+    //     let mut cache = self.disk_health_cache.write().await;
+    //     if cache.len() <= index {
+    //         cache.resize(index + 1, None);
+    //     }
+    //     cache[index] = Some(DiskHealthEntry {
+    //         last_check: Instant::now(),
+    //         online,
+    //     });
+    // }
 
-    async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
-        if let Some(online) = self.cached_disk_health(index).await {
-            return online;
-        }
+    // async fn is_disk_online_cached(&self, index: usize, disk: &DiskStore) -> bool {
+    //     if let Some(online) = self.cached_disk_health(index).await {
+    //         return online;
+    //     }
 
-        let disk_clone = disk.clone();
-        let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
-            .await
-            .unwrap_or(false);
-        self.update_disk_health(index, online).await;
-        online
-    }
+    //     let disk_clone = disk.clone();
+    //     let online = timeout(DISK_ONLINE_TIMEOUT, async move { disk_clone.is_online().await })
+    //         .await
+    //         .unwrap_or(false);
+    //     self.update_disk_health(index, online).await;
+    //     online
+    // }
 
-    async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
-        let mut filtered = Vec::with_capacity(disks.len());
-        let mut online_count = 0;
+    // async fn filter_online_disks(&self, disks: Vec<Option<DiskStore>>) -> (Vec<Option<DiskStore>>, usize) {
+    //     let mut filtered = Vec::with_capacity(disks.len());
+    //     let mut online_count = 0;
 
-        for (idx, disk) in disks.into_iter().enumerate() {
-            if let Some(disk_store) = disk {
-                if self.is_disk_online_cached(idx, &disk_store).await {
-                    filtered.push(Some(disk_store));
-                    online_count += 1;
-                } else {
-                    filtered.push(None);
-                }
-            } else {
-                filtered.push(None);
-            }
-        }
+    //     for (idx, disk) in disks.into_iter().enumerate() {
+    //         if let Some(disk_store) = disk {
+    //             if self.is_disk_online_cached(idx, &disk_store).await {
+    //                 filtered.push(Some(disk_store));
+    //                 online_count += 1;
+    //             } else {
+    //                 filtered.push(None);
+    //             }
+    //         } else {
+    //             filtered.push(None);
+    //         }
+    //     }
 
-        (filtered, online_count)
-    }
+    //     (filtered, online_count)
+    // }
 
     fn format_lock_error(&self, bucket: &str, object: &str, mode: &str, err: &LockResult) -> String {
         match err {
             LockResult::Timeout => {
@@ -259,9 +259,28 @@ impl SetDisks {
     }
 
     async fn get_online_disks(&self) -> Vec<Option<DiskStore>> {
-        let disks = self.get_disks_internal().await;
-        let (filtered, _) = self.filter_online_disks(disks).await;
-        filtered.into_iter().filter(|disk| disk.is_some()).collect()
+        let disks = self.get_disks_internal().await;
+
+        // TODO: diskinfo filter online
+
+        let mut new_disk = Vec::with_capacity(disks.len());
+
+        for disk in disks.iter() {
+            if let Some(d) = disk {
+                if d.is_online().await {
+                    new_disk.push(disk.clone());
+                }
+            }
+        }
+
+        let mut rng = rand::rng();
+
+        new_disk.shuffle(&mut rng);
+
+        new_disk
+        // let disks = self.get_disks_internal().await;
+        // let (filtered, _) = self.filter_online_disks(disks).await;
+        // filtered.into_iter().filter(|disk| disk.is_some()).collect()
     }
 
     async fn get_online_local_disks(&self) -> Vec<Option<DiskStore>> {
         let mut disks = self.get_online_disks().await;
@@ -1467,7 +1486,9 @@ impl SetDisks {
             let object = object.clone();
             let version_id = version_id.clone();
             tokio::spawn(async move {
-                if let Some(disk) = disk {
+                if let Some(disk) = disk
+                    && disk.is_online().await
+                {
                     if version_id.is_empty() {
                         match disk.read_xl(&bucket, &object, read_data).await {
                             Ok(info) => {
@@ -1799,14 +1820,14 @@ impl SetDisks {
     }
 
     pub async fn renew_disk(&self, ep: &Endpoint) {
-        debug!("renew_disk start {:?}", ep);
+        debug!("renew_disk: start {:?}", ep);
 
         let (new_disk, fm) = match Self::connect_endpoint(ep).await {
             Ok(res) => res,
             Err(e) => {
-                warn!("connect_endpoint err {:?}", &e);
+                warn!("renew_disk: connect_endpoint err {:?}", &e);
                 if ep.is_local && e == DiskError::UnformattedDisk {
-                    info!("unformatteddisk will trigger heal_disk, {:?}", ep);
+                    info!("renew_disk: unformatted disk will trigger heal_disk, {:?}", ep);
                     let set_disk_id = format!("pool_{}_set_{}", ep.pool_idx, ep.set_idx);
                     let _ = send_heal_disk(set_disk_id, Some(HealChannelPriority::Normal)).await;
                 }
@@ -1817,7 +1838,7 @@ impl SetDisks {
         let (set_idx, disk_idx) = match self.find_disk_index(&fm) {
             Ok(res) => res,
             Err(e) => {
-                warn!("find_disk_index err {:?}", e);
+                warn!("renew_disk: find_disk_index err {:?}", e);
                 return;
             }
         };
@@ -1837,7 +1858,7 @@ impl SetDisks {
         }
     }
 
-        debug!("renew_disk update {:?}", fm.erasure.this);
+        debug!("renew_disk: update {:?}", fm.erasure.this);
 
         let mut disk_lock = self.disks.write().await;
         disk_lock[disk_idx] = Some(new_disk);
@@ -3051,7 +3072,7 @@ impl SetDisks {
         for (index, disk) in latest_disks.iter().enumerate() {
             if let Some(outdated_disk) = &out_dated_disks[index] {
                 info!(disk_index = index, "Creating writer for outdated disk");
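+                // If the bitrot writer cannot be created (e.g. the outdated disk is
+                // unreachable), record a placeholder and keep healing the remaining disks.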
-                let writer = create_bitrot_writer(
+                let writer = match create_bitrot_writer(
                     is_inline_buffer,
                     Some(outdated_disk),
                     RUSTFS_META_TMP_BUCKET,
@@ -3060,7 +3081,19 @@ impl SetDisks {
                     erasure.shard_size(),
                     HashAlgorithm::HighwayHash256,
                 )
-                .await?;
+                .await
+                {
+                    Ok(writer) => writer,
+                    Err(err) => {
+                        warn!(
+                            "create_bitrot_writer disk {}, err {:?}, skipping operation",
+                            outdated_disk.to_string(),
+                            err
+                        );
+                        writers.push(None);
+                        continue;
+                    }
+                };
                 writers.push(Some(writer));
             } else {
                 info!(disk_index = index, "Skipping writer (disk not outdated)");
@@ -3790,8 +3823,8 @@ impl ObjectIO for SetDisks {
 
     #[tracing::instrument(level = "debug", skip(self, data,))]
     async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
-        let disks_snapshot = self.get_disks_internal().await;
-        let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
+        let disks = self.get_disks_internal().await;
+        // let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
 
         // Acquire per-object exclusive lock via RAII guard. It auto-releases asynchronously on drop.
         let _object_lock_guard = if !opts.no_lock {
@@ -3832,13 +3865,13 @@ impl ObjectIO for SetDisks {
             write_quorum += 1
         }
 
-        if filtered_online < write_quorum {
-            warn!(
-                "online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
-                filtered_online, write_quorum, bucket, object
-            );
-            return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
-        }
+        // if filtered_online < write_quorum {
+        //     warn!(
+        //         "online disk snapshot {} below write quorum {} for {}/{}; returning erasure write quorum error",
+        //         filtered_online, write_quorum, bucket, object
+        //     );
+        //     return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
+        // }
 
         let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
 
@@ -3877,8 +3910,10 @@ impl ObjectIO for SetDisks {
         let mut writers = Vec::with_capacity(shuffle_disks.len());
         let mut errors = Vec::with_capacity(shuffle_disks.len());
         for disk_op in shuffle_disks.iter() {
-            if let Some(disk) = disk_op {
-                let writer = create_bitrot_writer(
+            if let Some(disk) = disk_op
+                && disk.is_online().await
+            {
+                let writer = match create_bitrot_writer(
                     is_inline_buffer,
                     Some(disk),
                     RUSTFS_META_TMP_BUCKET,
@@ -3887,29 +3922,16 @@ impl ObjectIO for SetDisks {
                     erasure.shard_size(),
                     HashAlgorithm::HighwayHash256,
                 )
-                .await?;
-
-                // let writer = if is_inline_buffer {
-                //     BitrotWriter::new(
-                //         Writer::from_cursor(Cursor::new(Vec::new())),
-                //         erasure.shard_size(),
-                //         HashAlgorithm::HighwayHash256,
-                //     )
-                // } else {
-                //     let f = match disk
-                //         .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_object, erasure.shard_file_size(data.content_length))
-                //         .await
-                //     {
-                //         Ok(f) => f,
-                //         Err(e) => {
-                //             errors.push(Some(e));
-                //             writers.push(None);
-                //             continue;
-                //         }
-                //     };
-
-                //     BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
-                // };
+                .await
+                {
+                    Ok(writer) => writer,
+                    Err(err) => {
+                        warn!("create_bitrot_writer disk {}, err {:?}, skipping operation", disk.to_string(), err);
+                        errors.push(Some(err));
+                        writers.push(None);
+                        continue;
+                    }
+                };
 
                 writers.push(Some(writer));
                 errors.push(None);
@@ -4072,7 +4094,7 @@ impl StorageAPI for SetDisks {
     async fn local_storage_info(&self) -> rustfs_madmin::StorageInfo {
         let disks = self.get_disks_internal().await;
 
-        let mut local_disks: Vec<Option<DiskStore>> = Vec::new();
+        let mut local_disks: Vec<DiskStore> = Vec::new();
         let mut local_endpoints = Vec::new();
 
         for (i, ep) in self.set_endpoints.iter().enumerate() {
@@ -4908,9 +4930,7 @@ impl StorageAPI for SetDisks {
 
         for disk in disks.iter() {
-            if let Some(disk) = disk {
-                if disk.is_online().await {
-                    continue;
-                }
+            if disk.is_some() {
+                continue;
             }
             let _ = self.add_partial(bucket, object, opts.version_id.as_ref().expect("err")).await;
             break;
         }
@@ -5129,16 +5149,16 @@ impl StorageAPI for SetDisks {
             return Err(Error::other(format!("checksum mismatch: {checksum}")));
         }
 
-        let disks_snapshot = self.get_disks_internal().await;
-        let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
+        let disks = self.get_disks_internal().await;
+        // let (disks, filtered_online) = self.filter_online_disks(disks_snapshot).await;
 
-        if filtered_online < write_quorum {
-            warn!(
-                "online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
-                filtered_online, write_quorum, bucket, object
-            );
-            return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
-        }
+        // if filtered_online < write_quorum {
+        //     warn!(
+        //         "online disk snapshot {} below write quorum {} for multipart {}/{}; returning erasure write quorum error",
+        //         filtered_online, write_quorum, bucket, object
+        //     );
+        //     return Err(to_object_err(Error::ErasureWriteQuorum, vec![bucket, object]));
+        // }
 
         let shuffle_disks = Self::shuffle_disks(&disks, &fi.erasure.distribution);
 
@@ -5152,7 +5172,7 @@ impl StorageAPI for SetDisks {
         let mut errors = Vec::with_capacity(shuffle_disks.len());
         for disk_op in shuffle_disks.iter() {
             if let Some(disk) = disk_op {
-                let writer = create_bitrot_writer(
+                let writer = match create_bitrot_writer(
                     false,
                     Some(disk),
                     RUSTFS_META_TMP_BUCKET,
@@ -5161,23 +5181,16 @@ impl StorageAPI for SetDisks {
                     erasure.shard_size(),
                     HashAlgorithm::HighwayHash256,
                 )
-                .await?;
-
-                // let writer = {
-                //     let f = match disk
-                //         .create_file("", RUSTFS_META_TMP_BUCKET, &tmp_part_path, erasure.shard_file_size(data.content_length))
-                //         .await
-                //     {
-                //         Ok(f) => f,
-                //         Err(e) => {
-                //             errors.push(Some(e));
-                //             writers.push(None);
-                //             continue;
-                //         }
-                //     };
-
-                //     BitrotWriter::new(Writer::from_tokio_writer(f), erasure.shard_size(), HashAlgorithm::HighwayHash256)
-                // };
+                .await
+                {
+                    Ok(writer) => writer,
+                    Err(err) => {
+                        warn!("create_bitrot_writer disk {}, err {:?}, skipping operation", disk.to_string(), err);
+                        errors.push(Some(err));
+                        writers.push(None);
+                        continue;
+                    }
+                };
 
                 writers.push(Some(writer));
                 errors.push(None);
@@ -6769,7 +6782,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<rustfs_madmin::Disk> {
diff --git a/crates/ecstore/src/store_init.rs b/crates/ecstore/src/store_init.rs
index 965088d0..437b5218 100644
--- a/crates/ecstore/src/store_init.rs
+++ b/crates/ecstore/src/store_init.rs
@@ -265,7 +265,10 @@ pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> disk::error::Result<FormatV3>
         .map_err(|e| match e {
             DiskError::FileNotFound => DiskError::UnformattedDisk,
             DiskError::DiskNotFound => DiskError::UnformattedDisk,
-            _ => e,
+            _ => {
+                warn!("load_format_erasure err: {:?} {:?}", disk.to_string(), e);
+                e
+            }
         })?;
 
     let mut fm = FormatV3::try_from(data.as_ref())?;
@@ -312,17 +315,18 @@ async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<FormatV3>])
 
 async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> disk::error::Result<()> {
-    if disk.is_none() {
+    let Some(disk) = disk else {
         return Err(DiskError::DiskNotFound);
-    }
+    };
 
-    let format = format.as_ref().unwrap();
+    let Some(format) = format else {
+        return Err(DiskError::other("format is none"));
+    };
 
     let json_data = format.to_json()?;
 
     let tmpfile = Uuid::new_v4().to_string();
-    let disk = disk.as_ref().unwrap();
 
     disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data.into_bytes().into())
         .await?;
diff --git a/crates/protos/src/lib.rs b/crates/protos/src/lib.rs
index 9b3a2aa4..42fab1f4 100644
--- a/crates/protos/src/lib.rs
+++ b/crates/protos/src/lib.rs
@@ -26,6 +26,11 @@ use tonic::{
 };
 use tracing::{debug, warn};
 
+// Type alias for the complex client type
+pub type NodeServiceClientType = NodeServiceClient<
+    InterceptedService<Channel, Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
+>;
+
 pub use generated::*;
 
 // Default 100 MB
diff --git a/crates/utils/src/string.rs b/crates/utils/src/string.rs
index 42a8e0a6..8d3879d1 100644
--- a/crates/utils/src/string.rs
+++ b/crates/utils/src/string.rs
@@ -48,6 +48,14 @@ pub fn parse_bool(str: &str) -> Result<bool> {
     }
 }
 
+pub fn parse_bool_with_default(str: &str, default: bool) -> bool {
+    match str {
+        "1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => true,
+        "0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => false,
+        _ => default,
+    }
+}
+
 /// Matches a simple pattern against a name using wildcards.
 ///
 /// # Arguments