mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
Fix data usage cache and scanner (#2074)
This commit is contained in:
@@ -15,12 +15,8 @@
|
||||
pub mod local_snapshot;
|
||||
|
||||
use crate::{
|
||||
bucket::metadata_sys::get_replication_config,
|
||||
config::com::read_config,
|
||||
disk::DiskAPI,
|
||||
error::Error,
|
||||
store::ECStore,
|
||||
store_api::{BucketOperations, ListOperations},
|
||||
bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, error::Error, store::ECStore,
|
||||
store_api::ListOperations,
|
||||
};
|
||||
pub use local_snapshot::{
|
||||
DATA_USAGE_DIR, DATA_USAGE_STATE_DIR, LOCAL_USAGE_SNAPSHOT_VERSION, LocalUsageSnapshot, LocalUsageSnapshotMeta,
|
||||
@@ -38,7 +34,7 @@ use std::{
|
||||
};
|
||||
use tokio::fs;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
// Data usage storage constants
|
||||
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
|
||||
@@ -112,32 +108,24 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!("Failed to read data usage info from backend: {}", e);
|
||||
if e == Error::ConfigNotFound {
|
||||
info!("Data usage config not found, building basic statistics");
|
||||
return build_basic_data_usage_info(store).await;
|
||||
|
||||
match read_config(store.clone(), format!("{}.bkp", DATA_USAGE_OBJ_NAME_PATH.as_str()).as_str()).await {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
if e == Error::ConfigNotFound {
|
||||
return Ok(DataUsageInfo::default());
|
||||
}
|
||||
error!("Failed to read data usage info from backend: {}", e);
|
||||
return Err(Error::other(e));
|
||||
}
|
||||
}
|
||||
return Err(Error::other(e));
|
||||
}
|
||||
};
|
||||
|
||||
let mut data_usage_info: DataUsageInfo =
|
||||
serde_json::from_slice(&buf).map_err(|e| Error::other(format!("Failed to deserialize data usage info: {e}")))?;
|
||||
|
||||
info!("Loaded data usage info from backend with {} buckets", data_usage_info.buckets_count);
|
||||
|
||||
// Validate data and supplement if empty
|
||||
if data_usage_info.buckets_count == 0 || data_usage_info.buckets_usage.is_empty() {
|
||||
warn!("Loaded data is empty, supplementing with basic statistics");
|
||||
if let Ok(basic_info) = build_basic_data_usage_info(store.clone()).await {
|
||||
data_usage_info.buckets_count = basic_info.buckets_count;
|
||||
data_usage_info.buckets_usage = basic_info.buckets_usage;
|
||||
data_usage_info.bucket_sizes = basic_info.bucket_sizes;
|
||||
data_usage_info.objects_total_count = basic_info.objects_total_count;
|
||||
data_usage_info.objects_total_size = basic_info.objects_total_size;
|
||||
data_usage_info.last_update = basic_info.last_update;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle backward compatibility
|
||||
if data_usage_info.buckets_usage.is_empty() {
|
||||
data_usage_info.buckets_usage = data_usage_info
|
||||
@@ -502,57 +490,6 @@ pub async fn sync_memory_cache_with_backend() -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build basic data usage info with real object counts
|
||||
pub async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
|
||||
let mut data_usage_info = DataUsageInfo::default();
|
||||
|
||||
// Get bucket list
|
||||
match store.list_bucket(&crate::store_api::BucketOptions::default()).await {
|
||||
Ok(buckets) => {
|
||||
data_usage_info.buckets_count = buckets.len() as u64;
|
||||
data_usage_info.last_update = Some(SystemTime::now());
|
||||
|
||||
let mut total_objects = 0u64;
|
||||
let mut total_versions = 0u64;
|
||||
let mut total_size = 0u64;
|
||||
let mut total_delete_markers = 0u64;
|
||||
|
||||
for bucket_info in buckets {
|
||||
if bucket_info.name.starts_with('.') {
|
||||
continue; // Skip system buckets
|
||||
}
|
||||
|
||||
match compute_bucket_usage(store.clone(), &bucket_info.name).await {
|
||||
Ok(bucket_usage) => {
|
||||
total_objects = total_objects.saturating_add(bucket_usage.objects_count);
|
||||
total_versions = total_versions.saturating_add(bucket_usage.versions_count);
|
||||
total_size = total_size.saturating_add(bucket_usage.size);
|
||||
total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count);
|
||||
|
||||
data_usage_info
|
||||
.buckets_usage
|
||||
.insert(bucket_info.name.clone(), bucket_usage.clone());
|
||||
data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_usage.size);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data_usage_info.objects_total_count = total_objects;
|
||||
data_usage_info.versions_total_count = total_versions;
|
||||
data_usage_info.objects_total_size = total_size;
|
||||
data_usage_info.delete_markers_total_count = total_delete_markers;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to list buckets for basic data usage info: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(data_usage_info)
|
||||
}
|
||||
|
||||
/// Create a data usage cache entry from size summary
|
||||
pub fn create_cache_entry_from_summary(summary: &SizeSummary) -> DataUsageEntry {
|
||||
let mut entry = DataUsageEntry::default();
|
||||
|
||||
@@ -1109,10 +1109,13 @@ impl DataUsageCache {
|
||||
let mut buf = Vec::new();
|
||||
self.serialize(&mut rmp_serde::Serializer::new(&mut buf))?;
|
||||
|
||||
let path = path_join_buf(&[BUCKET_META_PREFIX, name]);
|
||||
|
||||
let store_clone = store.clone();
|
||||
let buf_clone = buf.clone();
|
||||
let path_clone = path.clone();
|
||||
let res = timeout(Duration::from_secs(5), async move {
|
||||
save_config(store_clone, name, buf_clone).await?;
|
||||
save_config(store_clone, &path_clone, buf_clone).await?;
|
||||
Ok::<(), StorageError>(())
|
||||
})
|
||||
.await
|
||||
@@ -1125,8 +1128,9 @@ impl DataUsageCache {
|
||||
|
||||
let store_clone = store.clone();
|
||||
let backup_name = format!("{name}.bkp");
|
||||
let backup_path = path_join_buf(&[BUCKET_META_PREFIX, &backup_name]);
|
||||
let res = timeout(Duration::from_secs(5), async move {
|
||||
save_config(store_clone, backup_name.as_str(), buf).await?;
|
||||
save_config(store_clone, &backup_path, buf).await?;
|
||||
Ok::<(), StorageError>(())
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -259,8 +259,6 @@ pub async fn store_data_usage_in_backend(
|
||||
break;
|
||||
}
|
||||
|
||||
debug!("store_data_usage_in_backend: received data usage info: {:?}", &data_usage_info);
|
||||
|
||||
// Serialize to JSON
|
||||
let data = match serde_json::to_vec(&data_usage_info) {
|
||||
Ok(data) => data,
|
||||
@@ -272,7 +270,7 @@ pub async fn store_data_usage_in_backend(
|
||||
|
||||
// Save a backup every 10th update
|
||||
if attempts > 10 {
|
||||
let backup_path = format!("{:?}.bkp", &DATA_USAGE_OBJ_NAME_PATH);
|
||||
let backup_path = format!("{}.bkp", DATA_USAGE_OBJ_NAME_PATH.as_str());
|
||||
if let Err(e) = save_config(storeapi.clone(), &backup_path, data.clone()).await {
|
||||
warn!("Failed to save data usage backup to {}: {}", backup_path, e);
|
||||
}
|
||||
@@ -280,8 +278,8 @@ pub async fn store_data_usage_in_backend(
|
||||
}
|
||||
|
||||
// Save main configuration
|
||||
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_OBJ_NAME_PATH, data).await {
|
||||
error!("Failed to save data usage info to {:?}: {e}", &DATA_USAGE_OBJ_NAME_PATH);
|
||||
if let Err(e) = save_config(storeapi.clone(), DATA_USAGE_OBJ_NAME_PATH.as_str(), data).await {
|
||||
error!("Failed to save data usage info to {}: {e}", DATA_USAGE_OBJ_NAME_PATH.as_str());
|
||||
}
|
||||
|
||||
attempts += 1;
|
||||
|
||||
@@ -177,17 +177,17 @@ impl ScannerIO for ECStore {
|
||||
let mut all_merged = DataUsageCache::default();
|
||||
for result in results.iter() {
|
||||
if result.info.last_update.is_none() {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
all_merged.merge(result);
|
||||
}
|
||||
|
||||
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update
|
||||
&& let Err(e) = updates
|
||||
.send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
|
||||
.await {
|
||||
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
|
||||
let dui = all_merged.dui(&all_merged.info.name, &all_buckets_clone);
|
||||
if let Err(e) = updates.send(dui).await {
|
||||
error!("Failed to send data usage info: {}", e);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
_ = ticker.tick() => {
|
||||
@@ -195,15 +195,14 @@ impl ScannerIO for ECStore {
|
||||
let mut all_merged = DataUsageCache::default();
|
||||
for result in results.iter() {
|
||||
if result.info.last_update.is_none() {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
all_merged.merge(result);
|
||||
}
|
||||
|
||||
if all_merged.root().is_some() && all_merged.info.last_update.unwrap() > last_update {
|
||||
if let Err(e) = updates
|
||||
.send(all_merged.dui(&all_merged.info.name, &all_buckets_clone))
|
||||
.await {
|
||||
let dui = all_merged.dui(&all_merged.info.name, &all_buckets_clone);
|
||||
if let Err(e) = updates.send(dui).await {
|
||||
error!("Failed to send data usage info: {}", e);
|
||||
}
|
||||
last_update = all_merged.info.last_update.unwrap();
|
||||
@@ -299,7 +298,7 @@ impl ScannerIOCache for SetDisks {
|
||||
|
||||
let cache = cache_mutex_clone.lock().await;
|
||||
if cache.info.last_update == last_update {
|
||||
continue;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = cache.save(store_clone.clone(), DATA_USAGE_CACHE_NAME).await {
|
||||
|
||||
Reference in New Issue
Block a user