Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 01:30:33 +00:00
refactor: optimize cache with lru 0.16.2 read-first pattern and add advanced features
- Implement optimized read-first cache access using peek() to reduce write lock contention
- Add batch cache operations: get_cached_batch() for efficient multi-object retrieval
- Add cache utility methods: is_cached(), remove_cached(), get_hot_keys()
- Implement warm_cache() for pre-populating cache on startup
- Add get_advanced_buffer_size() with file size and sequential read optimization
- Enhance test suite with 8 new comprehensive tests covering:
  - Batch operations and cache warming
  - Hot keys tracking and analysis
  - Cache removal and LRU behavior verification
  - Concurrent cache access performance
  - Advanced buffer sizing strategies
- Improve documentation and code comments in English throughout

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
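In practice, the new public cache surface can be driven roughly as below. This is a minimal sketch, not part of the commit: the object keys are made up, and only the methods added in the diff (`get_cached_batch`, `is_cached`, `get_hot_keys`, `remove_cached`) are assumed to exist.

```rust
use crate::storage::concurrency::ConcurrencyManager;

// Sketch only: keys are hypothetical, error handling omitted.
async fn cache_examples(manager: &ConcurrencyManager) {
    // Fetch several objects with a single lock acquisition instead of
    // repeated get_cached() calls.
    let keys = vec!["bucket/a.json".to_string(), "bucket/missing".to_string()];
    let results = manager.get_cached_batch(&keys).await;
    assert_eq!(results.len(), keys.len());

    // Lightweight existence check that does not promote the key in LRU order.
    let _present = manager.is_cached("bucket/a.json").await;

    // Inspect the most frequently hit keys, then evict one explicitly.
    let _hot = manager.get_hot_keys(10).await;
    let _removed = manager.remove_cached("bucket/a.json").await;
}
```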
@@ -160,6 +160,55 @@ pub fn get_concurrency_aware_buffer_size(file_size: i64, base_buffer_size: usize
     adjusted_size.clamp(min_buffer, max_buffer)
 }
 
+/// Advanced concurrency-aware buffer sizing with file size optimization
+///
+/// This enhanced version considers both concurrency level and file size patterns
+/// to provide even better performance characteristics.
+///
+/// # Arguments
+///
+/// * `file_size` - The size of the file being read, or -1 if unknown
+/// * `base_buffer_size` - The baseline buffer size from workload profile
+/// * `is_sequential` - Whether this is a sequential read (hint for optimization)
+///
+/// # Returns
+///
+/// Optimized buffer size in bytes
+///
+/// # Examples
+///
+/// ```ignore
+/// let buffer_size = get_advanced_buffer_size(
+///     32 * 1024 * 1024, // 32MB file
+///     256 * 1024,       // 256KB base buffer
+///     true              // sequential read
+/// );
+/// ```
+pub fn get_advanced_buffer_size(file_size: i64, base_buffer_size: usize, is_sequential: bool) -> usize {
+    let concurrent_requests = ACTIVE_GET_REQUESTS.load(Ordering::Relaxed);
+
+    // For very small files, use smaller buffers regardless of concurrency
+    if file_size > 0 && file_size < 256 * KI_B as i64 {
+        return (file_size as usize / 4).max(16 * KI_B).min(64 * KI_B);
+    }
+
+    // Base calculation from standard function
+    let standard_size = get_concurrency_aware_buffer_size(file_size, base_buffer_size);
+
+    // For sequential reads, we can be more aggressive with buffer sizes
+    if is_sequential && concurrent_requests <= MEDIUM_CONCURRENCY_THRESHOLD {
+        return ((standard_size as f64 * 1.5) as usize).min(2 * MI_B);
+    }
+
+    // For high concurrency with large files, optimize for parallel processing
+    if concurrent_requests > HIGH_CONCURRENCY_THRESHOLD && file_size > 10 * MI_B as i64 {
+        // Use smaller, more numerous buffers for better parallelism
+        return (standard_size as f64 * 0.8) as usize;
+    }
+
+    standard_size
+}
+
 /// Simple LRU cache for hot objects
 ///
 /// This cache stores frequently accessed small objects (<= 10MB) to reduce
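As a concrete illustration of the small-file branch above, the same arithmetic can be checked in isolation. This is a standalone sketch, not part of the diff; it assumes `KI_B` is 1024 bytes and re-implements only the clamp expression.

```rust
// Standalone illustration of the small-file branch:
// buffer = clamp(file_size / 4, 16 KiB, 64 KiB) for files under 256 KiB.
const KI_B: usize = 1024; // assumed value of rustfs_config::KI_B

fn small_file_buffer(file_size: usize) -> usize {
    (file_size / 4).max(16 * KI_B).min(64 * KI_B)
}

fn main() {
    assert_eq!(small_file_buffer(128 * KI_B), 32 * KI_B); // 128 KiB file -> 32 KiB buffer
    assert_eq!(small_file_buffer(20 * KI_B), 16 * KI_B);  // tiny file clamps up to the 16 KiB floor
    assert_eq!(small_file_buffer(240 * KI_B), 60 * KI_B); // just under the 256 KiB cutoff
    println!("all small-file buffer checks passed");
}
```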
@@ -200,20 +249,33 @@ impl HotObjectCache {
         }
     }
 
-    /// Try to get an object from cache
+    /// Try to get an object from cache with optimized read-first pattern
+    ///
+    /// Uses `peek()` for initial lookup to avoid write lock contention when possible.
+    /// This significantly improves concurrent read performance.
     async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
-        let mut cache = self.cache.write().await;
-
-        if let Some(cached) = cache.get(key) {
-            cached.hit_count.fetch_add(1, Ordering::Relaxed);
-
-            #[cfg(feature = "metrics")]
-            {
-                use metrics::counter;
-                counter!("rustfs_object_cache_hits").increment(1);
-            }
-
-            return Some(Arc::clone(&cached.data));
-        }
+        // First, try with read lock using peek (doesn't promote in LRU)
+        {
+            let cache = self.cache.read().await;
+            if let Some(cached) = cache.peek(key) {
+                // Clone the data reference while holding read lock
+                let data = Arc::clone(&cached.data);
+                drop(cache);
+
+                // Now acquire write lock to promote in LRU and update stats
+                let mut cache_write = self.cache.write().await;
+                if let Some(cached) = cache_write.get(key) {
+                    cached.hit_count.fetch_add(1, Ordering::Relaxed);
+
+                    #[cfg(feature = "metrics")]
+                    {
+                        use metrics::counter;
+                        counter!("rustfs_object_cache_hits").increment(1);
+                    }
+
+                    return Some(data);
+                }
+            }
+        }
 
         #[cfg(feature = "metrics")]
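The read-first pattern in this hunk can be reproduced in isolation as below. This is a self-contained sketch with hypothetical type names, assuming the lru and tokio crates as dependencies; it is not the crate's actual HotObjectCache, but it shows why `peek()` under a shared read lock keeps the write lock off the lookup fast path.

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use lru::LruCache;
use tokio::sync::RwLock;

struct Cache {
    inner: RwLock<LruCache<String, Arc<Vec<u8>>>>,
}

impl Cache {
    // Read-first lookup: peek() under the shared read lock, then take the
    // write lock only on a hit, to promote the key in LRU order.
    async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        let hit = {
            let guard = self.inner.read().await;
            guard.peek(key).cloned()
        };
        if hit.is_some() {
            let mut guard = self.inner.write().await;
            let _ = guard.get(key); // promotes the entry to most-recently-used
        }
        hit
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache {
        inner: RwLock::new(LruCache::new(NonZeroUsize::new(8).unwrap())),
    };
    cache.inner.write().await.put("k".to_string(), Arc::new(vec![1, 2, 3]));
    assert!(cache.get("k").await.is_some());
    assert!(cache.get("missing").await.is_none());
}
```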
@@ -284,6 +346,73 @@ impl HotObjectCache {
             max_object_size: self.max_object_size,
         }
     }
+
+    /// Check if a key exists in cache without promoting it in LRU
+    async fn contains(&self, key: &str) -> bool {
+        let cache = self.cache.read().await;
+        cache.peek(key).is_some()
+    }
+
+    /// Get multiple objects from cache in a single operation
+    ///
+    /// This is more efficient than calling get() multiple times as it acquires
+    /// the lock only once for all operations.
+    async fn get_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
+        let mut cache = self.cache.write().await;
+        let mut results = Vec::with_capacity(keys.len());
+
+        for key in keys {
+            if let Some(cached) = cache.get(key) {
+                cached.hit_count.fetch_add(1, Ordering::Relaxed);
+                results.push(Some(Arc::clone(&cached.data)));
+
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_hits").increment(1);
+                }
+            } else {
+                results.push(None);
+
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_misses").increment(1);
+                }
+            }
+        }
+
+        results
+    }
+
+    /// Remove a specific key from cache
+    ///
+    /// Returns true if the key was found and removed, false otherwise.
+    async fn remove(&self, key: &str) -> bool {
+        let mut cache = self.cache.write().await;
+        if let Some((_, cached)) = cache.pop(key) {
+            let current = self.current_size.load(Ordering::Relaxed);
+            self.current_size.store(current.saturating_sub(cached.size), Ordering::Relaxed);
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Get the most frequently accessed keys
+    ///
+    /// Returns up to `limit` keys sorted by hit count in descending order.
+    async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
+        let cache = self.cache.read().await;
+        let mut keys_with_hits: Vec<(String, usize)> = cache
+            .iter()
+            .map(|(k, v)| (k.clone(), v.hit_count.load(Ordering::Relaxed)))
+            .collect();
+
+        keys_with_hits.sort_by(|a, b| b.1.cmp(&a.1));
+        keys_with_hits.truncate(limit);
+        keys_with_hits
+    }
 }
 
 /// Cache statistics
@@ -362,6 +491,44 @@ impl ConcurrencyManager {
     pub async fn clear_cache(&self) {
         self.cache.clear().await
     }
+
+    /// Check if a key exists in cache
+    ///
+    /// This is a lightweight check that doesn't promote the key in LRU order.
+    pub async fn is_cached(&self, key: &str) -> bool {
+        self.cache.contains(key).await
+    }
+
+    /// Get multiple objects from cache in batch
+    ///
+    /// More efficient than individual get_cached() calls when fetching multiple objects.
+    pub async fn get_cached_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
+        self.cache.get_batch(keys).await
+    }
+
+    /// Remove a specific object from cache
+    ///
+    /// Returns true if the object was cached and removed.
+    pub async fn remove_cached(&self, key: &str) -> bool {
+        self.cache.remove(key).await
+    }
+
+    /// Get the most frequently accessed keys
+    ///
+    /// Useful for identifying hot objects and optimizing cache strategies.
+    pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
+        self.cache.get_hot_keys(limit).await
+    }
+
+    /// Warm up cache with frequently accessed objects
+    ///
+    /// This can be called during server startup or maintenance windows
+    /// to pre-populate the cache with known hot objects.
+    pub async fn warm_cache(&self, objects: Vec<(String, Vec<u8>)>) {
+        for (key, data) in objects {
+            self.cache_object(key, data).await;
+        }
+    }
 }
 
 impl Default for ConcurrencyManager {
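A plausible way to use the warm-up and inspection helpers at startup is sketched below. The keys and payloads are invented; `warm_cache()`, `is_cached()`, and `cache_stats()` are the methods shown in the hunks above, and in practice the object list would come from access logs or a previous run's `get_hot_keys()` output.

```rust
use crate::storage::concurrency::ConcurrencyManager;

// Sketch only: object names and sizes are made up for illustration.
async fn warm_on_startup(manager: &ConcurrencyManager) {
    let hot_objects = vec![
        ("config/site.json".to_string(), vec![0u8; 4 * 1024]),
        ("index/manifest".to_string(), vec![0u8; 64 * 1024]),
    ];

    manager.warm_cache(hot_objects).await;

    // Confirm the entries landed without promoting them in LRU order.
    debug_assert!(manager.is_cached("config/site.json").await);
    let stats = manager.cache_stats().await;
    println!("cache warmed: {} entries", stats.entries);
}
```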
@@ -13,11 +13,22 @@
 // limitations under the License.
 
+//! Integration tests for concurrent GetObject performance optimization
+//!
+//! This test module validates the concurrency management features including:
+//! - Request tracking and RAII guards
+//! - Adaptive buffer sizing based on concurrent load
+//! - LRU cache operations and eviction
+//! - Batch operations and cache warming
+//! - Hot key tracking and analysis
+
 #[cfg(test)]
 mod tests {
-    use crate::storage::concurrency::{ConcurrencyManager, GetObjectGuard, get_concurrency_aware_buffer_size};
-    use rustfs_config::MI_B;
+    use crate::storage::concurrency::{
+        ConcurrencyManager, GetObjectGuard,
+        get_concurrency_aware_buffer_size, get_advanced_buffer_size
+    };
+    use rustfs_config::{KI_B, MI_B};
     use std::sync::Arc;
     use std::time::Duration;
     use tokio::time::Instant;
@@ -273,4 +284,232 @@ mod tests {
         let recent = manager.get_cached(&recent_key).await;
         assert!(recent.is_some(), "Recent object should still be cached");
     }
+
+    /// Test batch cache operations
+    #[tokio::test]
+    async fn test_cache_batch_operations() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache multiple objects
+        for i in 0..10 {
+            let key = format!("batch/object{}", i);
+            let data = vec![i as u8; 100 * KI_B]; // 100KB each
+            manager.cache_object(key, data).await;
+        }
+
+        // Test batch get
+        let keys: Vec<String> = (0..10).map(|i| format!("batch/object{}", i)).collect();
+        let results = manager.get_cached_batch(&keys).await;
+
+        assert_eq!(results.len(), 10, "Should return result for each key");
+        for (i, result) in results.iter().enumerate() {
+            assert!(result.is_some(), "Object {} should be in cache", i);
+            assert_eq!(result.as_ref().unwrap().len(), 100 * KI_B);
+        }
+
+        // Test batch get with missing keys
+        let mixed_keys = vec![
+            "batch/object0".to_string(),
+            "missing/key1".to_string(),
+            "batch/object5".to_string(),
+            "missing/key2".to_string(),
+        ];
+        let mixed_results = manager.get_cached_batch(&mixed_keys).await;
+
+        assert_eq!(mixed_results.len(), 4);
+        assert!(mixed_results[0].is_some(), "First key should exist");
+        assert!(mixed_results[1].is_none(), "Second key should be missing");
+        assert!(mixed_results[2].is_some(), "Third key should exist");
+        assert!(mixed_results[3].is_none(), "Fourth key should be missing");
+    }
+
+    /// Test cache warming functionality
+    #[tokio::test]
+    async fn test_cache_warming() {
+        let manager = ConcurrencyManager::new();
+
+        // Prepare objects for warming
+        let mut warm_objects = Vec::new();
+        for i in 0..5 {
+            let key = format!("warm/object{}", i);
+            let data = vec![i as u8; 512 * KI_B]; // 512KB each
+            warm_objects.push((key, data));
+        }
+
+        // Warm cache
+        manager.warm_cache(warm_objects).await;
+
+        // Verify all objects are cached
+        let stats = manager.cache_stats().await;
+        assert_eq!(stats.entries, 5, "All objects should be cached");
+
+        for i in 0..5 {
+            let key = format!("warm/object{}", i);
+            assert!(manager.is_cached(&key).await, "Object {} should be cached", i);
+        }
+    }
+
+    /// Test hot keys tracking
+    #[tokio::test]
+    async fn test_hot_keys_tracking() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache objects with different access patterns
+        for i in 0..5 {
+            let key = format!("hot/object{}", i);
+            let data = vec![i as u8; 100 * KI_B];
+            manager.cache_object(key, data).await;
+        }
+
+        // Simulate access patterns (object 0 and 1 are hot)
+        for _ in 0..10 {
+            let _ = manager.get_cached("hot/object0").await;
+        }
+        for _ in 0..5 {
+            let _ = manager.get_cached("hot/object1").await;
+        }
+        for _ in 0..2 {
+            let _ = manager.get_cached("hot/object2").await;
+        }
+
+        // Get hot keys
+        let hot_keys = manager.get_hot_keys(3).await;
+
+        assert_eq!(hot_keys.len(), 3, "Should return top 3 keys");
+        assert_eq!(hot_keys[0].0, "hot/object0", "Most accessed should be first");
+        assert!(hot_keys[0].1 >= 10, "Object 0 should have at least 10 hits");
+        assert_eq!(hot_keys[1].0, "hot/object1", "Second most accessed should be second");
+        assert!(hot_keys[1].1 >= 5, "Object 1 should have at least 5 hits");
+    }
+
+    /// Test cache removal
+    #[tokio::test]
+    async fn test_cache_removal() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache an object
+        let key = "remove/test".to_string();
+        let data = vec![1u8; 100 * KI_B];
+        manager.cache_object(key.clone(), data).await;
+
+        // Verify it's cached
+        assert!(manager.is_cached(&key).await, "Object should be cached");
+
+        // Remove it
+        let removed = manager.remove_cached(&key).await;
+        assert!(removed, "Should successfully remove cached object");
+
+        // Verify it's gone
+        assert!(!manager.is_cached(&key).await, "Object should no longer be cached");
+
+        // Try to remove non-existent key
+        let not_removed = manager.remove_cached("nonexistent").await;
+        assert!(!not_removed, "Should return false for non-existent key");
+    }
+
+    /// Test advanced buffer sizing with file patterns
+    #[tokio::test]
+    async fn test_advanced_buffer_sizing() {
+        let base_buffer = 256 * KI_B; // 256KB base
+
+        // Test small file optimization
+        let small_size = get_advanced_buffer_size(128 * KI_B as i64, base_buffer, false);
+        assert!(small_size < base_buffer, "Small files should use smaller buffers");
+        assert!(small_size >= 16 * KI_B, "Should not go below minimum");
+
+        // Test sequential read optimization
+        let _guard = ConcurrencyManager::track_request();
+        let sequential_size = get_advanced_buffer_size(10 * MI_B as i64, base_buffer, true);
+        let random_size = get_advanced_buffer_size(10 * MI_B as i64, base_buffer, false);
+
+        // Sequential reads should get larger buffers at low concurrency
+        assert!(
+            sequential_size >= random_size,
+            "Sequential reads should have equal or larger buffers at low concurrency"
+        );
+
+        drop(_guard);
+
+        // Test high concurrency with large files
+        let mut guards = Vec::new();
+        for _ in 0..10 {
+            guards.push(ConcurrencyManager::track_request());
+        }
+
+        let high_concurrency_size = get_advanced_buffer_size(50 * MI_B as i64, base_buffer, false);
+        assert!(
+            high_concurrency_size <= base_buffer,
+            "High concurrency should reduce buffer size"
+        );
+
+        drop(guards);
+    }
+
+    /// Test cache performance under concurrent load
+    #[tokio::test]
+    async fn test_concurrent_cache_access() {
+        let manager = Arc::new(ConcurrencyManager::new());
+
+        // Pre-populate cache
+        for i in 0..50 {
+            let key = format!("concurrent/object{}", i);
+            let data = vec![i as u8; 100 * KI_B];
+            manager.cache_object(key, data).await;
+        }
+
+        // Spawn multiple concurrent readers
+        let mut handles = Vec::new();
+        for worker_id in 0..10 {
+            let manager_clone = Arc::clone(&manager);
+            let handle = tokio::spawn(async move {
+                let mut hit_count = 0;
+                for i in 0..50 {
+                    let key = format!("concurrent/object{}", i);
+                    if manager_clone.get_cached(&key).await.is_some() {
+                        hit_count += 1;
+                    }
+                }
+                (worker_id, hit_count)
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all workers to complete
+        let mut total_hits = 0;
+        for handle in handles {
+            let (worker_id, hits) = handle.await.unwrap();
+            println!("Worker {} got {} cache hits", worker_id, hits);
+            total_hits += hits;
+        }
+
+        // All workers should get hits for all cached objects
+        assert_eq!(total_hits, 500, "Should have 500 total hits (10 workers * 50 objects)");
+    }
+
+    /// Test is_cached doesn't affect LRU order
+    #[tokio::test]
+    async fn test_is_cached_no_promotion() {
+        let manager = ConcurrencyManager::new();
+
+        // Cache two objects
+        manager.cache_object("first".to_string(), vec![1u8; 100 * KI_B]).await;
+        manager.cache_object("second".to_string(), vec![2u8; 100 * KI_B]).await;
+
+        // Check first without accessing it
+        assert!(manager.is_cached("first").await);
+
+        // Access second multiple times
+        for _ in 0..5 {
+            let _ = manager.get_cached("second").await;
+        }
+
+        // Both should still be cached
+        assert!(manager.is_cached("first").await);
+        assert!(manager.is_cached("second").await);
+
+        // Get hot keys - second should be hotter
+        let hot_keys = manager.get_hot_keys(2).await;
+        assert_eq!(hot_keys[0].0, "second", "Second should be hottest");
+        assert!(hot_keys[0].1 >= 5, "Second should have at least 5 hits");
+    }
 }