diff --git a/rustfs/src/storage/concurrency.rs b/rustfs/src/storage/concurrency.rs
index 418d26f0..331bd890 100644
--- a/rustfs/src/storage/concurrency.rs
+++ b/rustfs/src/storage/concurrency.rs
@@ -160,6 +160,55 @@ pub fn get_concurrency_aware_buffer_size(file_size: i64, base_buffer_size: usize
     adjusted_size.clamp(min_buffer, max_buffer)
 }
 
+/// Advanced concurrency-aware buffer sizing with file size optimization
+///
+/// This enhanced version considers both the concurrency level and file size
+/// patterns when choosing a buffer size, rather than concurrency level alone.
+///
+/// # Arguments
+///
+/// * `file_size` - The size of the file being read, or -1 if unknown
+/// * `base_buffer_size` - The baseline buffer size from the workload profile
+/// * `is_sequential` - Whether this is a sequential read (hint for optimization)
+///
+/// # Returns
+///
+/// Optimized buffer size in bytes
+///
+/// # Examples
+///
+/// ```ignore
+/// let buffer_size = get_advanced_buffer_size(
+///     32 * 1024 * 1024, // 32MB file
+///     256 * 1024,       // 256KB base buffer
+///     true,             // sequential read
+/// );
+/// ```
+pub fn get_advanced_buffer_size(file_size: i64, base_buffer_size: usize, is_sequential: bool) -> usize {
+    let concurrent_requests = ACTIVE_GET_REQUESTS.load(Ordering::Relaxed);
+
+    // For very small files, use smaller buffers regardless of concurrency
+    if file_size > 0 && file_size < 256 * KI_B as i64 {
+        return (file_size as usize / 4).clamp(16 * KI_B, 64 * KI_B);
+    }
+
+    // Base calculation from the standard function
+    let standard_size = get_concurrency_aware_buffer_size(file_size, base_buffer_size);
+
+    // For sequential reads, we can be more aggressive with buffer sizes
+    if is_sequential && concurrent_requests <= MEDIUM_CONCURRENCY_THRESHOLD {
+        return ((standard_size as f64 * 1.5) as usize).min(2 * MI_B);
+    }
+
+    // For high concurrency with large files, optimize for parallel processing
+    if concurrent_requests > HIGH_CONCURRENCY_THRESHOLD && file_size > 10 * MI_B as i64 {
+        // Use smaller, more numerous buffers for better parallelism
+        return (standard_size as f64 * 0.8) as usize;
+    }
+
+    standard_size
+}
+
 /// Simple LRU cache for hot objects
 ///
 /// This cache stores frequently accessed small objects (<= 10MB) to reduce
@@ -200,20 +249,33 @@ impl HotObjectCache {
         }
     }
 
-    /// Try to get an object from cache
+    /// Try to get an object from cache with an optimized read-first pattern
+    ///
+    /// Uses `peek()` under a read lock for the initial lookup, so misses never
+    /// touch the write lock; hits take the write lock only briefly to promote
+    /// the entry in LRU order and update hit statistics.
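+    ///
+    /// # Examples
+    ///
+    /// A minimal caller sketch (illustrative only; `serve_from_memory` is a
+    /// hypothetical helper, not part of this crate):
+    ///
+    /// ```ignore
+    /// if let Some(data) = cache.get("bucket/key").await {
+    ///     return serve_from_memory(data);
+    /// }
+    /// ```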
     async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
-        let mut cache = self.cache.write().await;
-
-        if let Some(cached) = cache.get(key) {
-            cached.hit_count.fetch_add(1, Ordering::Relaxed);
-
-            #[cfg(feature = "metrics")]
-            {
-                use metrics::counter;
-                counter!("rustfs_object_cache_hits").increment(1);
+        // First, try with the read lock using peek (doesn't promote in LRU)
+        {
+            let cache = self.cache.read().await;
+            if let Some(cached) = cache.peek(key) {
+                // Clone the data reference while holding the read lock
+                let data = Arc::clone(&cached.data);
+                drop(cache);
+
+                // Now acquire the write lock to promote in LRU and update stats
+                let mut cache_write = self.cache.write().await;
+                if let Some(cached) = cache_write.get(key) {
+                    cached.hit_count.fetch_add(1, Ordering::Relaxed);
+
+                    #[cfg(feature = "metrics")]
+                    {
+                        use metrics::counter;
+                        counter!("rustfs_object_cache_hits").increment(1);
+                    }
+
+                    return Some(data);
+                }
             }
-
-            return Some(Arc::clone(&cached.data));
         }
 
         #[cfg(feature = "metrics")]
@@ -284,6 +346,73 @@ impl HotObjectCache {
         }
     }
+
+    /// Check if a key exists in cache without promoting it in LRU order
+    async fn contains(&self, key: &str) -> bool {
+        let cache = self.cache.read().await;
+        cache.peek(key).is_some()
+    }
+
+    /// Get multiple objects from cache in a single operation
+    ///
+    /// This is more efficient than calling get() multiple times, as it
+    /// acquires the lock only once for all operations.
+    async fn get_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
+        let mut cache = self.cache.write().await;
+        let mut results = Vec::with_capacity(keys.len());
+
+        for key in keys {
+            if let Some(cached) = cache.get(key) {
+                cached.hit_count.fetch_add(1, Ordering::Relaxed);
+                results.push(Some(Arc::clone(&cached.data)));
+
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_hits").increment(1);
+                }
+            } else {
+                results.push(None);
+
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_misses").increment(1);
+                }
+            }
+        }
+
+        results
+    }
+
+    /// Remove a specific key from cache
+    ///
+    /// Returns true if the key was found and removed, false otherwise.
+    async fn remove(&self, key: &str) -> bool {
+        let mut cache = self.cache.write().await;
+        // `LruCache::pop` returns the evicted value, not a (key, value) pair
+        if let Some(cached) = cache.pop(key) {
+            let current = self.current_size.load(Ordering::Relaxed);
+            self.current_size.store(current.saturating_sub(cached.size), Ordering::Relaxed);
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Get the most frequently accessed keys
+    ///
+    /// Returns up to `limit` keys sorted by hit count in descending order.
+    async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
+        let cache = self.cache.read().await;
+        let mut keys_with_hits: Vec<(String, usize)> = cache
+            .iter()
+            .map(|(k, v)| (k.clone(), v.hit_count.load(Ordering::Relaxed)))
+            .collect();
+
+        keys_with_hits.sort_by(|a, b| b.1.cmp(&a.1));
+        keys_with_hits.truncate(limit);
+        keys_with_hits
+    }
 }
 
 /// Cache statistics
@@ -362,6 +491,44 @@ impl ConcurrencyManager {
     pub async fn clear_cache(&self) {
         self.cache.clear().await
     }
+
+    /// Check if a key exists in cache
+    ///
+    /// This is a lightweight check that doesn't promote the key in LRU order.
+    pub async fn is_cached(&self, key: &str) -> bool {
+        self.cache.contains(key).await
+    }
+
+    /// Get multiple objects from cache in batch
+    ///
+    /// More efficient than individual get_cached() calls when fetching multiple objects.
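+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (illustrative only; `manager` is any `ConcurrencyManager`):
+    ///
+    /// ```ignore
+    /// let keys = vec!["bucket/a".to_string(), "bucket/b".to_string()];
+    /// let results = manager.get_cached_batch(&keys).await;
+    /// for (key, hit) in keys.iter().zip(&results) {
+    ///     match hit {
+    ///         Some(data) => println!("{key}: {} bytes cached", data.len()),
+    ///         None => println!("{key}: cache miss"),
+    ///     }
+    /// }
+    /// ```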
+    pub async fn get_cached_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
+        self.cache.get_batch(keys).await
+    }
+
+    /// Remove a specific object from cache
+    ///
+    /// Returns true if the object was cached and removed.
+    pub async fn remove_cached(&self, key: &str) -> bool {
+        self.cache.remove(key).await
+    }
+
+    /// Get the most frequently accessed keys
+    ///
+    /// Useful for identifying hot objects and optimizing cache strategies.
+    pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
+        self.cache.get_hot_keys(limit).await
+    }
+
+    /// Warm up cache with frequently accessed objects
+    ///
+    /// This can be called during server startup or maintenance windows
+    /// to pre-populate the cache with known hot objects.
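+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (illustrative only; the keys and payload variables
+    /// are hypothetical):
+    ///
+    /// ```ignore
+    /// let objects = vec![
+    ///     ("configs/site.json".to_string(), config_bytes),
+    ///     ("assets/logo.png".to_string(), logo_bytes),
+    /// ];
+    /// manager.warm_cache(objects).await;
+    /// ```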
+    pub async fn warm_cache(&self, objects: Vec<(String, Vec<u8>)>) {
+        for (key, data) in objects {
+            self.cache_object(key, data).await;
+        }
+    }
 }
 
 impl Default for ConcurrencyManager {
diff --git a/rustfs/src/storage/concurrent_get_object_test.rs b/rustfs/src/storage/concurrent_get_object_test.rs
index ae6ccfb8..511709cd 100644
--- a/rustfs/src/storage/concurrent_get_object_test.rs
+++ b/rustfs/src/storage/concurrent_get_object_test.rs
@@ -13,11 +13,22 @@
 // limitations under the License.
 
 //! Integration tests for concurrent GetObject performance optimization
+//!
+//! This test module validates the concurrency management features, including:
+//! - Request tracking and RAII guards
+//! - Adaptive buffer sizing based on concurrent load
+//! - LRU cache operations and eviction
+//! - Batch operations and cache warming
+//! - Hot key tracking and analysis
 
 #[cfg(test)]
 mod tests {
-    use crate::storage::concurrency::{ConcurrencyManager, GetObjectGuard, get_concurrency_aware_buffer_size};
-    use rustfs_config::MI_B;
+    use crate::storage::concurrency::{
+        ConcurrencyManager, GetObjectGuard, get_advanced_buffer_size,
+        get_concurrency_aware_buffer_size,
+    };
+    use rustfs_config::{KI_B, MI_B};
+    use std::sync::Arc;
     use std::time::Duration;
     use tokio::time::Instant;
 
@@ -273,4 +284,232 @@ mod tests {
         let recent = manager.get_cached(&recent_key).await;
         assert!(recent.is_some(), "Recent object should still be cached");
     }
+
+    /// Test batch cache operations
+    #[tokio::test]
+    async fn test_cache_batch_operations() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache multiple objects
+        for i in 0..10 {
+            let key = format!("batch/object{}", i);
+            let data = vec![i as u8; 100 * KI_B]; // 100KB each
+            manager.cache_object(key, data).await;
+        }
+
+        // Test batch get
+        let keys: Vec<String> = (0..10).map(|i| format!("batch/object{}", i)).collect();
+        let results = manager.get_cached_batch(&keys).await;
+
+        assert_eq!(results.len(), 10, "Should return a result for each key");
+        for (i, result) in results.iter().enumerate() {
+            assert!(result.is_some(), "Object {} should be in cache", i);
+            assert_eq!(result.as_ref().unwrap().len(), 100 * KI_B);
+        }
+
+        // Test batch get with missing keys
+        let mixed_keys = vec![
+            "batch/object0".to_string(),
+            "missing/key1".to_string(),
+            "batch/object5".to_string(),
+            "missing/key2".to_string(),
+        ];
+        let mixed_results = manager.get_cached_batch(&mixed_keys).await;
+
+        assert_eq!(mixed_results.len(), 4);
+        assert!(mixed_results[0].is_some(), "First key should exist");
+        assert!(mixed_results[1].is_none(), "Second key should be missing");
+        assert!(mixed_results[2].is_some(), "Third key should exist");
+        assert!(mixed_results[3].is_none(), "Fourth key should be missing");
+    }
+
+    /// Test cache warming functionality
+    #[tokio::test]
+    async fn test_cache_warming() {
+        let manager = ConcurrencyManager::new();
+
+        // Prepare objects for warming
+        let mut warm_objects = Vec::new();
+        for i in 0..5 {
+            let key = format!("warm/object{}", i);
+            let data = vec![i as u8; 512 * KI_B]; // 512KB each
+            warm_objects.push((key, data));
+        }
+
+        // Warm the cache
+        manager.warm_cache(warm_objects).await;
+
+        // Verify all objects are cached
+        let stats = manager.cache_stats().await;
+        assert_eq!(stats.entries, 5, "All objects should be cached");
+
+        for i in 0..5 {
+            let key = format!("warm/object{}", i);
+            assert!(manager.is_cached(&key).await, "Object {} should be cached", i);
+        }
+    }
+
+    /// Test hot keys tracking
+    #[tokio::test]
+    async fn test_hot_keys_tracking() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache objects with different access patterns
+        for i in 0..5 {
+            let key = format!("hot/object{}", i);
+            let data = vec![i as u8; 100 * KI_B];
+            manager.cache_object(key, data).await;
+        }
+
+        // Simulate access patterns (objects 0 and 1 are hot)
+        for _ in 0..10 {
+            let _ = manager.get_cached("hot/object0").await;
+        }
+        for _ in 0..5 {
+            let _ = manager.get_cached("hot/object1").await;
+        }
+        for _ in 0..2 {
+            let _ = manager.get_cached("hot/object2").await;
+        }
+
+        // Get hot keys
+        let hot_keys = manager.get_hot_keys(3).await;
+
+        assert_eq!(hot_keys.len(), 3, "Should return the top 3 keys");
+        assert_eq!(hot_keys[0].0, "hot/object0", "Most accessed should be first");
+        assert!(hot_keys[0].1 >= 10, "Object 0 should have at least 10 hits");
+        assert_eq!(hot_keys[1].0, "hot/object1", "Second most accessed should be second");
+        assert!(hot_keys[1].1 >= 5, "Object 1 should have at least 5 hits");
+    }
+
+    /// Test cache removal
+    #[tokio::test]
+    async fn test_cache_removal() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache an object
+        let key = "remove/test".to_string();
+        let data = vec![1u8; 100 * KI_B];
+        manager.cache_object(key.clone(), data).await;
+
+        // Verify it's cached
+        assert!(manager.is_cached(&key).await, "Object should be cached");
+
+        // Remove it
+        let removed = manager.remove_cached(&key).await;
+        assert!(removed, "Should successfully remove the cached object");
+
+        // Verify it's gone
+        assert!(!manager.is_cached(&key).await, "Object should no longer be cached");
+
+        // Try to remove a non-existent key
+        let not_removed = manager.remove_cached("nonexistent").await;
+        assert!(!not_removed, "Should return false for a non-existent key");
+    }
+
+    /// Test advanced buffer sizing with file patterns
+    #[tokio::test]
+    async fn test_advanced_buffer_sizing() {
+        let base_buffer = 256 * KI_B; // 256KB base
+
+        // Test small file optimization
+        let small_size = get_advanced_buffer_size(128 * KI_B as i64, base_buffer, false);
+        assert!(small_size < base_buffer, "Small files should use smaller buffers");
+        assert!(small_size >= 16 * KI_B, "Should not go below the minimum");
+
+        // Test sequential read optimization
+        let _guard = ConcurrencyManager::track_request();
+        let sequential_size = get_advanced_buffer_size(10 * MI_B as i64, base_buffer, true);
+        let random_size = get_advanced_buffer_size(10 * MI_B as i64, base_buffer, false);
+
+        // Sequential reads should get larger buffers at low concurrency
+        assert!(
+            sequential_size >= random_size,
+            "Sequential reads should have equal or larger buffers at low concurrency"
+        );
+
+        drop(_guard);
+
+        // Test high concurrency with large files
+        let mut guards = Vec::new();
+        for _ in 0..10 {
+            guards.push(ConcurrencyManager::track_request());
+        }
+
+        let high_concurrency_size = get_advanced_buffer_size(50 * MI_B as i64, base_buffer, false);
+        assert!(
+            high_concurrency_size <= base_buffer,
+            "High concurrency should reduce the buffer size"
+        );
+
+        drop(guards);
+    }
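+
+    // Worked example of the sizing rules exercised above (illustrative only;
+    // exact values depend on the concurrency thresholds and the standard
+    // concurrency-aware size):
+    //   128 KiB file, any load        -> 128 KiB / 4 = 32 KiB (clamped to 16..=64 KiB)
+    //   10 MiB, sequential, low load  -> standard size * 1.5, capped at 2 MiB
+    //   50 MiB, high concurrency      -> standard size * 0.8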
+
+    /// Test cache performance under concurrent load
+    #[tokio::test]
+    async fn test_concurrent_cache_access() {
+        let manager = Arc::new(ConcurrencyManager::new());
+
+        // Pre-populate the cache
+        for i in 0..50 {
+            let key = format!("concurrent/object{}", i);
+            let data = vec![i as u8; 100 * KI_B];
+            manager.cache_object(key, data).await;
+        }
+
+        // Spawn multiple concurrent readers
+        let mut handles = Vec::new();
+        for worker_id in 0..10 {
+            let manager_clone = Arc::clone(&manager);
+            let handle = tokio::spawn(async move {
+                let mut hit_count = 0;
+                for i in 0..50 {
+                    let key = format!("concurrent/object{}", i);
+                    if manager_clone.get_cached(&key).await.is_some() {
+                        hit_count += 1;
+                    }
+                }
+                (worker_id, hit_count)
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all workers to complete
+        let mut total_hits = 0;
+        for handle in handles {
+            let (worker_id, hits) = handle.await.unwrap();
+            println!("Worker {} got {} cache hits", worker_id, hits);
+            total_hits += hits;
+        }
+
+        // All workers should get hits for all cached objects
+        assert_eq!(total_hits, 500, "Should have 500 total hits (10 workers * 50 objects)");
+    }
+
+    /// Test that is_cached doesn't affect LRU order
+    #[tokio::test]
+    async fn test_is_cached_no_promotion() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache two objects
+        manager.cache_object("first".to_string(), vec![1u8; 100 * KI_B]).await;
+        manager.cache_object("second".to_string(), vec![2u8; 100 * KI_B]).await;
+
+        // Check the first without accessing it
+        assert!(manager.is_cached("first").await);
+
+        // Access the second multiple times
+        for _ in 0..5 {
+            let _ = manager.get_cached("second").await;
+        }
+
+        // Both should still be cached
+        assert!(manager.is_cached("first").await);
+        assert!(manager.is_cached("second").await);
+
+        // Get hot keys - the second should be hotter
+        let hot_keys = manager.get_hot_keys(2).await;
+        assert_eq!(hot_keys[0].0, "second", "Second should be hottest");
+        assert!(hot_keys[0].1 >= 5, "Second should have at least 5 hits");
+    }
 }