add ns lock

This commit is contained in:
weisd
2025-12-24 11:41:04 +08:00
parent e437d42d31
commit 0aad1ed6aa
8 changed files with 34 additions and 3 deletions

View File

@@ -260,6 +260,7 @@ impl LocalDiskWrapper {
debug!("health check: performing health check");
if Self::perform_health_check(disk.clone(), &TEST_BUCKET, &TEST_OBJ, &TEST_DATA, true, CHECK_TIMEOUT_DURATION).await.is_err() && health.swap_ok_to_faulty() {
// Health check failed, disk is considered faulty
warn!("health check: failed, disk is considered faulty");
health.increment_waiting(); // Balance the increment from failed operation
@@ -429,7 +430,7 @@ impl LocalDiskWrapper {
{
// Check if disk is faulty
if self.health.is_faulty() {
warn!("disk {} health is faulty, returning error", self.to_string());
warn!("local disk {} health is faulty, returning error", self.to_string());
return Err(DiskError::FaultyDisk);
}

View File

@@ -871,7 +871,7 @@ impl LocalDisk {
}
// write_all_private with check_path_length
#[tracing::instrument(level = "debug", skip_all)]
#[tracing::instrument(level = "debug", skip(self, buf, sync, skip_parent))]
pub async fn write_all_private(&self, volume: &str, path: &str, buf: Bytes, sync: bool, skip_parent: &Path) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
let file_path = volume_dir.join(Path::new(&path));

View File

@@ -231,7 +231,7 @@ impl RemoteDisk {
{
// Check if disk is faulty
if self.health.is_faulty() {
warn!("disk {} health is faulty, returning error", self.to_string());
warn!("remote disk {} health is faulty, returning error", self.to_string());
return Err(DiskError::FaultyDisk);
}

View File

@@ -73,6 +73,7 @@ use rustfs_filemeta::{
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions,
};
use rustfs_lock::FastLockGuard;
use rustfs_lock::fast_lock::types::LockResult;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, WarpReader};
@@ -4075,6 +4076,14 @@ impl ObjectIO for SetDisks {
#[async_trait::async_trait]
impl StorageAPI for SetDisks {
#[tracing::instrument(skip(self))]
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
self.fast_lock_manager
.acquire_write_lock(bucket, object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(self.format_lock_error(bucket, object, "write", &e)))
}
#[tracing::instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
unimplemented!()

View File

@@ -45,6 +45,7 @@ use rustfs_common::{
};
use rustfs_filemeta::FileInfo;
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_utils::{crc_hash, path::path_join_buf, sip_hash};
use tokio::sync::RwLock;
@@ -366,6 +367,10 @@ impl ObjectIO for Sets {
#[async_trait::async_trait]
impl StorageAPI for Sets {
#[tracing::instrument(skip(self))]
/// Acquire an exclusive namespace lock for `bucket`/`object`.
///
/// Delegates to the first erasure set — presumably all sets share one lock
/// namespace, so locking through set 0 suffices; confirm against the lock
/// manager's design.
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
    let first_set = &self.disk_set[0];
    first_set.new_ns_lock(bucket, object).await
}
#[tracing::instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
unimplemented!()

View File

@@ -58,6 +58,7 @@ use rand::Rng as _;
use rustfs_common::globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT};
use rustfs_common::heal_channel::{HealItemType, HealOpts};
use rustfs_filemeta::FileInfo;
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_utils::path::{SLASH_SEPARATOR, decode_dir_object, encode_dir_object, path_join_buf};
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
@@ -1151,6 +1152,10 @@ lazy_static! {
#[async_trait::async_trait]
impl StorageAPI for ECStore {
#[instrument(skip(self))]
/// Acquire an exclusive namespace lock for `bucket`/`object`.
///
/// Delegates to the first pool — presumably the lock namespace is shared
/// across pools, so pool 0 is sufficient; confirm against the lock
/// manager's design.
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard> {
    let first_pool = &self.pools[0];
    first_pool.new_ns_lock(bucket, object).await
}
#[instrument(skip(self))]
async fn backend_info(&self) -> rustfs_madmin::BackendInfo {
let (standard_sc_parity, rr_sc_parity) = {

View File

@@ -30,6 +30,7 @@ use rustfs_filemeta::{
FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, REPLICATION_RESET, REPLICATION_STATUS, ReplicateDecision, ReplicationState,
ReplicationStatusType, VersionPurgeStatusType, replication_statuses_map, version_purge_statuses_map,
};
use rustfs_lock::FastLockGuard;
use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
@@ -1299,6 +1300,7 @@ pub trait ObjectIO: Send + Sync + Debug + 'static {
#[allow(clippy::too_many_arguments)]
pub trait StorageAPI: ObjectIO + Debug {
// NewNSLock (namespace lock)
/// Acquire an exclusive (write) namespace lock for `bucket`/`object`.
///
/// Returns a `FastLockGuard` on success; presumably the lock is released
/// when the guard is dropped (RAII) — confirm in the `rustfs_lock` crate.
async fn new_ns_lock(&self, bucket: &str, object: &str) -> Result<FastLockGuard>;
// Shutdown TODO:
// NSScanner TODO:

View File

@@ -22,7 +22,9 @@ use crate::{DataUsageInfo, ScannerError};
use chrono::{DateTime, Utc};
use rustfs_common::heal_channel::HealScanMode;
use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS};
use rustfs_ecstore::StorageAPI as _;
use rustfs_ecstore::config::com::{read_config, save_config};
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
use rustfs_ecstore::error::Error as EcstoreError;
use rustfs_ecstore::global::is_erasure_sd;
use rustfs_ecstore::store::ECStore;
@@ -123,6 +125,13 @@ pub async fn save_background_heal_info(storeapi: Arc<ECStore>, info: BackgroundH
pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) -> Result<(), ScannerError> {
// TODO: leader lock
let _guard = match storeapi.new_ns_lock(RUSTFS_META_BUCKET, "leader.lock").await {
Ok(guard) => guard,
Err(e) => {
error!("run_data_scanner: other node is running, failed to acquire leader lock: {e}");
return Ok(());
}
};
let mut cycle_info = CurrentCycle::default();
let buf = read_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH)