feat(lock): Add support for disabling lock manager (#511)

* feat(lock): Add support for disabling lock manager
Implement control of lock system activation and deactivation via environment variables
Add DisabledLockManager for lock-free operation scenarios
Introduce LockManager trait to uniformly manage different lock managers

Signed-off-by: junxiang Mu <1948535941@qq.com>

* refactor(lock): Optimize implementation of global lock manager and parsing of boolean environment variables
Refactor the implementation of the global lock manager: wrap FastObjectLockManager with Arc and add the as_fast_lock_manager method
Extract the boolean environment variable parsing logic into an independent function parse_bool_env_var

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
guojidan
2025-09-11 13:46:06 +08:00
committed by GitHub
parent d4beb1cc0b
commit cf863ba059
11 changed files with 1052 additions and 44 deletions

View File

@@ -0,0 +1,43 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Example demonstrating environment variable control of lock system
use rustfs_lock::{LockManager, get_global_lock_manager};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The global manager is selected on first use from RUSTFS_ENABLE_LOCKS.
    let lock_manager = get_global_lock_manager();
    let status = if lock_manager.is_disabled() { "DISABLED" } else { "ENABLED" };
    println!("Lock system status: {}", status);

    // Echo the raw variable so the reported status can be correlated with it.
    if let Ok(raw) = std::env::var("RUSTFS_ENABLE_LOCKS") {
        println!("RUSTFS_ENABLE_LOCKS set to: {}", raw);
    } else {
        println!("RUSTFS_ENABLE_LOCKS not set (defaults to enabled)");
    }

    // Acquire one read lock; the guard reports whether it is a no-op guard.
    match lock_manager
        .acquire_read_lock("test-bucket", "test-object", "test-owner")
        .await
    {
        Ok(guard) => println!("Lock acquired successfully! Disabled: {}", guard.is_disabled()),
        Err(err) => println!("Failed to acquire lock: {:?}", err),
    }

    println!("Environment control example completed");
    Ok(())
}

View File

@@ -17,9 +17,10 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use crate::{
GlobalLockManager,
client::LockClient,
error::Result,
fast_lock::{FastLockGuard, FastObjectLockManager},
fast_lock::{FastLockGuard, LockManager},
types::{LockId, LockInfo, LockMetadata, LockPriority, LockRequest, LockResponse, LockStats, LockType},
};
@@ -37,9 +38,9 @@ impl LocalClient {
}
}
/// Get the global fast lock manager
pub fn get_fast_lock_manager(&self) -> Arc<FastObjectLockManager> {
crate::get_global_fast_lock_manager()
/// Get the global lock manager
pub fn get_lock_manager(&self) -> Arc<GlobalLockManager> {
crate::get_global_lock_manager()
}
}
@@ -52,11 +53,11 @@ impl Default for LocalClient {
#[async_trait::async_trait]
impl LockClient for LocalClient {
async fn acquire_exclusive(&self, request: &LockRequest) -> Result<LockResponse> {
let fast_lock_manager = self.get_fast_lock_manager();
let lock_manager = self.get_lock_manager();
let lock_request = crate::fast_lock::ObjectLockRequest::new_write("", request.resource.clone(), request.owner.clone())
.with_acquire_timeout(request.acquire_timeout);
match fast_lock_manager.acquire_lock(lock_request).await {
match lock_manager.acquire_lock(lock_request).await {
Ok(guard) => {
let lock_id = crate::types::LockId::new_deterministic(&request.resource);
@@ -96,11 +97,11 @@ impl LockClient for LocalClient {
}
async fn acquire_shared(&self, request: &LockRequest) -> Result<LockResponse> {
let fast_lock_manager = self.get_fast_lock_manager();
let lock_manager = self.get_lock_manager();
let lock_request = crate::fast_lock::ObjectLockRequest::new_read("", request.resource.clone(), request.owner.clone())
.with_acquire_timeout(request.acquire_timeout);
match fast_lock_manager.acquire_lock(lock_request).await {
match lock_manager.acquire_lock(lock_request).await {
Ok(guard) => {
let lock_id = crate::types::LockId::new_deterministic(&request.resource);

View File

@@ -0,0 +1,291 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Disabled lock manager that bypasses all locking operations
//! Used when RUSTFS_ENABLE_LOCKS environment variable is set to false
use std::sync::Arc;
use crate::fast_lock::{
guard::FastLockGuard,
manager_trait::LockManager,
metrics::AggregatedMetrics,
types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest},
};
/// Disabled lock manager that always returns success without actual locking
///
/// This manager is used when locks are disabled via environment variables.
/// All lock operations immediately return success, effectively bypassing
/// the locking mechanism entirely.
///
/// Guards handed out by this manager are created with
/// `FastLockGuard::new_disabled`, so they provide no mutual exclusion.
#[derive(Debug)]
pub struct DisabledLockManager {
// Stored at construction time but not read by any method of this type in
// this file; the leading underscore marks it as intentionally unused.
_config: LockConfig,
}
impl DisabledLockManager {
    /// Build a disabled manager that carries the default [`LockConfig`].
    pub fn new() -> Self {
        Self {
            _config: LockConfig::default(),
        }
    }

    /// Build a disabled manager from an explicit configuration.
    ///
    /// The configuration is only stored; nothing in this type reads it.
    pub fn with_config(config: LockConfig) -> Self {
        Self { _config: config }
    }

    /// Return a disabled guard for `request` without touching any lock state.
    ///
    /// This never produces an `Err`.
    pub async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
        let guard = FastLockGuard::new_disabled(request.key, request.mode, request.owner);
        Ok(guard)
    }

    /// Return a disabled read guard for `bucket`/`object`; never fails.
    pub async fn acquire_read_lock(
        &self,
        bucket: impl Into<Arc<str>>,
        object: impl Into<Arc<str>>,
        owner: impl Into<Arc<str>>,
    ) -> Result<FastLockGuard, LockResult> {
        self.acquire_lock(ObjectLockRequest::new_read(bucket, object, owner)).await
    }

    /// Return a disabled read guard for a specific object version; never fails.
    pub async fn acquire_read_lock_versioned(
        &self,
        bucket: impl Into<Arc<str>>,
        object: impl Into<Arc<str>>,
        version: impl Into<Arc<str>>,
        owner: impl Into<Arc<str>>,
    ) -> Result<FastLockGuard, LockResult> {
        self.acquire_lock(ObjectLockRequest::new_read(bucket, object, owner).with_version(version))
            .await
    }

    /// Return a disabled write guard for `bucket`/`object`; never fails.
    pub async fn acquire_write_lock(
        &self,
        bucket: impl Into<Arc<str>>,
        object: impl Into<Arc<str>>,
        owner: impl Into<Arc<str>>,
    ) -> Result<FastLockGuard, LockResult> {
        self.acquire_lock(ObjectLockRequest::new_write(bucket, object, owner)).await
    }

    /// Return a disabled write guard for a specific object version; never fails.
    pub async fn acquire_write_lock_versioned(
        &self,
        bucket: impl Into<Arc<str>>,
        object: impl Into<Arc<str>>,
        version: impl Into<Arc<str>>,
        owner: impl Into<Arc<str>>,
    ) -> Result<FastLockGuard, LockResult> {
        self.acquire_lock(ObjectLockRequest::new_write(bucket, object, owner).with_version(version))
            .await
    }

    /// Report every key in the batch as acquired.
    ///
    /// The result lists each request's key under `successful_locks`, leaves
    /// `failed_locks` empty and sets `all_acquired` to `true`.
    pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
        let granted: Vec<ObjectKey> = batch_request
            .requests
            .into_iter()
            .map(|entry| entry.key)
            .collect();

        BatchLockResult {
            successful_locks: granted,
            failed_locks: Vec::new(),
            all_acquired: true,
        }
    }

    /// Always `None`: this manager records no lock state to report.
    pub fn get_lock_info(&self, _key: &ObjectKey) -> Option<ObjectLockInfo> {
        None
    }

    /// Return the empty metrics value.
    pub fn get_metrics(&self) -> AggregatedMetrics {
        AggregatedMetrics::empty()
    }

    /// Always `0`: no locks are ever tracked.
    pub fn total_lock_count(&self) -> usize {
        0
    }

    /// Return an empty list of pool statistics.
    pub fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> {
        Vec::new()
    }

    /// Nothing to clean up; always reports `0`.
    pub async fn cleanup_expired(&self) -> usize {
        0
    }

    /// Nothing to clean up; always reports `0`.
    pub async fn cleanup_expired_traditional(&self) -> usize {
        0
    }

    /// Shutdown is a no-op because this manager owns no resources to release.
    pub async fn shutdown(&self) {}
}
impl Default for DisabledLockManager {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl LockManager for DisabledLockManager {
async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
self.acquire_lock(request).await
}
async fn acquire_read_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_read_lock(bucket, object, owner).await
}
async fn acquire_read_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_read_lock_versioned(bucket, object, version, owner).await
}
async fn acquire_write_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_write_lock(bucket, object, owner).await
}
async fn acquire_write_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_write_lock_versioned(bucket, object, version, owner).await
}
async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
self.acquire_locks_batch(batch_request).await
}
fn get_lock_info(&self, key: &ObjectKey) -> Option<ObjectLockInfo> {
self.get_lock_info(key)
}
fn get_metrics(&self) -> AggregatedMetrics {
self.get_metrics()
}
fn total_lock_count(&self) -> usize {
self.total_lock_count()
}
fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> {
self.get_pool_stats()
}
async fn cleanup_expired(&self) -> usize {
self.cleanup_expired().await
}
async fn cleanup_expired_traditional(&self) -> usize {
self.cleanup_expired_traditional().await
}
async fn shutdown(&self) {
self.shutdown().await
}
fn is_disabled(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A read and a write guard on the same object are both granted, and both
    /// guards identify themselves as disabled.
    #[tokio::test]
    async fn test_disabled_manager_basic_operations() {
        let mgr = DisabledLockManager::new();

        let shared = mgr
            .acquire_read_lock("bucket", "object", "owner1")
            .await
            .expect("Disabled manager should always succeed");
        let exclusive = mgr
            .acquire_write_lock("bucket", "object", "owner2")
            .await
            .expect("Disabled manager should always succeed");

        assert!(shared.is_disabled());
        assert!(exclusive.is_disabled());
    }

    /// A batch request reports every key as acquired and none as failed.
    #[tokio::test]
    async fn test_disabled_manager_batch_operations() {
        let mgr = DisabledLockManager::new();
        let request = BatchLockRequest::new("owner")
            .add_read_lock("bucket", "obj1")
            .add_write_lock("bucket", "obj2")
            .with_all_or_nothing(true);

        let outcome = mgr.acquire_locks_batch(request).await;

        assert!(outcome.all_acquired);
        assert_eq!(outcome.successful_locks.len(), 2);
        assert!(outcome.failed_locks.is_empty());
    }

    /// Metrics, lock count and pool statistics all report the empty state.
    #[tokio::test]
    async fn test_disabled_manager_metrics() {
        let mgr = DisabledLockManager::new();

        assert!(mgr.get_metrics().is_empty());
        assert_eq!(mgr.total_lock_count(), 0);
        assert!(mgr.get_pool_stats().is_empty());
    }

    /// Both cleanup entry points report that nothing was cleaned.
    #[tokio::test]
    async fn test_disabled_manager_cleanup() {
        let mgr = DisabledLockManager::new();

        let cleaned = mgr.cleanup_expired().await;
        assert_eq!(cleaned, 0);

        let cleaned_traditional = mgr.cleanup_expired_traditional().await;
        assert_eq!(cleaned_traditional, 0);
    }
}

View File

@@ -26,8 +26,9 @@ pub struct FastLockGuard {
key: ObjectKey,
mode: LockMode,
owner: Arc<str>,
shard: Arc<LockShard>,
shard: Option<Arc<LockShard>>, // None when locks are disabled
released: bool,
disabled: bool, // True when locks are disabled globally
}
impl FastLockGuard {
@@ -36,8 +37,21 @@ impl FastLockGuard {
key,
mode,
owner,
shard,
shard: Some(shard),
released: false,
disabled: false,
}
}
/// Create a disabled guard (when locks are globally disabled)
///
/// The guard keeps `key`, `mode` and `owner` but is attached to no shard
/// (`shard` is `None`) and starts with `disabled` set to `true` and
/// `released` set to `false`.
pub(crate) fn new_disabled(key: ObjectKey, mode: LockMode, owner: Arc<str>) -> Self {
Self {
key,
mode,
owner,
// No shard backs a disabled guard.
shard: None,
released: false,
disabled: true,
}
}
@@ -65,11 +79,23 @@ impl FastLockGuard {
return false;
}
let success = self.shard.release_lock(&self.key, &self.owner, self.mode);
if success {
if self.disabled {
// For disabled locks, always succeed
self.released = true;
return true;
}
if let Some(shard) = &self.shard {
let success = shard.release_lock(&self.key, &self.owner, self.mode);
if success {
self.released = true;
}
success
} else {
// Should not happen, but handle gracefully
self.released = true;
false
}
success
}
/// Check if the lock has been released
@@ -77,27 +103,36 @@ impl FastLockGuard {
self.released
}
/// Check if this guard represents a disabled lock
///
/// Returns the guard's `disabled` flag, which `new_disabled` sets to `true`.
pub fn is_disabled(&self) -> bool {
self.disabled
}
/// Get lock information for monitoring
pub fn lock_info(&self) -> Option<crate::fast_lock::types::ObjectLockInfo> {
if self.released {
if self.released || self.disabled {
None
} else if let Some(shard) = &self.shard {
shard.get_lock_info(&self.key)
} else {
self.shard.get_lock_info(&self.key)
None
}
}
}
impl Drop for FastLockGuard {
fn drop(&mut self) {
if !self.released {
let success = self.shard.release_lock(&self.key, &self.owner, self.mode);
if !success {
tracing::warn!(
"Failed to release lock during drop: key={}, owner={}, mode={:?}",
self.key,
self.owner,
self.mode
);
if !self.released && !self.disabled {
if let Some(shard) = &self.shard {
let success = shard.release_lock(&self.key, &self.owner, self.mode);
if !success {
tracing::warn!(
"Failed to release lock during drop: key={}, owner={}, mode={:?}",
self.key,
self.owner,
self.mode
);
}
}
}
}
@@ -110,6 +145,7 @@ impl std::fmt::Debug for FastLockGuard {
.field("mode", &self.mode)
.field("owner", &self.owner)
.field("released", &self.released)
.field("disabled", &self.disabled)
.finish()
}
}

View File

@@ -18,9 +18,10 @@ use tokio::time::{Instant, interval};
use crate::fast_lock::{
guard::FastLockGuard,
metrics::GlobalMetrics,
manager_trait::LockManager,
metrics::{AggregatedMetrics, GlobalMetrics},
shard::LockShard,
types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectLockRequest},
types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest},
};
/// High-performance object lock manager
@@ -361,6 +362,87 @@ impl Drop for FastObjectLockManager {
}
}
// Trait facade for the real lock manager: each method forwards to a method of
// the same name on `self`.
// NOTE(review): the forwarding relies on `FastObjectLockManager` having an
// inherent method for every name used below (inherent methods are preferred
// over trait methods during method resolution). Those inherent methods are
// defined outside this hunk; if one were missing, the call would resolve back
// to the trait method and recurse. Confirm all of them exist.
#[async_trait::async_trait]
impl LockManager for FastObjectLockManager {
async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
self.acquire_lock(request).await
}
async fn acquire_read_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_read_lock(bucket, object, owner).await
}
async fn acquire_read_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_read_lock_versioned(bucket, object, version, owner).await
}
async fn acquire_write_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_write_lock(bucket, object, owner).await
}
async fn acquire_write_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult> {
self.acquire_write_lock_versioned(bucket, object, version, owner).await
}
async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
self.acquire_locks_batch(batch_request).await
}
fn get_lock_info(&self, key: &ObjectKey) -> Option<ObjectLockInfo> {
self.get_lock_info(key)
}
fn get_metrics(&self) -> AggregatedMetrics {
self.get_metrics()
}
fn total_lock_count(&self) -> usize {
self.total_lock_count()
}
fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> {
self.get_pool_stats()
}
async fn cleanup_expired(&self) -> usize {
self.cleanup_expired().await
}
async fn cleanup_expired_traditional(&self) -> usize {
self.cleanup_expired_traditional().await
}
async fn shutdown(&self) {
self.shutdown().await
}
// The real manager is never the disabled implementation.
fn is_disabled(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,93 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Unified trait for lock managers (enabled and disabled)
use crate::fast_lock::{
guard::FastLockGuard,
metrics::AggregatedMetrics,
types::{BatchLockRequest, BatchLockResult, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest},
};
use std::sync::Arc;
/// Unified trait for lock managers
///
/// This trait allows transparent switching between enabled and disabled lock managers
/// based on environment variables.
///
/// Implementors must be `Send + Sync`. Several methods take `impl Into<Arc<str>>`
/// arguments, i.e. they are generic, so the trait cannot be used as a
/// `dyn LockManager` trait object; callers dispatch through a concrete type.
#[async_trait::async_trait]
pub trait LockManager: Send + Sync {
/// Acquire object lock
///
/// Returns the guard on success, or a `LockResult` describing why the lock
/// was not granted.
async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult>;
/// Acquire shared (read) lock
///
/// `bucket` and `object` identify the resource; `owner` identifies the holder.
async fn acquire_read_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult>;
/// Acquire shared (read) lock for specific version
///
/// Same as `acquire_read_lock`, with `version` selecting the object version.
async fn acquire_read_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult>;
/// Acquire exclusive (write) lock
///
/// `bucket` and `object` identify the resource; `owner` identifies the holder.
async fn acquire_write_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult>;
/// Acquire exclusive (write) lock for specific version
///
/// Same as `acquire_write_lock`, with `version` selecting the object version.
async fn acquire_write_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> Result<FastLockGuard, LockResult>;
/// Acquire multiple locks atomically
///
/// The outcome for the whole batch is reported in the returned `BatchLockResult`.
async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult;
/// Get lock information for monitoring
///
/// Returns `None` when the manager has nothing to report for `key`.
fn get_lock_info(&self, key: &ObjectKey) -> Option<ObjectLockInfo>;
/// Get aggregated metrics
fn get_metrics(&self) -> AggregatedMetrics;
/// Get total number of active locks across all shards
fn total_lock_count(&self) -> usize;
/// Get pool statistics from all shards
// NOTE(review): the meaning of the four tuple fields is not visible in this
// file — confirm against the shard/pool implementation before relying on it.
fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)>;
/// Force cleanup of expired locks
///
/// Returns a count; presumably the number of entries cleaned — confirm
/// against the implementations.
async fn cleanup_expired(&self) -> usize;
/// Force cleanup with traditional strategy
async fn cleanup_expired_traditional(&self) -> usize;
/// Shutdown the lock manager and cleanup resources
async fn shutdown(&self);
/// Check if this manager is disabled
///
/// `true` means the manager hands out no-op guards instead of real locks.
fn is_disabled(&self) -> bool;
}

View File

@@ -142,6 +142,20 @@ pub struct MetricsSnapshot {
}
impl MetricsSnapshot {
/// Create empty snapshot (for disabled lock manager)
///
/// Every counter and timing field is initialised to zero.
pub fn empty() -> Self {
Self {
fast_path_success: 0,
slow_path_success: 0,
timeouts: 0,
releases: 0,
cleanups: 0,
contention_events: 0,
total_wait_time_ns: 0,
max_wait_time_ns: 0,
}
}
pub fn total_acquisitions(&self) -> u64 {
self.fast_path_success + self.slow_path_success
}
@@ -251,6 +265,22 @@ pub struct AggregatedMetrics {
}
impl AggregatedMetrics {
/// Create empty metrics (for disabled lock manager)
///
/// Uses an empty `MetricsSnapshot`, a shard count of zero, zero uptime and
/// zeroed cleanup counters, so `is_empty` returns `true` for the result.
pub fn empty() -> Self {
Self {
shard_metrics: MetricsSnapshot::empty(),
shard_count: 0,
uptime: Duration::ZERO,
cleanup_runs: 0,
total_objects_cleaned: 0,
}
}
/// Report whether these metrics describe a manager with no shards and no activity.
///
/// True only when the shard count is zero and the snapshot records neither
/// acquisitions nor releases (as produced by `empty()`).
pub fn is_empty(&self) -> bool {
    // Checked first so the snapshot counters are only consulted when needed.
    if self.shard_count != 0 {
        return false;
    }

    let snapshot = &self.shard_metrics;
    snapshot.total_acquisitions() == 0 && snapshot.releases == 0
}
/// Get operations per second
pub fn ops_per_second(&self) -> f64 {
let total_ops = self.shard_metrics.total_acquisitions() + self.shard_metrics.releases;

View File

@@ -24,10 +24,12 @@
//! 4. **Async Optimized** - True async locks that avoid thread blocking
//! 5. **Auto Cleanup** - Access-time based automatic lock reclamation
pub mod disabled_manager;
pub mod guard;
pub mod integration_example;
pub mod integration_test;
pub mod manager;
pub mod manager_trait;
pub mod metrics;
pub mod object_pool;
pub mod optimized_notify;
@@ -39,8 +41,10 @@ pub mod types;
// pub mod benchmarks; // Temporarily disabled due to compilation issues
// Re-export main types
pub use disabled_manager::DisabledLockManager;
pub use guard::FastLockGuard;
pub use manager::FastObjectLockManager;
pub use manager_trait::LockManager;
pub use types::*;
/// Default shard count (must be power of 2)

View File

@@ -42,8 +42,8 @@ pub use crate::{
error::{LockError, Result},
// Fast Lock System exports
fast_lock::{
BatchLockRequest, BatchLockResult, FastLockGuard, FastObjectLockManager, LockMode, LockResult, ObjectKey,
ObjectLockRequest,
BatchLockRequest, BatchLockResult, DisabledLockManager, FastLockGuard, FastObjectLockManager, LockManager, LockMode,
LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest, metrics::AggregatedMetrics,
},
guard::LockGuard,
// Main components
@@ -76,13 +76,198 @@ pub const MAX_DELETE_LIST: usize = 1000;
use once_cell::sync::OnceCell;
use std::sync::Arc;
static GLOBAL_FAST_LOCK_MANAGER: OnceCell<Arc<fast_lock::FastObjectLockManager>> = OnceCell::new();
/// Enum wrapper for different lock manager implementations
///
/// Callers hold one concrete type and the choice between real and no-op
/// locking is made once, when the value is constructed.
pub enum GlobalLockManager {
// Real locking. Held in an `Arc` so `as_fast_lock_manager` can hand out a
// shared handle to the underlying manager.
Enabled(Arc<fast_lock::FastObjectLockManager>),
// No-op locking: every acquisition succeeds immediately.
Disabled(fast_lock::DisabledLockManager),
}
/// Get the global shared FastLock manager instance
impl Default for GlobalLockManager {
// Delegates to `new()`, so the default value also depends on the
// RUSTFS_ENABLE_LOCKS environment variable at the time of the call.
fn default() -> Self {
Self::new()
}
}
impl GlobalLockManager {
    /// Create a lock manager based on environment variable configuration.
    ///
    /// Reads `RUSTFS_ENABLE_LOCKS`. The values `false`, `0`, `no`, `off` and
    /// `disabled` select the disabled manager; they are matched ASCII
    /// case-insensitively and with surrounding whitespace ignored. An unset
    /// variable, a value that is not valid Unicode, or any other value selects
    /// the enabled manager.
    pub fn new() -> Self {
        // `ok()` folds "not set" and "not valid Unicode" into the default (enabled).
        let raw = std::env::var("RUSTFS_ENABLE_LOCKS").ok();

        if Self::locks_enabled(raw.as_deref()) {
            tracing::info!("Lock system enabled");
            Self::Enabled(Arc::new(fast_lock::FastObjectLockManager::new()))
        } else {
            tracing::info!("Lock system disabled via RUSTFS_ENABLE_LOCKS environment variable");
            Self::Disabled(fast_lock::DisabledLockManager::new())
        }
    }

    /// Decide whether locking is enabled for a raw `RUSTFS_ENABLE_LOCKS` value.
    ///
    /// `None` (variable absent) means enabled. The value is trimmed before the
    /// comparison so that stray whitespace or a trailing newline, e.g. from an
    /// env file, does not turn an explicit `false` into "enabled".
    fn locks_enabled(raw: Option<&str>) -> bool {
        match raw {
            None => true,
            Some(value) => !matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "false" | "0" | "no" | "off" | "disabled"
            ),
        }
    }

    /// Check if the lock manager is disabled
    pub fn is_disabled(&self) -> bool {
        matches!(self, Self::Disabled(_))
    }

    /// Get the FastObjectLockManager if enabled, otherwise returns None
    ///
    /// The returned `Arc` shares the manager held by this value; it is not a copy.
    pub fn as_fast_lock_manager(&self) -> Option<Arc<fast_lock::FastObjectLockManager>> {
        match self {
            Self::Enabled(manager) => Some(Arc::clone(manager)),
            Self::Disabled(_) => None,
        }
    }
}
// Dispatching implementation: every method matches on the variant and forwards
// the call, with identical arguments, to the wrapped manager. No method adds
// behaviour of its own.
// NOTE(review): in the `Enabled` arm the receiver is an
// `Arc<FastObjectLockManager>`, so calls are resolved through auto-deref to the
// inner manager — presumably to its inherent methods where they exist; confirm.
#[async_trait::async_trait]
impl fast_lock::LockManager for GlobalLockManager {
async fn acquire_lock(
&self,
request: fast_lock::ObjectLockRequest,
) -> std::result::Result<fast_lock::FastLockGuard, fast_lock::LockResult> {
match self {
Self::Enabled(manager) => manager.acquire_lock(request).await,
Self::Disabled(manager) => manager.acquire_lock(request).await,
}
}
async fn acquire_read_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> std::result::Result<fast_lock::FastLockGuard, fast_lock::LockResult> {
match self {
Self::Enabled(manager) => manager.acquire_read_lock(bucket, object, owner).await,
Self::Disabled(manager) => manager.acquire_read_lock(bucket, object, owner).await,
}
}
async fn acquire_read_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> std::result::Result<fast_lock::FastLockGuard, fast_lock::LockResult> {
match self {
Self::Enabled(manager) => manager.acquire_read_lock_versioned(bucket, object, version, owner).await,
Self::Disabled(manager) => manager.acquire_read_lock_versioned(bucket, object, version, owner).await,
}
}
async fn acquire_write_lock(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> std::result::Result<fast_lock::FastLockGuard, fast_lock::LockResult> {
match self {
Self::Enabled(manager) => manager.acquire_write_lock(bucket, object, owner).await,
Self::Disabled(manager) => manager.acquire_write_lock(bucket, object, owner).await,
}
}
async fn acquire_write_lock_versioned(
&self,
bucket: impl Into<Arc<str>> + Send,
object: impl Into<Arc<str>> + Send,
version: impl Into<Arc<str>> + Send,
owner: impl Into<Arc<str>> + Send,
) -> std::result::Result<fast_lock::FastLockGuard, fast_lock::LockResult> {
match self {
Self::Enabled(manager) => manager.acquire_write_lock_versioned(bucket, object, version, owner).await,
Self::Disabled(manager) => manager.acquire_write_lock_versioned(bucket, object, version, owner).await,
}
}
async fn acquire_locks_batch(&self, batch_request: fast_lock::BatchLockRequest) -> fast_lock::BatchLockResult {
match self {
Self::Enabled(manager) => manager.acquire_locks_batch(batch_request).await,
Self::Disabled(manager) => manager.acquire_locks_batch(batch_request).await,
}
}
fn get_lock_info(&self, key: &fast_lock::ObjectKey) -> Option<fast_lock::ObjectLockInfo> {
match self {
Self::Enabled(manager) => manager.get_lock_info(key),
Self::Disabled(manager) => manager.get_lock_info(key),
}
}
fn get_metrics(&self) -> AggregatedMetrics {
match self {
Self::Enabled(manager) => manager.get_metrics(),
Self::Disabled(manager) => manager.get_metrics(),
}
}
fn total_lock_count(&self) -> usize {
match self {
Self::Enabled(manager) => manager.total_lock_count(),
Self::Disabled(manager) => manager.total_lock_count(),
}
}
fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> {
match self {
Self::Enabled(manager) => manager.get_pool_stats(),
Self::Disabled(manager) => manager.get_pool_stats(),
}
}
async fn cleanup_expired(&self) -> usize {
match self {
Self::Enabled(manager) => manager.cleanup_expired().await,
Self::Disabled(manager) => manager.cleanup_expired().await,
}
}
async fn cleanup_expired_traditional(&self) -> usize {
match self {
Self::Enabled(manager) => manager.cleanup_expired_traditional().await,
Self::Disabled(manager) => manager.cleanup_expired_traditional().await,
}
}
async fn shutdown(&self) {
match self {
Self::Enabled(manager) => manager.shutdown().await,
Self::Disabled(manager) => manager.shutdown().await,
}
}
// Asks the wrapped manager rather than inspecting the variant; the inherent
// `GlobalLockManager::is_disabled` answers the same question by matching on
// the variant.
fn is_disabled(&self) -> bool {
match self {
Self::Enabled(manager) => manager.is_disabled(),
Self::Disabled(manager) => manager.is_disabled(),
}
}
}
static GLOBAL_LOCK_MANAGER: OnceCell<Arc<GlobalLockManager>> = OnceCell::new();
/// Return a handle to the process-wide lock manager.
///
/// The manager is created on the first call via `GlobalLockManager::new()`,
/// which picks the enabled or disabled implementation from the
/// `RUSTFS_ENABLE_LOCKS` environment variable. Later calls return a clone of
/// the same `Arc`, so changing the variable afterwards has no effect.
pub fn get_global_lock_manager() -> Arc<GlobalLockManager> {
    let shared = GLOBAL_LOCK_MANAGER.get_or_init(|| Arc::new(GlobalLockManager::new()));
    Arc::clone(shared)
}
/// Get the global shared FastLock manager instance (legacy)
///
/// This function is deprecated. Use get_global_lock_manager() instead.
/// Returns FastObjectLockManager when locks are enabled, or panics when disabled.
#[deprecated(note = "Use get_global_lock_manager() instead")]
pub fn get_global_fast_lock_manager() -> Arc<fast_lock::FastObjectLockManager> {
GLOBAL_FAST_LOCK_MANAGER
.get_or_init(|| Arc::new(fast_lock::FastObjectLockManager::new()))
.clone()
let manager = get_global_lock_manager();
manager.as_fast_lock_manager().unwrap_or_else(|| {
panic!("Cannot get FastObjectLockManager when locks are disabled. Use get_global_lock_manager() instead.");
})
}
// ============================================================================
@@ -95,3 +280,87 @@ pub fn create_namespace_lock(namespace: String, _distributed: bool) -> Namespace
// This function just creates an empty NamespaceLock
NamespaceLock::new(namespace)
}
#[cfg(test)]
mod tests {
use super::*;
// Uses the process-wide manager, so the result depends on RUSTFS_ENABLE_LOCKS
// in the test environment; the assertions hold for both variants.
#[tokio::test]
async fn test_global_lock_manager_basic() {
let manager = get_global_lock_manager();
// Should be able to acquire locks
let guard = manager.acquire_read_lock("bucket", "object", "owner").await;
assert!(guard.is_ok());
// Test metrics
let _metrics = manager.get_metrics();
// Smoke check only: the call must return for both the enabled and the
// disabled manager. No property of the metrics value is asserted here.
}
// Exercises the disabled implementation directly, independent of the environment.
#[tokio::test]
async fn test_disabled_manager_direct() {
let manager = fast_lock::DisabledLockManager::new();
// All operations should succeed immediately
let guard = manager.acquire_read_lock("bucket", "object", "owner").await;
assert!(guard.is_ok());
assert!(guard.unwrap().is_disabled());
// Metrics should be empty
let metrics = manager.get_metrics();
assert!(metrics.is_empty());
assert_eq!(manager.total_lock_count(), 0);
}
// Exercises the real implementation directly, independent of the environment.
#[tokio::test]
async fn test_enabled_manager_direct() {
let manager = fast_lock::FastObjectLockManager::new();
// Operations should work normally
let guard = manager.acquire_read_lock("bucket", "object", "owner").await;
assert!(guard.is_ok());
assert!(!guard.unwrap().is_disabled());
// Should have real metrics
let _metrics = manager.get_metrics();
// Note: total_lock_count might be > 0 due to previous lock acquisition
}
// Builds both variants by hand and checks that dispatch reaches the right one.
#[tokio::test]
async fn test_global_manager_enum_wrapper() {
// Test the GlobalLockManager enum directly
let enabled_manager = GlobalLockManager::Enabled(Arc::new(fast_lock::FastObjectLockManager::new()));
let disabled_manager = GlobalLockManager::Disabled(fast_lock::DisabledLockManager::new());
assert!(!enabled_manager.is_disabled());
assert!(disabled_manager.is_disabled());
// Test trait methods work for both
let enabled_guard = enabled_manager.acquire_read_lock("bucket", "obj", "owner").await;
let disabled_guard = disabled_manager.acquire_read_lock("bucket", "obj", "owner").await;
assert!(enabled_guard.is_ok());
assert!(disabled_guard.is_ok());
assert!(!enabled_guard.unwrap().is_disabled());
assert!(disabled_guard.unwrap().is_disabled());
}
// Uses the process-wide manager; the two keys are distinct, so the batch is
// expected to be granted in full under either variant.
#[tokio::test]
async fn test_batch_operations_work() {
let manager = get_global_lock_manager();
let batch = fast_lock::BatchLockRequest::new("owner")
.add_read_lock("bucket", "obj1")
.add_write_lock("bucket", "obj2");
let result = manager.acquire_locks_batch(batch).await;
// Should succeed regardless of whether locks are enabled or disabled
assert!(result.all_acquired);
assert_eq!(result.successful_locks.len(), 2);
assert!(result.failed_locks.is_empty());
}
}

View File

@@ -0,0 +1,118 @@
# RustFS Environment Variables
This document describes the environment variables that can be used to configure RustFS behavior.
## Background Services Control
### RUSTFS_ENABLE_SCANNER
Controls whether the data scanner service should be started.
- **Default**: `true`
- **Valid values**: `true`, `false`
- **Description**: When enabled, the data scanner will run background scans to detect inconsistencies and corruption in stored data.
**Examples**:
```bash
# Disable scanner
export RUSTFS_ENABLE_SCANNER=false
# Enable scanner (default behavior)
export RUSTFS_ENABLE_SCANNER=true
```
### RUSTFS_ENABLE_HEAL
Controls whether the auto-heal service should be started.
- **Default**: `true`
- **Valid values**: `true`, `false`
- **Description**: When enabled, the heal manager will automatically repair detected data inconsistencies and corruption.
**Examples**:
```bash
# Disable auto-heal
export RUSTFS_ENABLE_HEAL=false
# Enable auto-heal (default behavior)
export RUSTFS_ENABLE_HEAL=true
```
### RUSTFS_ENABLE_LOCKS
Controls whether the distributed lock system should be enabled.
- **Default**: `true`
- **Valid values**: `true`, `false`, `1`, `0`, `yes`, `no`, `on`, `off`, `enabled`, `disabled` (case insensitive)
- **Description**: When enabled, provides distributed locking for concurrent object operations. When disabled, all lock operations immediately return success without actual locking, so concurrent operations on the same object are no longer serialized against each other.
**Examples**:
```bash
# Disable lock system
export RUSTFS_ENABLE_LOCKS=false
# Enable lock system (default behavior)
export RUSTFS_ENABLE_LOCKS=true
```
## Service Combinations
The scanner and heal services can be independently controlled:
| RUSTFS_ENABLE_SCANNER | RUSTFS_ENABLE_HEAL | Result |
|----------------------|-------------------|--------|
| `true` (default) | `true` (default) | Both scanner and heal are active |
| `true` | `false` | Scanner runs without heal capabilities |
| `false` | `true` | Heal manager is available but no scanning |
| `false` | `false` | No background maintenance services |
## Use Cases
### Development Environment
For development or testing environments where you don't need background maintenance:
```bash
export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_HEAL=false
./rustfs --address 127.0.0.1:9000 ...
```
### Scan-Only Mode
For environments where you want to detect issues but not automatically fix them:
```bash
export RUSTFS_ENABLE_SCANNER=true
export RUSTFS_ENABLE_HEAL=false
./rustfs --address 127.0.0.1:9000 ...
```
### Heal-Only Mode
For environments where external tools trigger healing but no automatic scanning:
```bash
export RUSTFS_ENABLE_SCANNER=false
export RUSTFS_ENABLE_HEAL=true
./rustfs --address 127.0.0.1:9000 ...
```
### Production Environment (Default)
For production environments where both services should be active:
```bash
# These are the defaults, so no need to set explicitly
# export RUSTFS_ENABLE_SCANNER=true
# export RUSTFS_ENABLE_HEAL=true
./rustfs --address 127.0.0.1:9000 ...
```
### No-Lock Development
For single-node development where locking is not needed:
```bash
export RUSTFS_ENABLE_LOCKS=false
./rustfs --address 127.0.0.1:9000 ...
```
## Performance Impact
- **Scanner**: Light to moderate CPU/IO impact during scans
- **Heal**: Moderate to high CPU/IO impact during healing operations
- **Locks**: Minimal CPU/memory overhead for coordination; disabling can improve throughput in single-client scenarios
- **Memory**: Each service uses additional memory for processing queues and metadata
Disabling these services in resource-constrained environments can improve performance for primary storage operations.

View File

@@ -205,11 +205,34 @@ async fn run(opt: config::Opt) -> Result<()> {
// Create a cancellation token for AHM services
let _ = create_ahm_services_cancel_token();
// Initialize heal manager with channel processor
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
let heal_manager = init_heal_manager(heal_storage, None).await?;
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
scanner.start().await?;
// Check environment variables to determine if scanner and heal should be enabled
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
info!("Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal);
// Initialize heal manager and scanner based on environment variables
if enable_heal || enable_scanner {
if enable_heal {
// Initialize heal manager with channel processor
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
let heal_manager = init_heal_manager(heal_storage, None).await?;
if enable_scanner {
info!("Starting scanner with heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
scanner.start().await?;
} else {
info!("Scanner disabled, but heal manager is initialized and available");
}
} else if enable_scanner {
info!("Starting scanner without heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), None);
scanner.start().await?;
}
} else {
info!("Both scanner and heal are disabled, skipping AHM service initialization");
}
// print server info
print_server_info();
@@ -266,19 +289,37 @@ async fn run(opt: config::Opt) -> Result<()> {
Ok(())
}
/// Parse a boolean environment variable, falling back to a default value.
///
/// The value is trimmed and compared case-insensitively:
///
/// - `true`, `1`, `yes`, `on`, `enabled` yield `true`
/// - `false`, `0`, `no`, `off`, `disabled` yield `false`
///
/// `default` is returned when the variable is not set, is not valid Unicode,
/// or holds any other value.
fn parse_bool_env_var(var_name: &str, default: bool) -> bool {
    match std::env::var(var_name) {
        Ok(raw) => match raw.trim().to_ascii_lowercase().as_str() {
            "true" | "1" | "yes" | "on" | "enabled" => true,
            "false" | "0" | "no" | "off" | "disabled" => false,
            // Unrecognised spelling: keep the caller's default rather than guessing.
            _ => default,
        },
        // Unset or non-Unicode value.
        Err(_) => default,
    }
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Stop background services (data scanner and auto heal) gracefully
info!("Stopping background services (data scanner and auto heal)...");
shutdown_background_services();
// Check environment variables to determine what services need to be stopped
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
// Stop AHM services gracefully
info!("Stopping AHM services...");
shutdown_ahm_services();
// Stop background services based on what was enabled
if enable_scanner || enable_heal {
info!("Stopping background services (data scanner and auto heal)...");
shutdown_background_services();
info!("Stopping AHM services...");
shutdown_ahm_services();
} else {
info!("Background services were disabled, skipping AHM shutdown");
}
// Stop the notification system
shutdown_event_notifier().await;