diff --git a/crates/ecstore/src/disk/disk_store.rs b/crates/ecstore/src/disk/disk_store.rs index d9ecaf84..12cad517 100644 --- a/crates/ecstore/src/disk/disk_store.rs +++ b/crates/ecstore/src/disk/disk_store.rs @@ -260,6 +260,7 @@ impl LocalDiskWrapper { debug!("health check: performing health check"); if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() { // Health check failed, disk is considered faulty + warn!("health check: failed, disk is considered faulty"); health.increment_waiting(); // Balance the increment from failed operation @@ -429,7 +430,7 @@ impl LocalDiskWrapper { { // Check if disk is faulty if self.health.is_faulty() { - warn!("disk {} health is faulty, returning error", self.to_string()); + warn!("local disk {} health is faulty, returning error", self.to_string()); return Err(DiskError::FaultyDisk); } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index bc491424..a9395575 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -871,7 +871,7 @@ impl LocalDisk { } // write_all_private with check_path_length - #[tracing::instrument(level = "debug", skip_all)] + #[tracing::instrument(level = "debug", skip(self, buf, sync, skip_parent))] pub async fn write_all_private(&self, volume: &str, path: &str, buf: Bytes, sync: bool, skip_parent: &Path) -> Result<()> { let volume_dir = self.get_bucket_path(volume)?; let file_path = volume_dir.join(Path::new(&path)); diff --git a/crates/ecstore/src/rpc/remote_disk.rs b/crates/ecstore/src/rpc/remote_disk.rs index 67fc8a04..6b3dac54 100644 --- a/crates/ecstore/src/rpc/remote_disk.rs +++ b/crates/ecstore/src/rpc/remote_disk.rs @@ -231,7 +231,7 @@ impl RemoteDisk { { // Check if disk is faulty if self.health.is_faulty() { - warn!("disk {} health is faulty, returning error", self.to_string()); + warn!("remote disk {} health is faulty, returning error", self.to_string()); return Err(DiskError::FaultyDisk); } diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 47ab24ea..cd10ca95 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -73,6 +73,7 @@ use rustfs_filemeta::{ FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo, RawFileInfo, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions, }; +use rustfs_lock::FastLockGuard; use rustfs_lock::fast_lock::types::LockResult; use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem}; use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader}; @@ -4075,6 +4076,14 @@ impl ObjectIO for SetDisks { #[async_trait::async_trait] impl StorageAPI for SetDisks { + #[tracing::instrument(skip(self))] + async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result { + self.fast_lock_manager + .acquire_write_lock(bucket, object, self.locker_owner.as_str()) + .await + .map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e))) + } + #[tracing::instrument(skip(self))] async fn backend_info(&self) -> rustfs_madmin::BackendInfo { unimplemented!() diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 90812cd8..d321948f 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -45,6 +45,7 @@ use rustfs_common::{ }; use rustfs_filemeta::FileInfo; +use rustfs_lock::FastLockGuard; use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem}; use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash}; use tokio::sync::RwLock; @@ -366,6 +367,10 @@ impl ObjectIO for Sets { #[async_trait::async_trait] impl StorageAPI for Sets { + #[tracing::instrument(skip(self))] + async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result { + self.disk_set[0].new_ns_lock(bucket, object).await + } #[tracing::instrument(skip(self))] async fn backend_info(&self) -> rustfs_madmin::BackendInfo { unimplemented!() diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 2259e5b5..df0b6c66 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -58,6 +58,7 @@ use rand::Rng as _; use rustfs_common::globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT}; use rustfs_common::heal_channel::{HealItemType, HealOpts}; use rustfs_filemeta::FileInfo; +use rustfs_lock::FastLockGuard; use rustfs_madmin::heal_commands::HealResultItem; use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf}; use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration}; @@ -1151,6 +1152,10 @@ lazy_static! { #[async_trait::async_trait] impl StorageAPI for ECStore { + #[instrument(skip(self))] + async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result { + self.pools[0].new_ns_lock(bucket, object).await + } #[instrument(skip(self))] async fn backend_info(&self) -> rustfs_madmin::BackendInfo { let (standard_sc_parity, rr_sc_parity) = { diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index 7c3ce857..e1c2b21c 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -30,6 +30,7 @@ use rustfs_filemeta::{ FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState, ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map, }; +use rustfs_lock::FastLockGuard; use rustfs_madmin::heal_commands::HealResultItem; use rustfs_rio::Checksum; use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader}; @@ -1299,6 +1300,7 @@ pub trait ObjectIO: Send + Sync + Debug + 'static { #[allow(clippy::too_many_arguments)] pub trait StorageAPI: ObjectIO + Debug { // NewNSLock TODO: + async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result; // Shutdown TODO: // NSScanner TODO: diff --git a/crates/scanner/src/scanner.rs b/crates/scanner/src/scanner.rs index f90538e1..c73a3b97 100644 --- a/crates/scanner/src/scanner.rs +++ b/crates/scanner/src/scanner.rs @@ -22,7 +22,9 @@ use crate::{DataUsageInfo, ScannerError}; use chrono::{DateTime, Utc}; use rustfs_common::heal_channel::HealScanMode; use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS}; +use rustfs_ecstore::StorageAPI as _; use rustfs_ecstore::config::com::{read_config, save_config}; +use rustfs_ecstore::disk::RUSTFS_META_BUCKET; use rustfs_ecstore::error::Error as EcstoreError; use rustfs_ecstore::global::is_erasure_sd; use rustfs_ecstore::store::ECStore; @@ -123,6 +125,13 @@ pub async fn save_background_heal_info(storeapi: Arc, info: BackgroundH pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc) -> Result<(), ScannerError> { // TODO: leader lock + let _guard = match storeapi.new_ns_lock(RUSTFS_META_BUCKET, "leader.lock").await { + Ok(guard) => guard, + Err(e) => { + error!("run_data_scanner: other node is running, failed to acquire leader lock: {e}"); + return Ok(()); + } + }; let mut cycle_info = CurrentCycle::default(); let buf = read_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH)