diff --git a/crates/lock/examples/environment_control.rs b/crates/lock/examples/environment_control.rs new file mode 100644 index 00000000..25fb0c5a --- /dev/null +++ b/crates/lock/examples/environment_control.rs @@ -0,0 +1,43 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example demonstrating environment variable control of lock system + +use rustfs_lock::{LockManager, get_global_lock_manager}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let manager = get_global_lock_manager(); + + println!("Lock system status: {}", if manager.is_disabled() { "DISABLED" } else { "ENABLED" }); + + match std::env::var("RUSTFS_ENABLE_LOCKS") { + Ok(value) => println!("RUSTFS_ENABLE_LOCKS set to: {}", value), + Err(_) => println!("RUSTFS_ENABLE_LOCKS not set (defaults to enabled)"), + } + + // Test acquiring a lock + let result = manager.acquire_read_lock("test-bucket", "test-object", "test-owner").await; + match result { + Ok(guard) => { + println!("Lock acquired successfully! Disabled: {}", guard.is_disabled()); + } + Err(e) => { + println!("Failed to acquire lock: {:?}", e); + } + } + + println!("Environment control example completed"); + Ok(()) +} diff --git a/crates/lock/src/client/local.rs b/crates/lock/src/client/local.rs index e606dedc..97543b5a 100644 --- a/crates/lock/src/client/local.rs +++ b/crates/lock/src/client/local.rs @@ -17,9 +17,10 @@ use std::sync::Arc; use tokio::sync::RwLock; use crate::{ + GlobalLockManager, client::LockClient, error::Result, - fast_lock::{FastLockGuard, FastObjectLockManager}, + fast_lock::{FastLockGuard, LockManager}, types::{LockId, LockInfo, LockMetadata, LockPriority, LockRequest, LockResponse, LockStats, LockType}, }; @@ -37,9 +38,9 @@ impl LocalClient { } } - /// Get the global fast lock manager - pub fn get_fast_lock_manager(&self) -> Arc { - crate::get_global_fast_lock_manager() + /// Get the global lock manager + pub fn get_lock_manager(&self) -> Arc { + crate::get_global_lock_manager() } } @@ -52,11 +53,11 @@ impl Default for LocalClient { #[async_trait::async_trait] impl LockClient for LocalClient { async fn acquire_exclusive(&self, request: &LockRequest) -> Result { - let fast_lock_manager = self.get_fast_lock_manager(); + let lock_manager = self.get_lock_manager(); let lock_request = crate::fast_lock::ObjectLockRequest::new_write("", request.resource.clone(), request.owner.clone()) .with_acquire_timeout(request.acquire_timeout); - match fast_lock_manager.acquire_lock(lock_request).await { + match lock_manager.acquire_lock(lock_request).await { Ok(guard) => { let lock_id = crate::types::LockId::new_deterministic(&request.resource); @@ -96,11 +97,11 @@ impl LockClient for LocalClient { } async fn acquire_shared(&self, request: &LockRequest) -> Result { - let fast_lock_manager = self.get_fast_lock_manager(); + let lock_manager = self.get_lock_manager(); let lock_request = crate::fast_lock::ObjectLockRequest::new_read("", request.resource.clone(), request.owner.clone()) .with_acquire_timeout(request.acquire_timeout); - match fast_lock_manager.acquire_lock(lock_request).await { + match lock_manager.acquire_lock(lock_request).await { Ok(guard) => { let lock_id = crate::types::LockId::new_deterministic(&request.resource); diff --git a/crates/lock/src/fast_lock/disabled_manager.rs b/crates/lock/src/fast_lock/disabled_manager.rs new file mode 100644 index 00000000..caf4f329 --- /dev/null +++ b/crates/lock/src/fast_lock/disabled_manager.rs @@ -0,0 +1,291 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Disabled lock manager that bypasses all locking operations +//! Used when RUSTFS_ENABLE_LOCKS environment variable is set to false + +use std::sync::Arc; + +use crate::fast_lock::{ + guard::FastLockGuard, + manager_trait::LockManager, + metrics::AggregatedMetrics, + types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest}, +}; + +/// Disabled lock manager that always returns success without actual locking +/// +/// This manager is used when locks are disabled via environment variables. +/// All lock operations immediately return success, effectively bypassing +/// the locking mechanism entirely. +#[derive(Debug)] +pub struct DisabledLockManager { + _config: LockConfig, +} + +impl DisabledLockManager { + /// Create new disabled lock manager + pub fn new() -> Self { + Self::with_config(LockConfig::default()) + } + + /// Create new disabled lock manager with custom config + pub fn with_config(config: LockConfig) -> Self { + Self { _config: config } + } + + /// Always succeeds - returns a no-op guard + pub async fn acquire_lock(&self, request: ObjectLockRequest) -> Result { + Ok(FastLockGuard::new_disabled(request.key, request.mode, request.owner)) + } + + /// Always succeeds - returns a no-op guard + pub async fn acquire_read_lock( + &self, + bucket: impl Into>, + object: impl Into>, + owner: impl Into>, + ) -> Result { + let request = ObjectLockRequest::new_read(bucket, object, owner); + self.acquire_lock(request).await + } + + /// Always succeeds - returns a no-op guard + pub async fn acquire_read_lock_versioned( + &self, + bucket: impl Into>, + object: impl Into>, + version: impl Into>, + owner: impl Into>, + ) -> Result { + let request = ObjectLockRequest::new_read(bucket, object, owner).with_version(version); + self.acquire_lock(request).await + } + + /// Always succeeds - returns a no-op guard + pub async fn acquire_write_lock( + &self, + bucket: impl Into>, + object: impl Into>, + owner: impl Into>, + ) -> Result { + let request = ObjectLockRequest::new_write(bucket, object, owner); + self.acquire_lock(request).await + } + + /// Always succeeds - returns a no-op guard + pub async fn acquire_write_lock_versioned( + &self, + bucket: impl Into>, + object: impl Into>, + version: impl Into>, + owner: impl Into>, + ) -> Result { + let request = ObjectLockRequest::new_write(bucket, object, owner).with_version(version); + self.acquire_lock(request).await + } + + /// Always succeeds - all locks acquired + pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult { + let successful_locks: Vec = batch_request.requests.into_iter().map(|req| req.key).collect(); + + BatchLockResult { + successful_locks, + failed_locks: Vec::new(), + all_acquired: true, + } + } + + /// Always returns None - no locks to query + pub fn get_lock_info(&self, _key: &ObjectKey) -> Option { + None + } + + /// Returns empty metrics + pub fn get_metrics(&self) -> AggregatedMetrics { + AggregatedMetrics::empty() + } + + /// Always returns 0 - no locks exist + pub fn total_lock_count(&self) -> usize { + 0 + } + + /// Returns empty pool stats + pub fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> { + Vec::new() + } + + /// No-op cleanup - nothing to clean + pub async fn cleanup_expired(&self) -> usize { + 0 + } + + /// No-op cleanup - nothing to clean + pub async fn cleanup_expired_traditional(&self) -> usize { + 0 + } + + /// No-op shutdown + pub async fn shutdown(&self) { + // Nothing to shutdown + } +} + +impl Default for DisabledLockManager { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl LockManager for DisabledLockManager { + async fn acquire_lock(&self, request: ObjectLockRequest) -> Result { + self.acquire_lock(request).await + } + + async fn acquire_read_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_read_lock(bucket, object, owner).await + } + + async fn acquire_read_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_read_lock_versioned(bucket, object, version, owner).await + } + + async fn acquire_write_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_write_lock(bucket, object, owner).await + } + + async fn acquire_write_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_write_lock_versioned(bucket, object, version, owner).await + } + + async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult { + self.acquire_locks_batch(batch_request).await + } + + fn get_lock_info(&self, key: &ObjectKey) -> Option { + self.get_lock_info(key) + } + + fn get_metrics(&self) -> AggregatedMetrics { + self.get_metrics() + } + + fn total_lock_count(&self) -> usize { + self.total_lock_count() + } + + fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> { + self.get_pool_stats() + } + + async fn cleanup_expired(&self) -> usize { + self.cleanup_expired().await + } + + async fn cleanup_expired_traditional(&self) -> usize { + self.cleanup_expired_traditional().await + } + + async fn shutdown(&self) { + self.shutdown().await + } + + fn is_disabled(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_disabled_manager_basic_operations() { + let manager = DisabledLockManager::new(); + + // All operations should succeed immediately + let read_guard = manager + .acquire_read_lock("bucket", "object", "owner1") + .await + .expect("Disabled manager should always succeed"); + + let write_guard = manager + .acquire_write_lock("bucket", "object", "owner2") + .await + .expect("Disabled manager should always succeed"); + + // Guards should indicate they are disabled + assert!(read_guard.is_disabled()); + assert!(write_guard.is_disabled()); + } + + #[tokio::test] + async fn test_disabled_manager_batch_operations() { + let manager = DisabledLockManager::new(); + + let batch = BatchLockRequest::new("owner") + .add_read_lock("bucket", "obj1") + .add_write_lock("bucket", "obj2") + .with_all_or_nothing(true); + + let result = manager.acquire_locks_batch(batch).await; + assert!(result.all_acquired); + assert_eq!(result.successful_locks.len(), 2); + assert!(result.failed_locks.is_empty()); + } + + #[tokio::test] + async fn test_disabled_manager_metrics() { + let manager = DisabledLockManager::new(); + + // Metrics should indicate empty/disabled state + let metrics = manager.get_metrics(); + assert!(metrics.is_empty()); + assert_eq!(manager.total_lock_count(), 0); + assert!(manager.get_pool_stats().is_empty()); + } + + #[tokio::test] + async fn test_disabled_manager_cleanup() { + let manager = DisabledLockManager::new(); + + // Cleanup should be no-op + assert_eq!(manager.cleanup_expired().await, 0); + assert_eq!(manager.cleanup_expired_traditional().await, 0); + } +} diff --git a/crates/lock/src/fast_lock/guard.rs b/crates/lock/src/fast_lock/guard.rs index f535ae15..1d67ca5c 100644 --- a/crates/lock/src/fast_lock/guard.rs +++ b/crates/lock/src/fast_lock/guard.rs @@ -26,8 +26,9 @@ pub struct FastLockGuard { key: ObjectKey, mode: LockMode, owner: Arc, - shard: Arc, + shard: Option>, // None when locks are disabled released: bool, + disabled: bool, // True when locks are disabled globally } impl FastLockGuard { @@ -36,8 +37,21 @@ impl FastLockGuard { key, mode, owner, - shard, + shard: Some(shard), released: false, + disabled: false, + } + } + + /// Create a disabled guard (when locks are globally disabled) + pub(crate) fn new_disabled(key: ObjectKey, mode: LockMode, owner: Arc) -> Self { + Self { + key, + mode, + owner, + shard: None, + released: false, + disabled: true, } } @@ -65,11 +79,23 @@ impl FastLockGuard { return false; } - let success = self.shard.release_lock(&self.key, &self.owner, self.mode); - if success { + if self.disabled { + // For disabled locks, always succeed self.released = true; + return true; + } + + if let Some(shard) = &self.shard { + let success = shard.release_lock(&self.key, &self.owner, self.mode); + if success { + self.released = true; + } + success + } else { + // Should not happen, but handle gracefully + self.released = true; + false } - success } /// Check if the lock has been released @@ -77,27 +103,36 @@ impl FastLockGuard { self.released } + /// Check if this guard represents a disabled lock + pub fn is_disabled(&self) -> bool { + self.disabled + } + /// Get lock information for monitoring pub fn lock_info(&self) -> Option { - if self.released { + if self.released || self.disabled { None + } else if let Some(shard) = &self.shard { + shard.get_lock_info(&self.key) } else { - self.shard.get_lock_info(&self.key) + None } } } impl Drop for FastLockGuard { fn drop(&mut self) { - if !self.released { - let success = self.shard.release_lock(&self.key, &self.owner, self.mode); - if !success { - tracing::warn!( - "Failed to release lock during drop: key={}, owner={}, mode={:?}", - self.key, - self.owner, - self.mode - ); + if !self.released && !self.disabled { + if let Some(shard) = &self.shard { + let success = shard.release_lock(&self.key, &self.owner, self.mode); + if !success { + tracing::warn!( + "Failed to release lock during drop: key={}, owner={}, mode={:?}", + self.key, + self.owner, + self.mode + ); + } } } } @@ -110,6 +145,7 @@ impl std::fmt::Debug for FastLockGuard { .field("mode", &self.mode) .field("owner", &self.owner) .field("released", &self.released) + .field("disabled", &self.disabled) .finish() } } diff --git a/crates/lock/src/fast_lock/manager.rs b/crates/lock/src/fast_lock/manager.rs index d7152d3c..6be0cbe7 100644 --- a/crates/lock/src/fast_lock/manager.rs +++ b/crates/lock/src/fast_lock/manager.rs @@ -18,9 +18,10 @@ use tokio::time::{Instant, interval}; use crate::fast_lock::{ guard::FastLockGuard, - metrics::GlobalMetrics, + manager_trait::LockManager, + metrics::{AggregatedMetrics, GlobalMetrics}, shard::LockShard, - types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectLockRequest}, + types::{BatchLockRequest, BatchLockResult, LockConfig, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest}, }; /// High-performance object lock manager @@ -361,6 +362,87 @@ impl Drop for FastObjectLockManager { } } +#[async_trait::async_trait] +impl LockManager for FastObjectLockManager { + async fn acquire_lock(&self, request: ObjectLockRequest) -> Result { + self.acquire_lock(request).await + } + + async fn acquire_read_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_read_lock(bucket, object, owner).await + } + + async fn acquire_read_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_read_lock_versioned(bucket, object, version, owner).await + } + + async fn acquire_write_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_write_lock(bucket, object, owner).await + } + + async fn acquire_write_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result { + self.acquire_write_lock_versioned(bucket, object, version, owner).await + } + + async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult { + self.acquire_locks_batch(batch_request).await + } + + fn get_lock_info(&self, key: &ObjectKey) -> Option { + self.get_lock_info(key) + } + + fn get_metrics(&self) -> AggregatedMetrics { + self.get_metrics() + } + + fn total_lock_count(&self) -> usize { + self.total_lock_count() + } + + fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> { + self.get_pool_stats() + } + + async fn cleanup_expired(&self) -> usize { + self.cleanup_expired().await + } + + async fn cleanup_expired_traditional(&self) -> usize { + self.cleanup_expired_traditional().await + } + + async fn shutdown(&self) { + self.shutdown().await + } + + fn is_disabled(&self) -> bool { + false + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/lock/src/fast_lock/manager_trait.rs b/crates/lock/src/fast_lock/manager_trait.rs new file mode 100644 index 00000000..51de83b5 --- /dev/null +++ b/crates/lock/src/fast_lock/manager_trait.rs @@ -0,0 +1,93 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Unified trait for lock managers (enabled and disabled) + +use crate::fast_lock::{ + guard::FastLockGuard, + metrics::AggregatedMetrics, + types::{BatchLockRequest, BatchLockResult, LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest}, +}; +use std::sync::Arc; + +/// Unified trait for lock managers +/// +/// This trait allows transparent switching between enabled and disabled lock managers +/// based on environment variables. +#[async_trait::async_trait] +pub trait LockManager: Send + Sync { + /// Acquire object lock + async fn acquire_lock(&self, request: ObjectLockRequest) -> Result; + + /// Acquire shared (read) lock + async fn acquire_read_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result; + + /// Acquire shared (read) lock for specific version + async fn acquire_read_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result; + + /// Acquire exclusive (write) lock + async fn acquire_write_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result; + + /// Acquire exclusive (write) lock for specific version + async fn acquire_write_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> Result; + + /// Acquire multiple locks atomically + async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult; + + /// Get lock information for monitoring + fn get_lock_info(&self, key: &ObjectKey) -> Option; + + /// Get aggregated metrics + fn get_metrics(&self) -> AggregatedMetrics; + + /// Get total number of active locks across all shards + fn total_lock_count(&self) -> usize; + + /// Get pool statistics from all shards + fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)>; + + /// Force cleanup of expired locks + async fn cleanup_expired(&self) -> usize; + + /// Force cleanup with traditional strategy + async fn cleanup_expired_traditional(&self) -> usize; + + /// Shutdown the lock manager and cleanup resources + async fn shutdown(&self); + + /// Check if this manager is disabled + fn is_disabled(&self) -> bool; +} diff --git a/crates/lock/src/fast_lock/metrics.rs b/crates/lock/src/fast_lock/metrics.rs index 623a5034..03a5940f 100644 --- a/crates/lock/src/fast_lock/metrics.rs +++ b/crates/lock/src/fast_lock/metrics.rs @@ -142,6 +142,20 @@ pub struct MetricsSnapshot { } impl MetricsSnapshot { + /// Create empty snapshot (for disabled lock manager) + pub fn empty() -> Self { + Self { + fast_path_success: 0, + slow_path_success: 0, + timeouts: 0, + releases: 0, + cleanups: 0, + contention_events: 0, + total_wait_time_ns: 0, + max_wait_time_ns: 0, + } + } + pub fn total_acquisitions(&self) -> u64 { self.fast_path_success + self.slow_path_success } @@ -251,6 +265,22 @@ pub struct AggregatedMetrics { } impl AggregatedMetrics { + /// Create empty metrics (for disabled lock manager) + pub fn empty() -> Self { + Self { + shard_metrics: MetricsSnapshot::empty(), + shard_count: 0, + uptime: Duration::ZERO, + cleanup_runs: 0, + total_objects_cleaned: 0, + } + } + + /// Check if metrics are empty (indicates disabled or no activity) + pub fn is_empty(&self) -> bool { + self.shard_count == 0 && self.shard_metrics.total_acquisitions() == 0 && self.shard_metrics.releases == 0 + } + /// Get operations per second pub fn ops_per_second(&self) -> f64 { let total_ops = self.shard_metrics.total_acquisitions() + self.shard_metrics.releases; diff --git a/crates/lock/src/fast_lock/mod.rs b/crates/lock/src/fast_lock/mod.rs index 5e5fd7d6..8c3947f3 100644 --- a/crates/lock/src/fast_lock/mod.rs +++ b/crates/lock/src/fast_lock/mod.rs @@ -24,10 +24,12 @@ //! 4. **Async Optimized** - True async locks that avoid thread blocking //! 5. **Auto Cleanup** - Access-time based automatic lock reclamation +pub mod disabled_manager; pub mod guard; pub mod integration_example; pub mod integration_test; pub mod manager; +pub mod manager_trait; pub mod metrics; pub mod object_pool; pub mod optimized_notify; @@ -39,8 +41,10 @@ pub mod types; // pub mod benchmarks; // Temporarily disabled due to compilation issues // Re-export main types +pub use disabled_manager::DisabledLockManager; pub use guard::FastLockGuard; pub use manager::FastObjectLockManager; +pub use manager_trait::LockManager; pub use types::*; /// Default shard count (must be power of 2) diff --git a/crates/lock/src/lib.rs b/crates/lock/src/lib.rs index cc709b4e..aacecd38 100644 --- a/crates/lock/src/lib.rs +++ b/crates/lock/src/lib.rs @@ -42,8 +42,8 @@ pub use crate::{ error::{LockError, Result}, // Fast Lock System exports fast_lock::{ - BatchLockRequest, BatchLockResult, FastLockGuard, FastObjectLockManager, LockMode, LockResult, ObjectKey, - ObjectLockRequest, + BatchLockRequest, BatchLockResult, DisabledLockManager, FastLockGuard, FastObjectLockManager, LockManager, LockMode, + LockResult, ObjectKey, ObjectLockInfo, ObjectLockRequest, metrics::AggregatedMetrics, }, guard::LockGuard, // Main components @@ -76,13 +76,198 @@ pub const MAX_DELETE_LIST: usize = 1000; use once_cell::sync::OnceCell; use std::sync::Arc; -static GLOBAL_FAST_LOCK_MANAGER: OnceCell> = OnceCell::new(); +/// Enum wrapper for different lock manager implementations +pub enum GlobalLockManager { + Enabled(Arc), + Disabled(fast_lock::DisabledLockManager), +} -/// Get the global shared FastLock manager instance +impl Default for GlobalLockManager { + fn default() -> Self { + Self::new() + } +} + +impl GlobalLockManager { + /// Create a lock manager based on environment variable configuration + pub fn new() -> Self { + // Check RUSTFS_ENABLE_LOCKS environment variable + let locks_enabled = std::env::var("RUSTFS_ENABLE_LOCKS") + .unwrap_or_else(|_| "true".to_string()) + .to_lowercase(); + + match locks_enabled.as_str() { + "false" | "0" | "no" | "off" | "disabled" => { + tracing::info!("Lock system disabled via RUSTFS_ENABLE_LOCKS environment variable"); + Self::Disabled(fast_lock::DisabledLockManager::new()) + } + _ => { + tracing::info!("Lock system enabled"); + Self::Enabled(Arc::new(fast_lock::FastObjectLockManager::new())) + } + } + } + + /// Check if the lock manager is disabled + pub fn is_disabled(&self) -> bool { + matches!(self, Self::Disabled(_)) + } + + /// Get the FastObjectLockManager if enabled, otherwise returns None + pub fn as_fast_lock_manager(&self) -> Option> { + match self { + Self::Enabled(manager) => Some(manager.clone()), + Self::Disabled(_) => None, + } + } +} + +#[async_trait::async_trait] +impl fast_lock::LockManager for GlobalLockManager { + async fn acquire_lock( + &self, + request: fast_lock::ObjectLockRequest, + ) -> std::result::Result { + match self { + Self::Enabled(manager) => manager.acquire_lock(request).await, + Self::Disabled(manager) => manager.acquire_lock(request).await, + } + } + + async fn acquire_read_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> std::result::Result { + match self { + Self::Enabled(manager) => manager.acquire_read_lock(bucket, object, owner).await, + Self::Disabled(manager) => manager.acquire_read_lock(bucket, object, owner).await, + } + } + + async fn acquire_read_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> std::result::Result { + match self { + Self::Enabled(manager) => manager.acquire_read_lock_versioned(bucket, object, version, owner).await, + Self::Disabled(manager) => manager.acquire_read_lock_versioned(bucket, object, version, owner).await, + } + } + + async fn acquire_write_lock( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + owner: impl Into> + Send, + ) -> std::result::Result { + match self { + Self::Enabled(manager) => manager.acquire_write_lock(bucket, object, owner).await, + Self::Disabled(manager) => manager.acquire_write_lock(bucket, object, owner).await, + } + } + + async fn acquire_write_lock_versioned( + &self, + bucket: impl Into> + Send, + object: impl Into> + Send, + version: impl Into> + Send, + owner: impl Into> + Send, + ) -> std::result::Result { + match self { + Self::Enabled(manager) => manager.acquire_write_lock_versioned(bucket, object, version, owner).await, + Self::Disabled(manager) => manager.acquire_write_lock_versioned(bucket, object, version, owner).await, + } + } + + async fn acquire_locks_batch(&self, batch_request: fast_lock::BatchLockRequest) -> fast_lock::BatchLockResult { + match self { + Self::Enabled(manager) => manager.acquire_locks_batch(batch_request).await, + Self::Disabled(manager) => manager.acquire_locks_batch(batch_request).await, + } + } + + fn get_lock_info(&self, key: &fast_lock::ObjectKey) -> Option { + match self { + Self::Enabled(manager) => manager.get_lock_info(key), + Self::Disabled(manager) => manager.get_lock_info(key), + } + } + + fn get_metrics(&self) -> AggregatedMetrics { + match self { + Self::Enabled(manager) => manager.get_metrics(), + Self::Disabled(manager) => manager.get_metrics(), + } + } + + fn total_lock_count(&self) -> usize { + match self { + Self::Enabled(manager) => manager.total_lock_count(), + Self::Disabled(manager) => manager.total_lock_count(), + } + } + + fn get_pool_stats(&self) -> Vec<(u64, u64, u64, usize)> { + match self { + Self::Enabled(manager) => manager.get_pool_stats(), + Self::Disabled(manager) => manager.get_pool_stats(), + } + } + + async fn cleanup_expired(&self) -> usize { + match self { + Self::Enabled(manager) => manager.cleanup_expired().await, + Self::Disabled(manager) => manager.cleanup_expired().await, + } + } + + async fn cleanup_expired_traditional(&self) -> usize { + match self { + Self::Enabled(manager) => manager.cleanup_expired_traditional().await, + Self::Disabled(manager) => manager.cleanup_expired_traditional().await, + } + } + + async fn shutdown(&self) { + match self { + Self::Enabled(manager) => manager.shutdown().await, + Self::Disabled(manager) => manager.shutdown().await, + } + } + + fn is_disabled(&self) -> bool { + match self { + Self::Enabled(manager) => manager.is_disabled(), + Self::Disabled(manager) => manager.is_disabled(), + } + } +} + +static GLOBAL_LOCK_MANAGER: OnceCell> = OnceCell::new(); + +/// Get the global shared lock manager instance +/// +/// Returns either FastObjectLockManager or DisabledLockManager based on +/// the RUSTFS_ENABLE_LOCKS environment variable. +pub fn get_global_lock_manager() -> Arc { + GLOBAL_LOCK_MANAGER.get_or_init(|| Arc::new(GlobalLockManager::new())).clone() +} + +/// Get the global shared FastLock manager instance (legacy) +/// +/// This function is deprecated. Use get_global_lock_manager() instead. +/// Returns FastObjectLockManager when locks are enabled, or panics when disabled. +#[deprecated(note = "Use get_global_lock_manager() instead")] pub fn get_global_fast_lock_manager() -> Arc { - GLOBAL_FAST_LOCK_MANAGER - .get_or_init(|| Arc::new(fast_lock::FastObjectLockManager::new())) - .clone() + let manager = get_global_lock_manager(); + manager.as_fast_lock_manager().unwrap_or_else(|| { + panic!("Cannot get FastObjectLockManager when locks are disabled. Use get_global_lock_manager() instead."); + }) } // ============================================================================ @@ -95,3 +280,87 @@ pub fn create_namespace_lock(namespace: String, _distributed: bool) -> Namespace // This function just creates an empty NamespaceLock NamespaceLock::new(namespace) } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_global_lock_manager_basic() { + let manager = get_global_lock_manager(); + + // Should be able to acquire locks + let guard = manager.acquire_read_lock("bucket", "object", "owner").await; + assert!(guard.is_ok()); + + // Test metrics + let _metrics = manager.get_metrics(); + // Even if locks are disabled, metrics should be available (empty or real) + // shard_count is usize so always >= 0 + } + + #[tokio::test] + async fn test_disabled_manager_direct() { + let manager = fast_lock::DisabledLockManager::new(); + + // All operations should succeed immediately + let guard = manager.acquire_read_lock("bucket", "object", "owner").await; + assert!(guard.is_ok()); + assert!(guard.unwrap().is_disabled()); + + // Metrics should be empty + let metrics = manager.get_metrics(); + assert!(metrics.is_empty()); + assert_eq!(manager.total_lock_count(), 0); + } + + #[tokio::test] + async fn test_enabled_manager_direct() { + let manager = fast_lock::FastObjectLockManager::new(); + + // Operations should work normally + let guard = manager.acquire_read_lock("bucket", "object", "owner").await; + assert!(guard.is_ok()); + assert!(!guard.unwrap().is_disabled()); + + // Should have real metrics + let _metrics = manager.get_metrics(); + // Note: total_lock_count might be > 0 due to previous lock acquisition + } + + #[tokio::test] + async fn test_global_manager_enum_wrapper() { + // Test the GlobalLockManager enum directly + let enabled_manager = GlobalLockManager::Enabled(Arc::new(fast_lock::FastObjectLockManager::new())); + let disabled_manager = GlobalLockManager::Disabled(fast_lock::DisabledLockManager::new()); + + assert!(!enabled_manager.is_disabled()); + assert!(disabled_manager.is_disabled()); + + // Test trait methods work for both + let enabled_guard = enabled_manager.acquire_read_lock("bucket", "obj", "owner").await; + let disabled_guard = disabled_manager.acquire_read_lock("bucket", "obj", "owner").await; + + assert!(enabled_guard.is_ok()); + assert!(disabled_guard.is_ok()); + + assert!(!enabled_guard.unwrap().is_disabled()); + assert!(disabled_guard.unwrap().is_disabled()); + } + + #[tokio::test] + async fn test_batch_operations_work() { + let manager = get_global_lock_manager(); + + let batch = fast_lock::BatchLockRequest::new("owner") + .add_read_lock("bucket", "obj1") + .add_write_lock("bucket", "obj2"); + + let result = manager.acquire_locks_batch(batch).await; + + // Should succeed regardless of whether locks are enabled or disabled + assert!(result.all_acquired); + assert_eq!(result.successful_locks.len(), 2); + assert!(result.failed_locks.is_empty()); + } +} diff --git a/docs/ENVIRONMENT_VARIABLES.md b/docs/ENVIRONMENT_VARIABLES.md new file mode 100644 index 00000000..aee3fc4b --- /dev/null +++ b/docs/ENVIRONMENT_VARIABLES.md @@ -0,0 +1,118 @@ +# RustFS Environment Variables + +This document describes the environment variables that can be used to configure RustFS behavior. + +## Background Services Control + +### RUSTFS_ENABLE_SCANNER + +Controls whether the data scanner service should be started. + +- **Default**: `true` +- **Valid values**: `true`, `false` +- **Description**: When enabled, the data scanner will run background scans to detect inconsistencies and corruption in stored data. + +**Examples**: +```bash +# Disable scanner +export RUSTFS_ENABLE_SCANNER=false + +# Enable scanner (default behavior) +export RUSTFS_ENABLE_SCANNER=true +``` + +### RUSTFS_ENABLE_HEAL + +Controls whether the auto-heal service should be started. + +- **Default**: `true` +- **Valid values**: `true`, `false` +- **Description**: When enabled, the heal manager will automatically repair detected data inconsistencies and corruption. + +**Examples**: +```bash +# Disable auto-heal +export RUSTFS_ENABLE_HEAL=false + +# Enable auto-heal (default behavior) +export RUSTFS_ENABLE_HEAL=true +``` + +### RUSTFS_ENABLE_LOCKS + +Controls whether the distributed lock system should be enabled. + +- **Default**: `true` +- **Valid values**: `true`, `false`, `1`, `0`, `yes`, `no`, `on`, `off`, `enabled`, `disabled` (case insensitive) +- **Description**: When enabled, provides distributed locking for concurrent object operations. When disabled, all lock operations immediately return success without actual locking. + +**Examples**: +```bash +# Disable lock system +export RUSTFS_ENABLE_LOCKS=false + +# Enable lock system (default behavior) +export RUSTFS_ENABLE_LOCKS=true +``` + +## Service Combinations + +The scanner and heal services can be independently controlled: + +| RUSTFS_ENABLE_SCANNER | RUSTFS_ENABLE_HEAL | Result | +|----------------------|-------------------|--------| +| `true` (default) | `true` (default) | Both scanner and heal are active | +| `true` | `false` | Scanner runs without heal capabilities | +| `false` | `true` | Heal manager is available but no scanning | +| `false` | `false` | No background maintenance services | + +## Use Cases + +### Development Environment +For development or testing environments where you don't need background maintenance: +```bash +export RUSTFS_ENABLE_SCANNER=false +export RUSTFS_ENABLE_HEAL=false +./rustfs --address 127.0.0.1:9000 ... +``` + +### Scan-Only Mode +For environments where you want to detect issues but not automatically fix them: +```bash +export RUSTFS_ENABLE_SCANNER=true +export RUSTFS_ENABLE_HEAL=false +./rustfs --address 127.0.0.1:9000 ... +``` + +### Heal-Only Mode +For environments where external tools trigger healing but no automatic scanning: +```bash +export RUSTFS_ENABLE_SCANNER=false +export RUSTFS_ENABLE_HEAL=true +./rustfs --address 127.0.0.1:9000 ... +``` + +### Production Environment (Default) +For production environments where both services should be active: +```bash +# These are the defaults, so no need to set explicitly +# export RUSTFS_ENABLE_SCANNER=true +# export RUSTFS_ENABLE_HEAL=true +./rustfs --address 127.0.0.1:9000 ... +``` + +### No-Lock Development +For single-node development where locking is not needed: +```bash +export RUSTFS_ENABLE_LOCKS=false +./rustfs --address 127.0.0.1:9000 ... +``` + +## Performance Impact + +- **Scanner**: Light to moderate CPU/IO impact during scans +- **Heal**: Moderate to high CPU/IO impact during healing operations +- **Locks**: Minimal CPU/memory overhead for coordination; disabling can improve throughput in single-client scenarios +- **Memory**: Each service uses additional memory for processing queues and metadata + +Disabling these services in resource-constrained environments can improve performance for primary storage operations. \ No newline at end of file diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 5bcc56e0..7b63c803 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -205,11 +205,34 @@ async fn run(opt: config::Opt) -> Result<()> { // Create a cancellation token for AHM services let _ = create_ahm_services_cancel_token(); - // Initialize heal manager with channel processor - let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone())); - let heal_manager = init_heal_manager(heal_storage, None).await?; - let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager)); - scanner.start().await?; + // Check environment variables to determine if scanner and heal should be enabled + let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true); + let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true); + + info!("Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal); + + // Initialize heal manager and scanner based on environment variables + if enable_heal || enable_scanner { + if enable_heal { + // Initialize heal manager with channel processor + let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone())); + let heal_manager = init_heal_manager(heal_storage, None).await?; + + if enable_scanner { + info!("Starting scanner with heal manager..."); + let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager)); + scanner.start().await?; + } else { + info!("Scanner disabled, but heal manager is initialized and available"); + } + } else if enable_scanner { + info!("Starting scanner without heal manager..."); + let scanner = Scanner::new(Some(ScannerConfig::default()), None); + scanner.start().await?; + } + } else { + info!("Both scanner and heal are disabled, skipping AHM service initialization"); + } // print server info print_server_info(); @@ -266,19 +289,37 @@ async fn run(opt: config::Opt) -> Result<()> { Ok(()) } +/// Parse a boolean environment variable with default value +/// +/// Returns true if the environment variable is not set or set to true/1/yes/on/enabled, +/// false if set to false/0/no/off/disabled +fn parse_bool_env_var(var_name: &str, default: bool) -> bool { + std::env::var(var_name) + .unwrap_or_else(|_| default.to_string()) + .parse::() + .unwrap_or(default) +} + /// Handles the shutdown process of the server async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) { info!("Shutdown signal received in main thread"); // update the status to stopping first state_manager.update(ServiceState::Stopping); - // Stop background services (data scanner and auto heal) gracefully - info!("Stopping background services (data scanner and auto heal)..."); - shutdown_background_services(); + // Check environment variables to determine what services need to be stopped + let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true); + let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true); - // Stop AHM services gracefully - info!("Stopping AHM services..."); - shutdown_ahm_services(); + // Stop background services based on what was enabled + if enable_scanner || enable_heal { + info!("Stopping background services (data scanner and auto heal)..."); + shutdown_background_services(); + + info!("Stopping AHM services..."); + shutdown_ahm_services(); + } else { + info!("Background services were disabled, skipping AHM shutdown"); + } // Stop the notification system shutdown_event_notifier().await;