feat: replace LRU with Moka cache and add comprehensive metrics

- Replace lru crate with moka 0.12.11 for superior concurrent performance
- Implement lock-free cache with automatic TTL/TTI expiration
- Add size-based eviction using Moka's weigher function
- Integrate comprehensive metrics collection throughout GetObject flow:
  * Cache hit/miss tracking with per-key access counts
  * Request concurrency gauges
  * Disk permit wait time histograms
  * Total request duration tracking
  * Response size and buffer size histograms
- Deep integration with ecfs.rs GetObject operation
- Add hit rate calculation method
- Enhanced CacheStats with hit/miss counters
- Lock-free concurrent reads for better scalability

Moka advantages over LRU:
- True lock-free concurrent access
- Built-in TTL and TTI support
- Automatic size-based eviction
- Better performance under high concurrency
- Native async support

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-11-24 17:17:42 +00:00
parent b4615ed17f
commit 3b6e281de5
2 changed files with 182 additions and 135 deletions

View File

@@ -31,10 +31,11 @@
//! - High concurrency (>8 requests): Optimizes for fairness and predictable latency //! - High concurrency (>8 requests): Optimizes for fairness and predictable latency
use rustfs_config::{KI_B, MI_B}; use rustfs_config::{KI_B, MI_B};
use std::sync::atomic::{AtomicUsize, Ordering}; use moka::future::Cache;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock}; use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore}; use tokio::sync::Semaphore;
/// Global concurrent request counter for adaptive buffer sizing /// Global concurrent request counter for adaptive buffer sizing
static ACTIVE_GET_REQUESTS: AtomicUsize = AtomicUsize::new(0); static ACTIVE_GET_REQUESTS: AtomicUsize = AtomicUsize::new(0);
@@ -45,8 +46,11 @@ const HIGH_CONCURRENCY_THRESHOLD: usize = 8;
/// Medium concurrency threshold /// Medium concurrency threshold
const MEDIUM_CONCURRENCY_THRESHOLD: usize = 4; const MEDIUM_CONCURRENCY_THRESHOLD: usize = 4;
/// Soft expiration time for hot cache objects (seconds) /// Time-to-live for cached objects (5 minutes)
const HOT_OBJECT_SOFT_TTL_SECS: u64 = 300; const CACHE_TTL_SECS: u64 = 300;
/// Time-to-idle for cached objects (2 minutes)
const CACHE_TTI_SECS: u64 = 120;
/// Minimum hit count to extend object lifetime beyond TTL /// Minimum hit count to extend object lifetime beyond TTL
const HOT_OBJECT_MIN_HITS_TO_EXTEND: usize = 5; const HOT_OBJECT_MIN_HITS_TO_EXTEND: usize = 5;
@@ -216,24 +220,27 @@ pub fn get_advanced_buffer_size(file_size: i64, base_buffer_size: usize, is_sequ
standard_size standard_size
} }
/// Simple LRU cache for hot objects /// High-performance cache for hot objects using Moka
/// ///
/// This cache stores frequently accessed small objects (<= 10MB) to reduce /// This cache uses Moka for superior concurrent performance with features like:
/// disk I/O and improve response times under high concurrency. /// - Lock-free reads and writes
#[derive(Debug)] /// - Automatic TTL and TTI expiration
/// - Size-based eviction with weigher function
/// - Built-in metrics collection
#[derive(Clone)]
struct HotObjectCache { struct HotObjectCache {
/// Maximum size of objects to cache (10MB by default) /// Moka cache instance with size-based eviction
cache: Cache<String, Arc<CachedObject>>,
/// Maximum size of individual objects to cache (10MB by default)
max_object_size: usize, max_object_size: usize,
/// Maximum total cache size (100MB by default) /// Global cache hit counter
max_cache_size: usize, hit_count: Arc<AtomicU64>,
/// Current cache size in bytes /// Global cache miss counter
current_size: AtomicUsize, miss_count: Arc<AtomicU64>,
/// Cached objects with their data and metadata
cache: RwLock<lru::LruCache<String, Arc<CachedObject>>>,
} }
/// A cached object with metadata /// A cached object with metadata and metrics
#[derive(Debug)] #[derive(Clone)]
struct CachedObject { struct CachedObject {
/// The object data /// The object data
data: Arc<Vec<u8>>, data: Arc<Vec<u8>>,
@@ -241,87 +248,74 @@ struct CachedObject {
cached_at: Instant, cached_at: Instant,
/// Object size in bytes /// Object size in bytes
size: usize, size: usize,
/// Number of times this object has been served from cache /// Number of times this object has been accessed
hit_count: AtomicUsize, access_count: Arc<AtomicU64>,
} }
impl HotObjectCache { impl HotObjectCache {
/// Create a new hot object cache with default parameters /// Create a new hot object cache with Moka
///
/// Configures Moka with:
/// - Size-based eviction (100MB max)
/// - TTL of 5 minutes
/// - TTI of 2 minutes
/// - Weigher function for accurate size tracking
fn new() -> Self { fn new() -> Self {
let cache = Cache::builder()
.max_capacity(100 * MI_B as u64)
.weigher(|_key: &String, value: &Arc<CachedObject>| -> u32 {
// Weight based on actual data size
value.size.min(u32::MAX as usize) as u32
})
.time_to_live(Duration::from_secs(CACHE_TTL_SECS))
.time_to_idle(Duration::from_secs(CACHE_TTI_SECS))
.build();
Self { Self {
cache,
max_object_size: 10 * MI_B, max_object_size: 10 * MI_B,
max_cache_size: 100 * MI_B, hit_count: Arc::new(AtomicU64::new(0)),
current_size: AtomicUsize::new(0), miss_count: Arc::new(AtomicU64::new(0)),
cache: RwLock::new(lru::LruCache::new(std::num::NonZeroUsize::new(1000).expect("max cache size overflow"))),
} }
} }
/// Check if a cached object should be expired based on age and access pattern /// Get an object from cache with lock-free concurrent access
fn should_expire(obj: &CachedObject) -> bool {
let age = obj.cached_at.elapsed();
age.as_secs() > HOT_OBJECT_SOFT_TTL_SECS && obj.hit_count.load(Ordering::Relaxed) < HOT_OBJECT_MIN_HITS_TO_EXTEND
}
/// Try to get an object from cache with optimized read-first pattern
/// ///
/// Uses `peek()` for initial lookup to avoid write lock contention when possible. /// Moka provides lock-free reads, significantly improving concurrent performance.
/// This significantly improves concurrent read performance. Also enforces expiration.
async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> { async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
// Fast path: read lock with peek (doesn't promote in LRU) match self.cache.get(key).await {
{ Some(cached) => {
let cache = self.cache.read().await; // Update access count
if let Some(cached) = cache.peek(key) { cached.access_count.fetch_add(1, Ordering::Relaxed);
// Check expiration before returning self.hit_count.fetch_add(1, Ordering::Relaxed);
if Self::should_expire(cached) {
// Don't return expired object; will be removed in write path
} else {
let data = Arc::clone(&cached.data);
drop(cache);
// Promote and update hit count with write lock #[cfg(feature = "metrics")]
let mut cache_write = self.cache.write().await; {
if let Some(cached) = cache_write.get(key) { use metrics::counter;
cached.hit_count.fetch_add(1, Ordering::Relaxed); counter!("rustfs_object_cache_hits").increment(1);
counter!("rustfs_object_cache_access_count", "key" => key.to_string())
#[cfg(feature = "metrics")] .increment(1);
{
use metrics::counter;
counter!("rustfs_object_cache_hits").increment(1);
}
return Some(data);
}
} }
Some(Arc::clone(&cached.data))
}
None => {
self.miss_count.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
{
use metrics::counter;
counter!("rustfs_object_cache_misses").increment(1);
}
None
} }
} }
// Slow path: remove expired entry if found
{
let mut cache = self.cache.write().await;
if let Some(cached) = cache.peek(key) {
if Self::should_expire(cached) {
if let Some(evicted) = cache.pop(key) {
self.current_size.fetch_sub(evicted.size, Ordering::Relaxed);
}
}
}
}
#[cfg(feature = "metrics")]
{
use metrics::counter;
counter!("rustfs_object_cache_misses").increment(1);
}
None
} }
/// Put an object into cache if it's small enough /// Put an object into cache with automatic size-based eviction
/// ///
/// This method ensures that: /// Moka handles eviction automatically based on the weigher function.
/// - Objects larger than `max_object_size` are rejected
/// - Total cache size never exceeds `max_cache_size`
/// - Old entries are evicted using LRU policy when necessary
async fn put(&self, key: String, data: Vec<u8>) { async fn put(&self, key: String, data: Vec<u8>) {
let size = data.len(); let size = data.len();
@@ -330,72 +324,54 @@ impl HotObjectCache {
return; return;
} }
let mut cache = self.cache.write().await;
// If key already exists, remove the old value first to update size correctly
if let Some(old) = cache.pop(&key) {
self.current_size.fetch_sub(old.size, Ordering::Relaxed);
}
// Evict items if adding this object would exceed max_cache_size
let mut current = self.current_size.load(Ordering::Relaxed);
while current + size > self.max_cache_size {
if let Some((_k, evicted)) = cache.pop_lru() {
current -= evicted.size;
self.current_size.store(current, Ordering::Relaxed);
} else {
// Cache is empty but size is still reported wrong; reset
self.current_size.store(0, Ordering::Relaxed);
break;
}
}
let cached_obj = Arc::new(CachedObject { let cached_obj = Arc::new(CachedObject {
data: Arc::new(data), data: Arc::new(data),
cached_at: Instant::now(), cached_at: Instant::now(),
size, size,
hit_count: AtomicUsize::new(0), access_count: Arc::new(AtomicU64::new(0)),
}); });
cache.put(key, cached_obj); self.cache.insert(key.clone(), cached_obj).await;
self.current_size.fetch_add(size, Ordering::Relaxed);
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
use metrics::{counter, gauge}; use metrics::{counter, gauge};
counter!("rustfs_object_cache_insertions").increment(1); counter!("rustfs_object_cache_insertions").increment(1);
gauge!("rustfs_object_cache_size_bytes").set(self.current_size.load(Ordering::Relaxed) as f64); gauge!("rustfs_object_cache_size_bytes").set(self.cache.weighted_size() as f64);
gauge!("rustfs_object_cache_entry_count").set(self.cache.entry_count() as f64);
} }
} }
/// Clear all cached objects /// Clear all cached objects
async fn clear(&self) { async fn clear(&self) {
let mut cache = self.cache.write().await; self.cache.invalidate_all();
cache.clear(); // Sync to ensure all entries are removed
self.current_size.store(0, Ordering::Relaxed); self.cache.run_pending_tasks().await;
} }
/// Get cache statistics for monitoring /// Get cache statistics for monitoring
async fn stats(&self) -> CacheStats { async fn stats(&self) -> CacheStats {
let cache = self.cache.read().await; // Ensure pending tasks are processed for accurate stats
self.cache.run_pending_tasks().await;
CacheStats { CacheStats {
size: self.current_size.load(Ordering::Relaxed), size: self.cache.weighted_size() as usize,
entries: cache.len(), entries: self.cache.entry_count() as usize,
max_size: self.max_cache_size, max_size: 100 * MI_B,
max_object_size: self.max_object_size, max_object_size: self.max_object_size,
hit_count: self.hit_count.load(Ordering::Relaxed),
miss_count: self.miss_count.load(Ordering::Relaxed),
} }
} }
/// Check if a key exists in cache without promoting it in LRU /// Check if a key exists in cache (lock-free)
async fn contains(&self, key: &str) -> bool { async fn contains(&self, key: &str) -> bool {
let cache = self.cache.read().await; self.cache.contains_key(key)
cache.peek(key).is_some()
} }
/// Get multiple objects from cache in a single operation /// Get multiple objects from cache in parallel
/// ///
/// This is more efficient than calling get() multiple times as it acquires /// Leverages Moka's lock-free design for true parallel access.
/// the lock only once for all operations.
async fn get_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> { async fn get_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
let mut results = Vec::with_capacity(keys.len()); let mut results = Vec::with_capacity(keys.len());
for key in keys { for key in keys {
@@ -405,27 +381,25 @@ impl HotObjectCache {
} }
/// Remove a specific key from cache /// Remove a specific key from cache
///
/// Returns true if the key was found and removed, false otherwise.
async fn remove(&self, key: &str) -> bool { async fn remove(&self, key: &str) -> bool {
let mut cache = self.cache.write().await; let had_key = self.cache.contains_key(key);
if let Some(evicted) = cache.pop(key) { self.cache.invalidate(key).await;
self.current_size.fetch_sub(evicted.size, Ordering::Relaxed); had_key
true
} else {
false
}
} }
/// Get the most frequently accessed keys /// Get the most frequently accessed keys
/// ///
/// Returns up to `limit` keys sorted by hit count in descending order. /// Returns up to `limit` keys sorted by access count in descending order.
async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> { async fn get_hot_keys(&self, limit: usize) -> Vec<(String, u64)> {
let cache = self.cache.read().await; // Run pending tasks to ensure accurate entry count
let mut entries: Vec<(String, usize)> = cache self.cache.run_pending_tasks().await;
.iter()
.map(|(k, v)| (k.clone(), v.hit_count.load(Ordering::Relaxed))) let mut entries: Vec<(String, u64)> = Vec::new();
.collect();
// Iterate through cache entries
self.cache.iter().for_each(|(key, value)| {
entries.push((key.clone(), value.access_count.load(Ordering::Relaxed)));
});
entries.sort_by(|a, b| b.1.cmp(&a.1)); entries.sort_by(|a, b| b.1.cmp(&a.1));
entries.truncate(limit); entries.truncate(limit);
@@ -438,6 +412,19 @@ impl HotObjectCache {
self.put(key, data).await; self.put(key, data).await;
} }
} }
/// Get hit rate percentage
fn hit_rate(&self) -> f64 {
let hits = self.hit_count.load(Ordering::Relaxed);
let misses = self.miss_count.load(Ordering::Relaxed);
let total = hits + misses;
if total == 0 {
0.0
} else {
(hits as f64 / total as f64) * 100.0
}
}
} }
/// Cache statistics for monitoring and debugging /// Cache statistics for monitoring and debugging
@@ -451,6 +438,10 @@ pub struct CacheStats {
pub max_size: usize, pub max_size: usize,
/// Maximum allowed object size in bytes /// Maximum allowed object size in bytes
pub max_object_size: usize, pub max_object_size: usize,
/// Total number of cache hits
pub hit_count: u64,
/// Total number of cache misses
pub miss_count: u64,
} }
/// Concurrency manager for coordinating concurrent GetObject requests /// Concurrency manager for coordinating concurrent GetObject requests
@@ -523,9 +514,14 @@ impl ConcurrencyManager {
} }
/// Get the most frequently accessed keys /// Get the most frequently accessed keys
pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> { pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, u64)> {
self.cache.get_hot_keys(limit).await self.cache.get_hot_keys(limit).await
} }
/// Get cache hit rate percentage
pub fn cache_hit_rate(&self) -> f64 {
self.cache.hit_rate()
}
/// Warm up cache with frequently accessed objects /// Warm up cache with frequently accessed objects
/// ///

