diff --git a/rustfs/src/storage/concurrency.rs b/rustfs/src/storage/concurrency.rs
index 60028fcd..0a7c4544 100644
--- a/rustfs/src/storage/concurrency.rs
+++ b/rustfs/src/storage/concurrency.rs
@@ -31,10 +31,11 @@
 //! - High concurrency (>8 requests): Optimizes for fairness and predictable latency

 use rustfs_config::{KI_B, MI_B};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use moka::future::Cache;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, LazyLock};
 use std::time::{Duration, Instant};
-use tokio::sync::{RwLock, Semaphore};
+use tokio::sync::Semaphore;

 /// Global concurrent request counter for adaptive buffer sizing
 static ACTIVE_GET_REQUESTS: AtomicUsize = AtomicUsize::new(0);
@@ -45,8 +46,11 @@
 const HIGH_CONCURRENCY_THRESHOLD: usize = 8;

 /// Medium concurrency threshold
 const MEDIUM_CONCURRENCY_THRESHOLD: usize = 4;

-/// Soft expiration time for hot cache objects (seconds)
-const HOT_OBJECT_SOFT_TTL_SECS: u64 = 300;
+/// Time-to-live for cached objects (5 minutes)
+const CACHE_TTL_SECS: u64 = 300;
+
+/// Time-to-idle for cached objects (2 minutes)
+const CACHE_TTI_SECS: u64 = 120;

 /// Minimum hit count to extend object lifetime beyond TTL
 const HOT_OBJECT_MIN_HITS_TO_EXTEND: usize = 5;
@@ -216,24 +220,27 @@ pub fn get_advanced_buffer_size(file_size: i64, base_buffer_size: usize, is_sequ
     standard_size
 }

-/// Simple LRU cache for hot objects
+/// High-performance cache for hot objects using Moka
 ///
-/// This cache stores frequently accessed small objects (<= 10MB) to reduce
-/// disk I/O and improve response times under high concurrency.
-#[derive(Debug)]
+/// This cache uses Moka for superior concurrent performance with features like:
+/// - Lock-free reads and writes
+/// - Automatic TTL and TTI expiration
+/// - Size-based eviction with weigher function
+/// - Entry-count and weighted-size accounting for metrics
+#[derive(Clone)]
 struct HotObjectCache {
-    /// Maximum size of objects to cache (10MB by default)
+    /// Moka cache instance with size-based eviction
+    cache: Cache<String, Arc<CachedObject>>,
+    /// Maximum size of individual objects to cache (10MB by default)
     max_object_size: usize,
-    /// Maximum total cache size (100MB by default)
-    max_cache_size: usize,
-    /// Current cache size in bytes
-    current_size: AtomicUsize,
-    /// Cached objects with their data and metadata
-    cache: RwLock<lru::LruCache<String, Arc<CachedObject>>>,
+    /// Global cache hit counter
+    hit_count: Arc<AtomicU64>,
+    /// Global cache miss counter
+    miss_count: Arc<AtomicU64>,
 }

-/// A cached object with metadata
-#[derive(Debug)]
+/// A cached object with metadata and metrics
+#[derive(Clone)]
 struct CachedObject {
     /// The object data
     data: Arc<Vec<u8>>,
@@ -241,87 +248,74 @@ struct CachedObject {
     cached_at: Instant,
     /// Object size in bytes
     size: usize,
-    /// Number of times this object has been served from cache
-    hit_count: AtomicUsize,
+    /// Number of times this object has been accessed
+    access_count: Arc<AtomicU64>,
 }

 impl HotObjectCache {
-    /// Create a new hot object cache with default parameters
+    /// Create a new hot object cache with Moka
+    ///
+    /// Configures Moka with:
+    /// - Size-based eviction (100MB max)
+    /// - TTL of 5 minutes
+    /// - TTI of 2 minutes
+    /// - Weigher function for accurate size tracking
     fn new() -> Self {
+        let cache = Cache::builder()
+            .max_capacity(100 * MI_B as u64)
+            .weigher(|_key: &String, value: &Arc<CachedObject>| -> u32 {
+                // Weight based on actual data size
+                value.size.min(u32::MAX as usize) as u32
+            })
+            .time_to_live(Duration::from_secs(CACHE_TTL_SECS))
+            .time_to_idle(Duration::from_secs(CACHE_TTI_SECS))
+            .build();
+
         Self {
+            cache,
             max_object_size: 10 * MI_B,
-            max_cache_size: 100 * MI_B,
-            current_size: AtomicUsize::new(0),
-            cache: RwLock::new(lru::LruCache::new(std::num::NonZeroUsize::new(1000).expect("max cache size overflow"))),
+            hit_count: Arc::new(AtomicU64::new(0)),
+            miss_count: Arc::new(AtomicU64::new(0)),
         }
     }

-    /// Check if a cached object should be expired based on age and access pattern
-    fn should_expire(obj: &CachedObject) -> bool {
-        let age = obj.cached_at.elapsed();
-        age.as_secs() > HOT_OBJECT_SOFT_TTL_SECS && obj.hit_count.load(Ordering::Relaxed) < HOT_OBJECT_MIN_HITS_TO_EXTEND
-    }
-
-    /// Try to get an object from cache with optimized read-first pattern
+    /// Get an object from cache with lock-free concurrent access
     ///
-    /// Uses `peek()` for initial lookup to avoid write lock contention when possible.
-    /// This significantly improves concurrent read performance. Also enforces expiration.
+    /// Moka provides lock-free reads, significantly improving concurrent performance.
     async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
-        // Fast path: read lock with peek (doesn't promote in LRU)
-        {
-            let cache = self.cache.read().await;
-            if let Some(cached) = cache.peek(key) {
-                // Check expiration before returning
-                if Self::should_expire(cached) {
-                    // Don't return expired object; will be removed in write path
-                } else {
-                    let data = Arc::clone(&cached.data);
-                    drop(cache);
+        match self.cache.get(key).await {
+            Some(cached) => {
+                // Update access count
+                cached.access_count.fetch_add(1, Ordering::Relaxed);
+                self.hit_count.fetch_add(1, Ordering::Relaxed);

-                    // Promote and update hit count with write lock
-                    let mut cache_write = self.cache.write().await;
-                    if let Some(cached) = cache_write.get(key) {
-                        cached.hit_count.fetch_add(1, Ordering::Relaxed);
-
-                        #[cfg(feature = "metrics")]
-                        {
-                            use metrics::counter;
-                            counter!("rustfs_object_cache_hits").increment(1);
-                        }
-
-                        return Some(data);
-                    }
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_hits").increment(1);
+                    counter!("rustfs_object_cache_access_count", "key" => key.to_string())
+                        .increment(1);
                 }
+
+                Some(Arc::clone(&cached.data))
+            }
+            None => {
+                self.miss_count.fetch_add(1, Ordering::Relaxed);
+
+                #[cfg(feature = "metrics")]
+                {
+                    use metrics::counter;
+                    counter!("rustfs_object_cache_misses").increment(1);
+                }
+
+                None
             }
         }
-
-        // Slow path: remove expired entry if found
-        {
-            let mut cache = self.cache.write().await;
-            if let Some(cached) = cache.peek(key) {
-                if Self::should_expire(cached) {
-                    if let Some(evicted) = cache.pop(key) {
-                        self.current_size.fetch_sub(evicted.size, Ordering::Relaxed);
-                    }
-                }
-            }
-        }
-
-        #[cfg(feature = "metrics")]
-        {
-            use metrics::counter;
-            counter!("rustfs_object_cache_misses").increment(1);
-        }
-
-        None
     }

-    /// Put an object into cache if it's small enough
+    /// Put an object into cache with automatic size-based eviction
     ///
-    /// This method ensures that:
-    /// - Objects larger than `max_object_size` are rejected
-    /// - Total cache size never exceeds `max_cache_size`
-    /// - Old entries are evicted using LRU policy when necessary
+    /// Moka handles eviction automatically based on the weigher function.
     async fn put(&self, key: String, data: Vec<u8>) {
         let size = data.len();
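Note for reviewers new to Moka: the sketch below is not part of the diff. It shows, in isolation, the `moka::future::Cache` builder pattern that the new `HotObjectCache::new()` and `get()` rely on; the 100 MiB constant, the key/value types (`String`/`Arc<Vec<u8>>`) and the TTL/TTI values are illustrative stand-ins for the real `CachedObject` plumbing.

```rust
// Minimal sketch of the moka::future::Cache configuration used above.
// Assumes moka with the "future" feature plus tokio (rt + macros).
use std::sync::Arc;
use std::time::Duration;

use moka::future::Cache;

const MAX_BYTES: u64 = 100 * 1024 * 1024; // illustrative 100 MiB capacity

#[tokio::main]
async fn main() {
    let cache: Cache<String, Arc<Vec<u8>>> = Cache::builder()
        // Capacity is expressed in weigher units; the weigher below returns bytes.
        .max_capacity(MAX_BYTES)
        .weigher(|_key: &String, value: &Arc<Vec<u8>>| -> u32 {
            value.len().min(u32::MAX as usize) as u32
        })
        .time_to_live(Duration::from_secs(300)) // hard expiry (TTL)
        .time_to_idle(Duration::from_secs(120)) // expiry when not accessed (TTI)
        .build();

    // insert/get are async but internally lock-free; no surrounding RwLock is needed.
    cache.insert("hello".to_string(), Arc::new(vec![0u8; 4096])).await;
    assert!(cache.get("hello").await.is_some());
}
```

The key point is that the `RwLock<lru::LruCache<...>>` plus manual size accounting disappears entirely: concurrent readers no longer serialize on a write lock to promote entries or bump hit counts.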
@@ -330,72 +324,54 @@ impl HotObjectCache {
             return;
         }

-        let mut cache = self.cache.write().await;
-
-        // If key already exists, remove the old value first to update size correctly
-        if let Some(old) = cache.pop(&key) {
-            self.current_size.fetch_sub(old.size, Ordering::Relaxed);
-        }
-
-        // Evict items if adding this object would exceed max_cache_size
-        let mut current = self.current_size.load(Ordering::Relaxed);
-        while current + size > self.max_cache_size {
-            if let Some((_k, evicted)) = cache.pop_lru() {
-                current -= evicted.size;
-                self.current_size.store(current, Ordering::Relaxed);
-            } else {
-                // Cache is empty but size is still reported wrong; reset
-                self.current_size.store(0, Ordering::Relaxed);
-                break;
-            }
-        }
-
         let cached_obj = Arc::new(CachedObject {
             data: Arc::new(data),
             cached_at: Instant::now(),
             size,
-            hit_count: AtomicUsize::new(0),
+            access_count: Arc::new(AtomicU64::new(0)),
         });

-        cache.put(key, cached_obj);
-        self.current_size.fetch_add(size, Ordering::Relaxed);
+        self.cache.insert(key.clone(), cached_obj).await;

         #[cfg(feature = "metrics")]
         {
             use metrics::{counter, gauge};
             counter!("rustfs_object_cache_insertions").increment(1);
-            gauge!("rustfs_object_cache_size_bytes").set(self.current_size.load(Ordering::Relaxed) as f64);
+            gauge!("rustfs_object_cache_size_bytes").set(self.cache.weighted_size() as f64);
+            gauge!("rustfs_object_cache_entry_count").set(self.cache.entry_count() as f64);
         }
     }

     /// Clear all cached objects
     async fn clear(&self) {
-        let mut cache = self.cache.write().await;
-        cache.clear();
-        self.current_size.store(0, Ordering::Relaxed);
+        self.cache.invalidate_all();
+        // Sync to ensure all entries are removed
+        self.cache.run_pending_tasks().await;
     }

     /// Get cache statistics for monitoring
     async fn stats(&self) -> CacheStats {
-        let cache = self.cache.read().await;
+        // Ensure pending tasks are processed for accurate stats
+        self.cache.run_pending_tasks().await;
+
         CacheStats {
-            size: self.current_size.load(Ordering::Relaxed),
-            entries: cache.len(),
-            max_size: self.max_cache_size,
+            size: self.cache.weighted_size() as usize,
+            entries: self.cache.entry_count() as usize,
+            max_size: 100 * MI_B,
             max_object_size: self.max_object_size,
+            hit_count: self.hit_count.load(Ordering::Relaxed),
+            miss_count: self.miss_count.load(Ordering::Relaxed),
         }
     }

-    /// Check if a key exists in cache without promoting it in LRU
+    /// Check if a key exists in cache (lock-free)
     async fn contains(&self, key: &str) -> bool {
-        let cache = self.cache.read().await;
-        cache.peek(key).is_some()
+        self.cache.contains_key(key)
     }

-    /// Get multiple objects from cache in a single operation
+    /// Get multiple objects from cache
     ///
-    /// This is more efficient than calling get() multiple times as it acquires
-    /// the lock only once for all operations.
+    /// Each lookup goes through Moka's lock-free read path, so batching no longer needs a shared lock.
     async fn get_batch(&self, keys: &[String]) -> Vec<Option<Arc<Vec<u8>>>> {
         let mut results = Vec::with_capacity(keys.len());
         for key in keys {
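Note: `put()` and `stats()` lean on two Moka behaviours worth spelling out for review. Eviction is driven by the weigher, so `max_capacity` counts bytes rather than entries, and housekeeping runs lazily in the background, which is why `stats()` flushes it with `run_pending_tasks()` before reading `weighted_size()`/`entry_count()`. A small standalone sketch with a toy 1 KiB capacity (the capacity and value type are illustrative, not the PR's values):

```rust
// Sketch of weigher-driven eviction and deferred housekeeping in moka.
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, Vec<u8>> = Cache::builder()
        .max_capacity(1024) // capacity in weigher units (bytes here)
        .weigher(|_k: &String, v: &Vec<u8>| v.len() as u32)
        .build();

    // Insert ~4 KiB into a 1 KiB cache; excess weight is evicted/admission-controlled.
    for i in 0..16 {
        cache.insert(format!("obj-{i}"), vec![0u8; 256]).await;
    }

    // Size counters lag until pending maintenance runs, which mirrors
    // stats() calling run_pending_tasks() before reporting.
    cache.run_pending_tasks().await;
    println!("entries={} weighted_bytes={}", cache.entry_count(), cache.weighted_size());
}
```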
@@ -405,27 +381,25 @@
     }

     /// Remove a specific key from cache
-    ///
-    /// Returns true if the key was found and removed, false otherwise.
     async fn remove(&self, key: &str) -> bool {
-        let mut cache = self.cache.write().await;
-        if let Some(evicted) = cache.pop(key) {
-            self.current_size.fetch_sub(evicted.size, Ordering::Relaxed);
-            true
-        } else {
-            false
-        }
+        let had_key = self.cache.contains_key(key);
+        self.cache.invalidate(key).await;
+        had_key
     }

     /// Get the most frequently accessed keys
     ///
-    /// Returns up to `limit` keys sorted by hit count in descending order.
-    async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
-        let cache = self.cache.read().await;
-        let mut entries: Vec<(String, usize)> = cache
-            .iter()
-            .map(|(k, v)| (k.clone(), v.hit_count.load(Ordering::Relaxed)))
-            .collect();
+    /// Returns up to `limit` keys sorted by access count in descending order.
+    async fn get_hot_keys(&self, limit: usize) -> Vec<(String, u64)> {
+        // Run pending tasks to ensure accurate entry count
+        self.cache.run_pending_tasks().await;
+
+        let mut entries: Vec<(String, u64)> = Vec::new();
+
+        // Iterate through cache entries (Moka yields Arc<String> keys)
+        self.cache.iter().for_each(|(key, value)| {
+            entries.push((key.as_ref().clone(), value.access_count.load(Ordering::Relaxed)));
+        });

         entries.sort_by(|a, b| b.1.cmp(&a.1));
         entries.truncate(limit);
@@ -438,6 +412,19 @@
             self.put(key, data).await;
         }
     }
+
+    /// Get hit rate percentage
+    fn hit_rate(&self) -> f64 {
+        let hits = self.hit_count.load(Ordering::Relaxed);
+        let misses = self.miss_count.load(Ordering::Relaxed);
+        let total = hits + misses;
+
+        if total == 0 {
+            0.0
+        } else {
+            (hits as f64 / total as f64) * 100.0
+        }
+    }
 }

 /// Cache statistics for monitoring and debugging
@@ -451,6 +438,10 @@ pub struct CacheStats {
     pub max_size: usize,
     /// Maximum allowed object size in bytes
     pub max_object_size: usize,
+    /// Total number of cache hits
+    pub hit_count: u64,
+    /// Total number of cache misses
+    pub miss_count: u64,
 }

 /// Concurrency manager for coordinating concurrent GetObject requests
@@ -523,9 +514,14 @@ impl ConcurrencyManager {
     }

     /// Get the most frequently accessed keys
-    pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, usize)> {
+    pub async fn get_hot_keys(&self, limit: usize) -> Vec<(String, u64)> {
         self.cache.get_hot_keys(limit).await
     }
+
+    /// Get cache hit rate percentage
+    pub fn cache_hit_rate(&self) -> f64 {
+        self.cache.hit_rate()
+    }

     /// Warm up cache with frequently accessed objects
     ///
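Note: before moving on to the ecfs.rs changes, a standalone illustration of the two read-side helpers introduced above. The hit rate is `hits / (hits + misses) * 100`, and "hot keys" are just access counts sorted descending and truncated; the functions and sample values below are illustrative, not code from this PR.

```rust
// Standalone sketch of hit_rate() and the hot-key selection logic.
fn hit_rate(hits: u64, misses: u64) -> f64 {
    let total = hits + misses;
    if total == 0 { 0.0 } else { (hits as f64 / total as f64) * 100.0 }
}

fn hot_keys(mut entries: Vec<(String, u64)>, limit: usize) -> Vec<(String, u64)> {
    entries.sort_by(|a, b| b.1.cmp(&a.1)); // most-accessed first
    entries.truncate(limit);
    entries
}

fn main() {
    assert_eq!(hit_rate(0, 0), 0.0);    // no traffic yet
    assert_eq!(hit_rate(75, 25), 75.0); // 75 hits out of 100 lookups
    let top = hot_keys(
        vec![("a.bin".into(), 3), ("b.bin".into(), 9), ("c.bin".into(), 1)],
        2,
    );
    assert_eq!(top, vec![("b.bin".to_string(), 9), ("a.bin".to_string(), 3)]);
}
```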
debug!("Serving object from cache: {}", cache_key); + let cache_serve_duration = request_start.elapsed(); + + debug!("Serving object from cache: {} (latency: {:?})", cache_key, cache_serve_duration); + + #[cfg(feature = "metrics")] + { + use metrics::{counter, histogram}; + counter!("rustfs_get_object_cache_served_total").increment(1); + histogram!("rustfs_get_object_cache_serve_duration_seconds") + .record(cache_serve_duration.as_secs_f64()); + histogram!("rustfs_get_object_cache_size_bytes") + .record(cached_data.len() as f64); + } let value = cached_data.clone(); // Build response from cached data @@ -1700,7 +1721,16 @@ impl S3 for FS { let store = get_validated_store(&bucket).await?; // Acquire disk read permit to prevent I/O saturation under high concurrency + let permit_wait_start = std::time::Instant::now(); let _disk_permit = manager.acquire_disk_read_permit().await; + let permit_wait_duration = permit_wait_start.elapsed(); + + #[cfg(feature = "metrics")] + { + use metrics::histogram; + histogram!("rustfs_disk_permit_wait_duration_seconds") + .record(permit_wait_duration.as_secs_f64()); + } let reader = store .get_object_reader(bucket.as_str(), key.as_str(), rs.clone(), h, &opts) @@ -2054,6 +2084,27 @@ impl S3 for FS { let version_id = req.input.version_id.clone().unwrap_or_default(); helper = helper.object(event_info).version_id(version_id); + let total_duration = request_start.elapsed(); + + #[cfg(feature = "metrics")] + { + use metrics::{counter, histogram}; + counter!("rustfs_get_object_requests_completed").increment(1); + histogram!("rustfs_get_object_total_duration_seconds") + .record(total_duration.as_secs_f64()); + histogram!("rustfs_get_object_response_size_bytes") + .record(response_content_length as f64); + + // Record buffer size that was used + histogram!("rustfs_get_object_buffer_size_bytes") + .record(optimal_buffer_size as f64); + } + + debug!( + "GetObject completed: key={} size={} duration={:?} buffer={}", + cache_key, response_content_length, total_duration, optimal_buffer_size + ); + let result = Ok(S3Response::new(output)); let _ = helper.complete(&result); result