Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-11-14 07:43:20 +00:00
parent caad7a38a4
commit 2defaa801b
3 changed files with 183 additions and 42 deletions

View File

@@ -6,7 +6,7 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
sync::{
atomic::{AtomicU32, AtomicU64},
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
@@ -28,13 +28,22 @@ use tokio::{
};
use tracing::{error, info};
use super::{
data_scanner_metric::globalScannerMetrics,
data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
use crate::disk::DiskAPI;
use crate::heal::data_scanner_metric::current_path_updater;
use crate::heal::data_usage::DATA_USAGE_ROOT;
use crate::{
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{
common::{read_config, save_config},
heal::Config,
},
disk::{error::DiskError, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
disk::{error::DiskError, DiskInfoOptions, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
error::{Error, Result},
global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD},
heal::{
@@ -50,19 +59,12 @@ use crate::{
utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR},
};
use super::{
data_scanner_metric::globalScannerMetrics,
data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch.
const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
const DATA_SCANNER_COMPACT_AT_FOLDERS: u64 = DATA_SCANNER_COMPACT_AT_CHILDREN / 4; // Compact when this many subfolders in a single folder.
const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
pub const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to wait on startup and between cycles.
const HEAL_DELETE_DANGLING: bool = true;
@@ -333,7 +335,7 @@ struct Heal {
bitrot: bool,
}
struct ScannerItem {
pub struct ScannerItem {
path: String,
bucket: String,
prefix: String,
@@ -399,7 +401,9 @@ struct CachedFolder {
object_heal_prob_div: u32,
}
type GetSizeFn = Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>>>> + Send + 'static>;
pub type GetSizeFn =
Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>> + Send>> + Send + Sync + 'static>;
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
struct FolderScanner {
root: String,
@@ -410,15 +414,32 @@ struct FolderScanner {
data_usage_scanner_debug: bool,
heal_object_select: u32,
scan_mode: HealScanMode,
should_heal: Arc<dyn Fn() -> bool + Send + Sync>,
disks: Vec<Option<DiskStore>>,
disks_quorum: usize,
updates: Sender<DataUsageEntry>,
last_update: SystemTime,
update_current_path: Arc<dyn Fn(&str) + Send + Sync>,
update_current_path: UpdateCurrentPathFn,
skip_heal: AtomicBool,
drive: DiskStore,
}
impl FolderScanner {
/// Decide whether object healing should run for this scanner pass.
///
/// Returns `false` once healing has been permanently skipped for this run,
/// when no heal selection probability is configured, or when the drive itself
/// is currently being healed (which also latches the skip flag so the drive
/// is not queried again).
async fn should_heal(&self) -> bool {
    // A latched skip, or a zero heal-selection probability, disables healing.
    if self.skip_heal.load(Ordering::SeqCst) || self.heal_object_select == 0 {
        return false;
    }
    // A drive that is itself under healing must not heal objects; remember
    // that decision. Errors from disk_info are treated as "not healing".
    match self.drive.disk_info(&DiskInfoOptions::default()).await {
        Ok(info) if info.healing => {
            self.skip_heal.store(true, Ordering::SeqCst);
            false
        }
        _ => true,
    }
}
async fn scan_folder(&mut self, folder: &CachedFolder, into: &mut DataUsageEntry) -> Result<()> {
let this_hash = hash_path(&folder.name);
let was_compacted = into.compacted;
@@ -499,7 +520,7 @@ impl FolderScanner {
item.heal.enabled = this_hash.mod_alt(
self.old_cache.info.next_cycle / folder.object_heal_prob_div,
self.heal_object_select / folder.object_heal_prob_div,
) && (self.should_heal)();
) && self.should_heal().await;
item.heal.bitrot = self.scan_mode == HEAL_DEEP_SCAN;
let (sz, err) = match (self.get_size)(&item).await {
@@ -592,7 +613,7 @@ impl FolderScanner {
.replace_hashed(&h, &Some(this_hash.clone()), &DataUsageEntry::default());
}
}
(self.update_current_path)(&folder.name);
(self.update_current_path)(&folder.name).await;
scan(folder, into, self).await;
// Add new folders if this is new and we don't have existing.
if !into.compacted {
@@ -617,12 +638,12 @@ impl FolderScanner {
continue;
}
}
(self.update_current_path)(&folder.name);
(self.update_current_path)(&folder.name).await;
scan(folder, into, self).await;
}
// Scan for healing
if abandoned_children.is_empty() || !(self.should_heal)() {
if abandoned_children.is_empty() || !self.should_heal().await {
break;
}
@@ -649,12 +670,12 @@ impl FolderScanner {
};
for k in abandoned_children.iter() {
if !(self.should_heal)() {
if !self.should_heal().await {
break;
}
let (bucket, prefix) = path_to_bucket_object(k);
(self.update_current_path)(k);
(self.update_current_path)(k).await;
if bucket != resolver.bucket {
let _ = bg_seq
@@ -675,11 +696,10 @@ impl FolderScanner {
let found_objs = Arc::new(RwLock::new(false));
let found_objs_clone = found_objs.clone();
let (tx, rx) = broadcast::channel(1);
let tx_partial = tx.clone();
// let tx_partial = tx.clone();
let tx_finished = tx.clone();
let update_current_path_agreed = self.update_current_path.clone();
let update_current_path_partial = self.update_current_path.clone();
let should_heal_clone = self.should_heal.clone();
let resolver_clone = resolver.clone();
let bg_seq_clone = bg_seq.clone();
let lopts = ListPathRawOptions {
@@ -693,24 +713,24 @@ impl FolderScanner {
Box::pin({
let update_current_path_agreed = update_current_path_agreed.clone();
async move {
update_current_path_agreed(&entry.name);
update_current_path_agreed(&entry.name).await;
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
Box::pin({
let update_current_path_partial = update_current_path_partial.clone();
let should_heal_partial = should_heal_clone.clone();
let tx_partial = tx_partial.clone();
// let tx_partial = tx_partial.clone();
let resolver_partial = resolver_clone.clone();
let bucket_partial = bucket.clone();
let found_objs_clone = found_objs_clone.clone();
let bg_seq_partial = bg_seq_clone.clone();
async move {
if !should_heal_partial() {
let _ = tx_partial.send(true);
return;
}
// Todo
// if !fs.should_heal().await {
// let _ = tx_partial.send(true);
// return;
// }
let entry = match entries.resolve(resolver_partial) {
Ok(Some(entry)) => entry,
_ => match entries.first_found() {
@@ -719,7 +739,7 @@ impl FolderScanner {
},
};
update_current_path_partial(&entry.name);
update_current_path_partial(&entry.name).await;
let mut custom = HashMap::new();
if entry.is_dir() {
return;
@@ -766,8 +786,8 @@ impl FolderScanner {
.queue_heal_task(
HealSource {
bucket: bucket_partial.clone(),
object: entry.name.clone(),
version_id: "".to_string(),
object: fiv.name.clone(),
version_id: ver.version_id.map_or("".to_string(), |ver_id| ver_id.to_string()),
..Default::default()
},
HEAL_ITEM_OBJECT.to_string(),
@@ -787,7 +807,7 @@ impl FolderScanner {
}
})
})),
finished: Some(Box::new(move |err: &[Option<Error>]| {
finished: Some(Box::new(move |_: &[Option<Error>]| {
Box::pin({
let tx_finished = tx_finished.clone();
async move {
@@ -820,12 +840,11 @@ impl FolderScanner {
.size_recursive(&this_hash.key())
.unwrap_or(DataUsageEntry::default());
flat.compacted = true;
let mut compact = false;
if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
compact = true;
let compact = if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
true
} else {
// Compact if we only have objects as children...
compact = true;
let mut compact = true;
for k in into.children.iter() {
if let Some(v) = self.new_cache.cache.get(k) {
if !v.children.is_empty() || v.objects > 1 {
@@ -834,7 +853,8 @@ impl FolderScanner {
}
}
}
}
compact
};
if compact {
self.new_cache.delete_recursive(&this_hash);
self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
@@ -930,3 +950,60 @@ pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursi
}
false
}
pub async fn scan_data_folder(
disks: &[Option<DiskStore>],
drive: &DiskStore,
cache: &DataUsageCache,
get_size_fn: GetSizeFn,
heal_scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
return Err(Error::from_string("internal error: root scan attempted"));
}
let base_path = drive.to_string();
let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name);
let mut s = FolderScanner {
root: base_path,
get_size: get_size_fn,
old_cache: cache.clone(),
new_cache: DataUsageCache::default(),
update_cache: DataUsageCache::default(),
data_usage_scanner_debug: false,
heal_object_select: 0,
scan_mode: heal_scan_mode,
updates: cache.info.updates.clone().unwrap(),
last_update: SystemTime::now(),
update_current_path: update_path,
disks: disks.to_vec(),
disks_quorum: disks.len() / 2,
skip_heal: if *GLOBAL_IsErasure.read().await || cache.info.skip_healing {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
},
drive: drive.clone(),
};
if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing {
s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32;
}
let mut root = DataUsageEntry::default();
let folder = CachedFolder {
name: cache.info.name.clone(),
object_heal_prob_div: 1,
parent: DataUsageHash("".to_string()),
};
if s.scan_folder(&folder, &mut root).await.is_err() {
close_disk().await;
}
s.new_cache
.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap());
s.new_cache.info.last_update = SystemTime::now();
s.new_cache.info.next_cycle = cache.info.next_cycle;
close_disk().await;
Ok(s.new_cache)
}

View File

@@ -1,3 +1,6 @@
use lazy_static::lazy_static;
use std::future::Future;
use std::pin::Pin;
use std::{
collections::HashMap,
sync::{
@@ -6,11 +9,9 @@ use std::{
},
time::SystemTime,
};
use lazy_static::lazy_static;
use tokio::sync::RwLock;
use super::data_scanner::CurrentScannerCycle;
use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn};
lazy_static! {
pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
@@ -55,6 +56,7 @@ pub enum ScannerMetric {
pub struct ScannerMetrics {
operations: Vec<AtomicU32>,
cycle_info: RwLock<Option<CurrentScannerCycle>>,
current_paths: HashMap<String, String>,
}
impl ScannerMetrics {
@@ -62,6 +64,7 @@ impl ScannerMetrics {
Self {
operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(),
cycle_info: RwLock::new(None),
current_paths: HashMap::new(),
}
}
@@ -75,3 +78,28 @@ impl ScannerMetrics {
*self.cycle_info.write().await = c;
}
}
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
/// Build a pair of callbacks bound to `disk`:
/// - an updater that records the path currently being scanned on that disk in
///   the global scanner metrics, and
/// - a closer that removes the disk's entry once the scan finishes.
///
/// NOTE(review): `initial` is accepted but never recorded; the reference
/// implementation registers the initial path immediately — confirm whether
/// that is needed here.
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
    let _ = initial; // currently unused, see note above
    let disk_for_update = disk.to_string();
    let disk_for_close = disk.to_string();
    (
        Arc::new(move |path: &str| {
            let disk = disk_for_update.clone();
            // Own the path before moving it into the future; insert it without
            // the redundant re-allocation the previous `path.to_string()` did.
            let path = path.to_string();
            Box::pin(async move {
                globalScannerMetrics.write().await.current_paths.insert(disk, path);
            })
        }),
        Arc::new(move || {
            let disk = disk_for_close.clone();
            Box::pin(async move {
                globalScannerMetrics.write().await.current_paths.remove(&disk);
            })
        }),
    )
}

View File

@@ -22,7 +22,7 @@ use std::u64;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use super::data_scanner::SizeSummary;
use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS};
use super::data_usage::DATA_USAGE_ROOT;
// DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals
@@ -559,6 +559,33 @@ impl DataUsageCache {
}
}
/// Force the cache down toward `limit` entries: first compact the root's
/// children when there are pathologically many top-level folders, then drop
/// every entry no longer reachable from the root.
pub fn force_compact(&mut self, limit: usize) {
    // Nothing to do while the cache is below the size limit.
    if self.cache.len() < limit {
        return;
    }
    let top = hash_path(&self.info.name).key();
    let top_e = match self.find(&top) {
        Some(e) => e,
        None => return,
    };
    // Far too many direct children: compact the root's subtree first.
    if top_e.children.len() > DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS.try_into().unwrap() {
        self.reduce_children_of(&hash_path(&self.info.name), limit, true);
    }
    if self.cache.len() <= limit {
        return;
    }
    // Collect every key reachable from the root, then drop the rest.
    // NOTE(review): `top_e` was captured before reduce_children_of and may be
    // stale here (mirrors the reference implementation) — confirm intended.
    let mut found = HashSet::new();
    found.insert(top);
    mark(self, &top_e, &mut found);
    self.cache.retain(|k, _| found.contains(k));
}
pub fn reduce_children_of(&mut self, path: &DataUsageHash, limit: usize, compact_self: bool) {
let e = match self.cache.get(&path.key()) {
Some(e) => e,
@@ -669,6 +696,15 @@ fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec
}
}
/// Record into `found` the key of every cache entry reachable from `entry`
/// by following `children` links.
fn mark(duc: &DataUsageCache, entry: &DataUsageEntry, found: &mut HashSet<String>) {
    // Iterative depth-first traversal instead of recursion.
    let mut pending: Vec<&DataUsageEntry> = vec![entry];
    while let Some(current) = pending.pop() {
        for key in current.children.iter() {
            found.insert(key.to_string());
            if let Some(child) = duc.cache.get(key) {
                pending.push(child);
            }
        }
    }
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DataUsageHash(pub String);