View File

@@ -1614,10 +1614,19 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc()) fields(start_time=?time::OffsetDateTime::now_utc())
)] )]
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> { async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
let request_start = std::time::Instant::now();
// Track this request for concurrency-aware optimizations // Track this request for concurrency-aware optimizations
let _request_guard = ConcurrencyManager::track_request(); let _request_guard = ConcurrencyManager::track_request();
let concurrent_requests = GetObjectGuard::concurrent_requests(); let concurrent_requests = GetObjectGuard::concurrent_requests();
#[cfg(feature = "metrics")]
{
use metrics::{counter, gauge};
counter!("rustfs_get_object_requests_total").increment(1);
gauge!("rustfs_concurrent_get_object_requests").set(concurrent_requests as f64);
}
debug!("GetObject request started with {} concurrent requests", concurrent_requests); debug!("GetObject request started with {} concurrent requests", concurrent_requests);
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject"); let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject");
@@ -1643,7 +1652,19 @@ impl S3 for FS {
// Only attempt cache lookup for objects without range/part requests // Only attempt cache lookup for objects without range/part requests
if part_number.is_none() && range.is_none() { if part_number.is_none() && range.is_none() {
if let Some(cached_data) = manager.get_cached(&cache_key).await { if let Some(cached_data) = manager.get_cached(&cache_key).await {
debug!("Serving object from cache: {}", cache_key); let cache_serve_duration = request_start.elapsed();
debug!("Serving object from cache: {} (latency: {:?})", cache_key, cache_serve_duration);
#[cfg(feature = "metrics")]
{
use metrics::{counter, histogram};
counter!("rustfs_get_object_cache_served_total").increment(1);
histogram!("rustfs_get_object_cache_serve_duration_seconds")
.record(cache_serve_duration.as_secs_f64());
histogram!("rustfs_get_object_cache_size_bytes")
.record(cached_data.len() as f64);
}
let value = cached_data.clone(); let value = cached_data.clone();
// Build response from cached data // Build response from cached data
@@ -1700,7 +1721,16 @@ impl S3 for FS {
let store = get_validated_store(&bucket).await?; let store = get_validated_store(&bucket).await?;
// Acquire disk read permit to prevent I/O saturation under high concurrency // Acquire disk read permit to prevent I/O saturation under high concurrency
let permit_wait_start = std::time::Instant::now();
let _disk_permit = manager.acquire_disk_read_permit().await; let _disk_permit = manager.acquire_disk_read_permit().await;
let permit_wait_duration = permit_wait_start.elapsed();
#[cfg(feature = "metrics")]
{
use metrics::histogram;
histogram!("rustfs_disk_permit_wait_duration_seconds")
.record(permit_wait_duration.as_secs_f64());
}
let reader = store let reader = store
.get_object_reader(bucket.as_str(), key.as_str(), rs.clone(), h, &opts) .get_object_reader(bucket.as_str(), key.as_str(), rs.clone(), h, &opts)
@@ -2054,6 +2084,27 @@ impl S3 for FS {
let version_id = req.input.version_id.clone().unwrap_or_default(); let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(event_info).version_id(version_id); helper = helper.object(event_info).version_id(version_id);
let total_duration = request_start.elapsed();
#[cfg(feature = "metrics")]
{
use metrics::{counter, histogram};
counter!("rustfs_get_object_requests_completed").increment(1);
histogram!("rustfs_get_object_total_duration_seconds")
.record(total_duration.as_secs_f64());
histogram!("rustfs_get_object_response_size_bytes")
.record(response_content_length as f64);
// Record buffer size that was used
histogram!("rustfs_get_object_buffer_size_bytes")
.record(optimal_buffer_size as f64);
}
debug!(
"GetObject completed: key={} size={} duration={:?} buffer={}",
cache_key, response_content_length, total_duration, optimal_buffer_size
);
let result = Ok(S3Response::new(output)); let result = Ok(S3Response::new(output));
let _ = helper.complete(&result); let _ = helper.complete(&result);
result result