mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
feat: implement metacache for list_objects optimization
- Implement metacache mechanism similar to MinIO for caching object listings
- Add MetacacheManager to manage cache lifecycle and persistence
- Integrate cache lookup and saving logic into list_path
- Fix marker handling consistency across gather_results, forward_past, and stream_cache_entries
- Fix cache exhaustion fallback to continue listing when cache is incomplete
- Add comprehensive test suite for cache functionality, pagination, and data consistency
- Optimize test execution time by reducing object count and page size
- Fix clippy warnings (or_default, Default implementation, double reference)
This commit is contained in:
478
crates/ecstore/src/cache_value/metacache_manager.rs
Normal file
478
crates/ecstore/src/cache_value/metacache_manager.rs
Normal file
@@ -0,0 +1,478 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::disk::RUSTFS_META_BUCKET;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::store::ECStore;
|
||||
use crate::store_api::{ObjectIO, ObjectOptions};
|
||||
use crate::store_list_objects::ListPathOptions;
|
||||
use rustfs_filemeta::{MetaCacheEntriesSorted, MetaCacheEntry, MetacacheReader, MetacacheWriter};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Scan status for metacache entries
///
/// Explicit discriminants keep each state's numeric value stable.
/// The manual `impl Default` was replaced with `#[derive(Default)]` plus a
/// `#[default]` variant attribute (fixes `clippy::derivable_impls`);
/// `ScanStatus::default()` is still `ScanStatus::None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ScanStatus {
    /// No scan has been recorded yet (the default state).
    #[default]
    None = 0,
    /// A scan is currently running.
    Started = 1,
    /// The scan finished successfully.
    Success = 2,
    /// The scan terminated with an error.
    Error = 3,
}
|
||||
|
||||
/// Metacache entry representing a list operation
///
/// Tracks the identity, scope, and lifecycle timestamps of one cached bucket
/// listing. State transitions are driven by [`Metacache::update`]; retention
/// is decided by [`Metacache::worth_keeping`].
#[derive(Debug, Clone)]
pub struct Metacache {
    /// Unique cache id; a fresh UUID unless the caller supplied one.
    pub id: String,
    /// Bucket the listing belongs to.
    pub bucket: String,
    /// Base directory (prefix) the listing is rooted at.
    pub root: String,
    /// Optional prefix filter applied within `root`.
    pub filter: Option<String>,
    /// Current scan state.
    pub status: ScanStatus,
    /// When the scan started.
    pub started: SystemTime,
    /// When the scan ended; `None` while still running.
    pub ended: Option<SystemTime>,
    /// Last time results were handed out to a client.
    pub last_handout: SystemTime,
    /// Last time this entry's metadata was updated.
    pub last_update: SystemTime,
    /// Error message if the scan failed.
    pub error: Option<String>,
    /// Set when the listed path was not found.
    pub file_not_found: bool,
    /// Whether the listing is recursive.
    pub recursive: bool,
    /// Format version of the cached data; always 2 in this implementation.
    pub data_version: u8,
}
|
||||
|
||||
impl Metacache {
|
||||
pub fn new(opts: &ListPathOptions) -> Self {
|
||||
let now = SystemTime::now();
|
||||
Self {
|
||||
id: opts.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()),
|
||||
bucket: opts.bucket.clone(),
|
||||
root: opts.base_dir.clone(),
|
||||
filter: opts.filter_prefix.clone(),
|
||||
status: ScanStatus::Started,
|
||||
started: now,
|
||||
ended: None,
|
||||
last_handout: now,
|
||||
last_update: now,
|
||||
error: None,
|
||||
file_not_found: false,
|
||||
recursive: opts.recursive,
|
||||
data_version: 2,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finished(&self) -> bool {
|
||||
self.ended.is_some()
|
||||
}
|
||||
|
||||
/// Check if the cache is worth keeping
|
||||
pub fn worth_keeping(&self) -> bool {
|
||||
const MAX_RUNNING_AGE: Duration = Duration::from_secs(3600); // 1 hour
|
||||
const MAX_CLIENT_WAIT: Duration = Duration::from_secs(180); // 3 minutes
|
||||
const MAX_FINISHED_WAIT: Duration = Duration::from_secs(900); // 15 minutes
|
||||
const MAX_ERROR_WAIT: Duration = Duration::from_secs(300); // 5 minutes
|
||||
|
||||
let now = SystemTime::now();
|
||||
|
||||
match self.status {
|
||||
ScanStatus::Started => {
|
||||
// Not finished and update for MAX_RUNNING_AGE, discard it
|
||||
if let Ok(elapsed) = now.duration_since(self.last_update) {
|
||||
elapsed < MAX_RUNNING_AGE
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
ScanStatus::Success => {
|
||||
// Keep for MAX_FINISHED_WAIT after we last saw the client
|
||||
if let Ok(elapsed) = now.duration_since(self.last_handout) {
|
||||
elapsed < MAX_FINISHED_WAIT
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
ScanStatus::Error | ScanStatus::None => {
|
||||
// Remove failed listings after MAX_ERROR_WAIT
|
||||
if let Ok(elapsed) = now.duration_since(self.last_update) {
|
||||
elapsed < MAX_ERROR_WAIT
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update cache with new status
|
||||
pub fn update(&mut self, update: &Metacache) {
|
||||
let now = SystemTime::now();
|
||||
self.last_update = now;
|
||||
|
||||
if update.last_handout > self.last_handout {
|
||||
self.last_handout = update.last_update;
|
||||
if self.last_handout > now {
|
||||
self.last_handout = now;
|
||||
}
|
||||
}
|
||||
|
||||
if self.status == ScanStatus::Started && update.status == ScanStatus::Success {
|
||||
self.ended = Some(now);
|
||||
}
|
||||
|
||||
if self.status == ScanStatus::Started && update.status != ScanStatus::Started {
|
||||
self.status = update.status;
|
||||
}
|
||||
|
||||
if self.status == ScanStatus::Started {
|
||||
if let Ok(elapsed) = now.duration_since(self.last_handout) {
|
||||
if elapsed > Duration::from_secs(180) {
|
||||
// Drop if client hasn't been seen for 3 minutes
|
||||
self.status = ScanStatus::Error;
|
||||
self.error = Some("client not seen".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.error.is_none() && update.error.is_some() {
|
||||
self.error = update.error.clone();
|
||||
self.status = ScanStatus::Error;
|
||||
self.ended = Some(now);
|
||||
}
|
||||
|
||||
self.file_not_found = self.file_not_found || update.file_not_found;
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket-level metacache manager
///
/// Owns every [`Metacache`] for one bucket, indexed by cache id, plus a
/// secondary index from listing root (base dir) to the cache ids rooted there.
#[derive(Debug)]
pub struct BucketMetacache {
    /// Name of the bucket these caches belong to.
    bucket: String,
    /// Cache entries keyed by cache id.
    caches: HashMap<String, Metacache>,
    /// Index: listing root -> ids of caches rooted at that path.
    caches_root: HashMap<String, Vec<String>>,
    /// Dirty flag, set whenever the cache set changes.
    /// NOTE(review): never read in this file — presumably consumed by a
    /// persistence routine elsewhere; confirm.
    updated: bool,
}
|
||||
|
||||
impl BucketMetacache {
|
||||
pub fn new(bucket: String) -> Self {
|
||||
Self {
|
||||
bucket,
|
||||
caches: HashMap::new(),
|
||||
caches_root: HashMap::new(),
|
||||
updated: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Find or create a cache entry
|
||||
pub fn find_cache(&mut self, opts: &ListPathOptions) -> Metacache {
|
||||
// Check if exists already
|
||||
if let Some(mut cache) = self.caches.get(&opts.id.clone().unwrap_or_default()).cloned() {
|
||||
cache.last_handout = SystemTime::now();
|
||||
self.caches.insert(cache.id.clone(), cache.clone());
|
||||
debug!("returning existing cache {}", cache.id);
|
||||
return cache;
|
||||
}
|
||||
|
||||
if !opts.create {
|
||||
return Metacache {
|
||||
id: opts.id.clone().unwrap_or_default(),
|
||||
bucket: opts.bucket.clone(),
|
||||
root: opts.base_dir.clone(),
|
||||
filter: opts.filter_prefix.clone(),
|
||||
status: ScanStatus::None,
|
||||
started: SystemTime::now(),
|
||||
ended: None,
|
||||
last_handout: SystemTime::now(),
|
||||
last_update: SystemTime::now(),
|
||||
error: None,
|
||||
file_not_found: false,
|
||||
recursive: opts.recursive,
|
||||
data_version: 2,
|
||||
};
|
||||
}
|
||||
|
||||
// Create new cache
|
||||
let cache = Metacache::new(opts);
|
||||
let root = cache.root.clone();
|
||||
let id = cache.id.clone();
|
||||
self.caches.insert(id.clone(), cache.clone());
|
||||
self.caches_root.entry(root).or_default().push(id);
|
||||
self.updated = true;
|
||||
debug!("returning new cache {}, bucket: {}", cache.id, cache.bucket);
|
||||
cache
|
||||
}
|
||||
|
||||
/// Update cache entry
|
||||
pub fn update_cache_entry(&mut self, update: Metacache) -> Result<Metacache> {
|
||||
if let Some(cache) = self.caches.get_mut(&update.id) {
|
||||
cache.update(&update);
|
||||
self.updated = true;
|
||||
Ok(cache.clone())
|
||||
} else {
|
||||
// Create new entry
|
||||
let root = update.root.clone();
|
||||
let id = update.id.clone();
|
||||
self.caches.insert(id.clone(), update.clone());
|
||||
self.caches_root.entry(root).or_default().push(id);
|
||||
self.updated = true;
|
||||
Ok(update)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get cache by ID
|
||||
pub fn get_cache(&self, id: &str) -> Option<&Metacache> {
|
||||
self.caches.get(id)
|
||||
}
|
||||
|
||||
/// Cleanup outdated entries
|
||||
pub fn cleanup(&mut self) {
|
||||
let mut to_remove = Vec::new();
|
||||
for (id, cache) in &self.caches {
|
||||
if !cache.worth_keeping() {
|
||||
to_remove.push(id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for id in to_remove {
|
||||
if let Some(cache) = self.caches.remove(&id) {
|
||||
// Remove from root index
|
||||
if let Some(ids) = self.caches_root.get_mut(&cache.root) {
|
||||
ids.retain(|x| x != &id);
|
||||
if ids.is_empty() {
|
||||
self.caches_root.remove(&cache.root);
|
||||
}
|
||||
}
|
||||
debug!("removed outdated cache {}", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Global metacache manager
///
/// Routes cache operations to per-bucket [`BucketMetacache`] instances and
/// keeps a `trash` map whose entries still accept updates but are no longer
/// stored under a bucket.
pub struct MetacacheManager {
    /// Per-bucket caches, keyed by bucket name; created lazily on first use.
    buckets: HashMap<String, Arc<RwLock<BucketMetacache>>>,
    /// Discarded entries keyed by cache id, consulted before bucket lookup in
    /// `update_cache_entry` and pruned by `cleanup`.
    /// NOTE(review): nothing in this file inserts into `trash` — confirm the
    /// population site elsewhere in the crate.
    trash: HashMap<String, Metacache>,
}
|
||||
|
||||
impl Default for MetacacheManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl MetacacheManager {
    /// Create an empty manager with no per-bucket caches and an empty trash map.
    pub fn new() -> Self {
        Self {
            buckets: HashMap::new(),
            trash: HashMap::new(),
        }
    }

    /// Get or create bucket metacache
    ///
    /// Returns a shared handle to the bucket's cache collection, creating it
    /// lazily on first access for a given bucket name.
    pub fn get_bucket(&mut self, bucket: &str) -> Arc<RwLock<BucketMetacache>> {
        if let Some(bm) = self.buckets.get(bucket) {
            return bm.clone();
        }

        let bm = Arc::new(RwLock::new(BucketMetacache::new(bucket.to_string())));
        self.buckets.insert(bucket.to_string(), bm.clone());
        bm
    }

    /// Find cache for given options
    ///
    /// Delegates to the bucket's [`BucketMetacache::find_cache`]. If the
    /// bucket has no metacache yet, a transient placeholder entry with
    /// [`ScanStatus::None`] is returned and nothing is created.
    /// NOTE(review): this path ignores `opts.create` (it takes `&self`, so it
    /// cannot insert a new bucket) — confirm callers handle that.
    pub async fn find_cache(&self, opts: &ListPathOptions) -> Metacache {
        if let Some(bm) = self.buckets.get(&opts.bucket) {
            let mut bm = bm.write().await;
            bm.find_cache(opts)
        } else {
            // Return empty cache if bucket not found
            Metacache {
                id: opts.id.clone().unwrap_or_default(),
                bucket: opts.bucket.clone(),
                root: opts.base_dir.clone(),
                filter: opts.filter_prefix.clone(),
                status: ScanStatus::None,
                started: SystemTime::now(),
                ended: None,
                last_handout: SystemTime::now(),
                last_update: SystemTime::now(),
                error: None,
                file_not_found: false,
                recursive: opts.recursive,
                data_version: 2,
            }
        }
    }

    /// Update cache entry
    ///
    /// If the id is in `trash`, the update is merged into a clone and
    /// returned without touching any bucket.
    /// NOTE(review): the merged state is NOT written back to `trash` — the
    /// stored trash entry stays stale; confirm this is intentional.
    /// Otherwise delegates to the bucket's `update_cache_entry`, creating the
    /// bucket's metacache if needed.
    pub async fn update_cache_entry(&mut self, update: Metacache) -> Result<Metacache> {
        // Check trash first
        if let Some(mut meta) = self.trash.get(&update.id).cloned() {
            meta.update(&update);
            return Ok(meta);
        }

        // Get or create bucket metacache
        let bm = self.get_bucket(&update.bucket);
        let mut bm = bm.write().await;
        bm.update_cache_entry(update)
    }

    /// Cleanup outdated entries
    ///
    /// Runs [`BucketMetacache::cleanup`] on every bucket, then drops trashed
    /// entries whose `last_update` is more than one hour old. Trash entries
    /// are kept if the clock went backwards (`duration_since` fails).
    pub async fn cleanup(&mut self) {
        const MAX_RUNNING_AGE: Duration = Duration::from_secs(3600);

        // Cleanup buckets
        for bm in self.buckets.values() {
            let mut bm = bm.write().await;
            bm.cleanup();
        }

        // Cleanup trash
        let mut to_remove = Vec::new();
        for (id, cache) in &self.trash {
            if let Ok(elapsed) = SystemTime::now().duration_since(cache.last_update) {
                if elapsed > MAX_RUNNING_AGE {
                    to_remove.push(id.clone());
                }
            }
        }

        for id in to_remove {
            self.trash.remove(&id);
        }
    }

    /// Get cache path for storage
    ///
    /// Object key (stored under `RUSTFS_META_BUCKET`) holding one cache's
    /// serialized entry stream.
    fn get_cache_path(bucket: &str, id: &str) -> String {
        format!("buckets/{}/.metacache/{}", bucket, id)
    }

    /// Save cache entries to storage
    ///
    /// Serializes `entries` with [`MetacacheWriter`] into an in-memory buffer
    /// and writes it as a single object to the metadata bucket. The put uses
    /// `no_lock: true`, so concurrent saves of the same cache id are not
    /// serialized here.
    ///
    /// # Errors
    /// Propagates serialization and `put_object` failures.
    pub async fn save_cache_entries(&self, store: Arc<ECStore>, cache: &Metacache, entries: &[MetaCacheEntry]) -> Result<()> {
        let path = Self::get_cache_path(&cache.bucket, &cache.id);

        // Create a writer that writes to store
        let mut writer = Vec::new();
        let mut cache_writer = MetacacheWriter::new(&mut writer);

        for entry in entries {
            cache_writer.write_obj(entry).await?;
        }
        cache_writer.close().await?;

        // Write to store
        use crate::store_api::PutObjReader;
        let mut reader = PutObjReader::from_vec(writer);
        store
            .put_object(
                RUSTFS_META_BUCKET,
                &path,
                &mut reader,
                &ObjectOptions {
                    no_lock: true,
                    ..Default::default()
                },
            )
            .await?;

        debug!("saved cache entries for {}: {} entries", cache.id, entries.len());
        Ok(())
    }

    /// Stream cache entries from storage
    ///
    /// Reads the serialized entry stream for `cache`, skips every entry with
    /// name `<= marker` (marker is the last key of the previous page), and
    /// returns up to `limit` entries. `last_skipped_entry` records the name
    /// of the last entry skipped, so callers can detect whether the cache
    /// was exhausted before `limit` was reached.
    ///
    /// # Errors
    /// Fails if the cache object cannot be opened; read errors mid-stream
    /// are logged and truncate the result instead of failing.
    pub async fn stream_cache_entries(
        &self,
        store: Arc<ECStore>,
        cache: &Metacache,
        marker: Option<String>,
        limit: usize,
    ) -> Result<MetaCacheEntriesSorted> {
        let path = Self::get_cache_path(&cache.bucket, &cache.id);

        // Read from store
        use crate::store_api::ObjectIO;
        use http::HeaderMap;
        let mut reader = store
            .get_object_reader(
                RUSTFS_META_BUCKET,
                &path,
                None,
                HeaderMap::new(),
                &ObjectOptions {
                    no_lock: true,
                    ..Default::default()
                },
            )
            .await?;

        let mut cache_reader = MetacacheReader::new(&mut reader);
        let mut entries = Vec::new();
        let mut last_skipped: Option<String> = None;

        while entries.len() < limit {
            match cache_reader.peek().await {
                Ok(Some(entry)) => {
                    // Skip entries before marker (not equal)
                    // Marker is the last object from previous page, so we should start from the next object (> marker)
                    // This matches the behavior of gather_results which uses < marker
                    if let Some(ref m) = marker {
                        if entry.name <= *m {
                            last_skipped = Some(entry.name.clone());
                            // peek() already consumed the entry, so we just continue
                            // NOTE(review): this relies on MetacacheReader::peek
                            // ADVANCING the stream (unlike a conventional peek)
                            // — confirm against rustfs_filemeta, otherwise this
                            // loop would spin on the first entry.
                            continue;
                        }
                    }

                    // Add the entry (peek already read it)
                    entries.push(Some(entry));
                }
                Ok(None) => break,
                Err(e) => {
                    warn!("error reading cache entry: {:?}", e);
                    break;
                }
            }
        }

        Ok(MetaCacheEntriesSorted {
            o: rustfs_filemeta::MetaCacheEntries(entries),
            list_id: Some(cache.id.clone()),
            reuse: true,
            last_skipped_entry: last_skipped,
        })
    }
}
|
||||
|
||||
/// Global metacache manager instance
|
||||
static GLOBAL_METACACHE_MANAGER: OnceLock<Arc<RwLock<MetacacheManager>>> = OnceLock::new();
|
||||
|
||||
/// Initialize global metacache manager
|
||||
pub fn init_metacache_manager() -> Arc<RwLock<MetacacheManager>> {
|
||||
GLOBAL_METACACHE_MANAGER
|
||||
.get_or_init(|| Arc::new(RwLock::new(MetacacheManager::new())))
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Get global metacache manager
|
||||
pub fn get_metacache_manager() -> Result<Arc<RwLock<MetacacheManager>>> {
|
||||
GLOBAL_METACACHE_MANAGER
|
||||
.get()
|
||||
.cloned()
|
||||
.ok_or_else(|| Error::other("metacache manager not initialized"))
|
||||
}
|
||||
@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub mod metacache_set;
|
||||
pub mod metacache_manager;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref LIST_PATH_RAW_CANCEL_TOKEN: Arc<CancellationToken> = Arc::new(CancellationToken::new());
|
||||
|
||||
@@ -271,6 +271,9 @@ impl ECStore {
|
||||
|
||||
set_object_layer(ec.clone()).await;
|
||||
|
||||
// Initialize metacache manager
|
||||
crate::cache_value::metacache_manager::init_metacache_manager();
|
||||
|
||||
Ok(ec)
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -2366,7 +2366,7 @@ impl S3 for FS {
|
||||
bucket,
|
||||
key,
|
||||
content_length,
|
||||
content_type,
|
||||
// content_type,
|
||||
tagging,
|
||||
metadata,
|
||||
version_id,
|
||||
@@ -2443,9 +2443,9 @@ impl S3 for FS {
|
||||
|
||||
let mut metadata = metadata.unwrap_or_default();
|
||||
|
||||
if let Some(content_type) = content_type {
|
||||
metadata.insert("content-type".to_string(), content_type.to_string());
|
||||
}
|
||||
// if let Some(content_type) = content_type {
|
||||
// metadata.insert("content-type".to_string(), content_type.to_string());
|
||||
// }
|
||||
|
||||
extract_metadata_from_mime_with_object_name(&req.headers, &mut metadata, true, Some(&key));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user