From 2defaa801bc47e4561eb33e9d1c1d8dfdedf613f Mon Sep 17 00:00:00 2001
From: junxiang Mu <1948535941@qq.com>
Date: Thu, 14 Nov 2024 07:43:20 +0000
Subject: [PATCH] ecstore/heal: add scan_data_folder, async should_heal and per-disk scan path tracking

Signed-off-by: junxiang Mu <1948535941@qq.com>
---
 ecstore/src/heal/data_scanner.rs        | 153 ++++++++++++++++++------
 ecstore/src/heal/data_scanner_metric.rs |  34 +++++-
 ecstore/src/heal/data_usage_cache.rs    |  38 +++++-
 3 files changed, 183 insertions(+), 42 deletions(-)

diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs
index 15916d17..b9d82f2e 100644
--- a/ecstore/src/heal/data_scanner.rs
+++ b/ecstore/src/heal/data_scanner.rs
@@ -6,7 +6,7 @@ use std::{
     path::{Path, PathBuf},
     pin::Pin,
     sync::{
-        atomic::{AtomicU32, AtomicU64},
+        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
         Arc,
     },
     time::{Duration, SystemTime, UNIX_EPOCH},
@@ -28,13 +28,22 @@ use tokio::{
 };
 use tracing::{error, info};
 
+use super::{
+    data_scanner_metric::globalScannerMetrics,
+    data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
+    data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
+    heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
+};
+use crate::disk::DiskAPI;
+use crate::heal::data_scanner_metric::current_path_updater;
+use crate::heal::data_usage::DATA_USAGE_ROOT;
 use crate::{
     cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
     config::{
         common::{read_config, save_config},
         heal::Config,
     },
-    disk::{error::DiskError, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
+    disk::{error::DiskError, DiskInfoOptions, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
     error::{Error, Result},
     global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD},
     heal::{
@@ -50,19 +59,12 @@ use crate::{
     utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR},
 };
 
-use super::{
-    data_scanner_metric::globalScannerMetrics,
-    data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
-    data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
-    heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
-};
-
 const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
 const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
 const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch.
 const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
 const DATA_SCANNER_COMPACT_AT_FOLDERS: u64 = DATA_SCANNER_COMPACT_AT_CHILDREN / 4; // Compact when this many subfolders in a single folder.
-const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
+pub const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
 const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to wait on startup and between cycles.
 
 const HEAL_DELETE_DANGLING: bool = true;
@@ -333,7 +335,7 @@ struct Heal {
     bitrot: bool,
 }
 
-struct ScannerItem {
+pub struct ScannerItem {
     path: String,
     bucket: String,
     prefix: String,
@@ -399,7 +401,9 @@ struct CachedFolder {
     object_heal_prob_div: u32,
 }
 
-type GetSizeFn = Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<u64>>>> + Send + 'static>;
+pub type GetSizeFn =
+    Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<u64>> + Send>> + Send + Sync + 'static>;
+pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
 
 struct FolderScanner {
     root: String,
@@ -410,15 +414,32 @@ struct FolderScanner {
     data_usage_scanner_debug: bool,
     heal_object_select: u32,
     scan_mode: HealScanMode,
-    should_heal: Arc<dyn Fn() -> bool + Send + Sync>,
     disks: Vec<Option<DiskStore>>,
     disks_quorum: usize,
     updates: Sender<DataUsageEntry>,
     last_update: SystemTime,
-    update_current_path: Arc<dyn Fn(&str) + Send + Sync>,
+    update_current_path: UpdateCurrentPathFn,
+    skip_heal: AtomicBool,
+    drive: DiskStore,
 }
 
 impl FolderScanner {
+    async fn should_heal(&self) -> bool {
+        if self.skip_heal.load(Ordering::SeqCst) {
+            return false;
+        }
+        if self.heal_object_select == 0 {
+            return false;
+        }
+        if let Ok(info) = self.drive.disk_info(&DiskInfoOptions::default()).await {
+            if info.healing {
+                self.skip_heal.store(true, Ordering::SeqCst);
+                return false;
+            }
+        }
+        true
+    }
+
     async fn scan_folder(&mut self, folder: &CachedFolder, into: &mut DataUsageEntry) -> Result<()> {
         let this_hash = hash_path(&folder.name);
         let was_compacted = into.compacted;
@@ -499,7 +520,7 @@ impl FolderScanner {
                 item.heal.enabled = this_hash.mod_alt(
                     self.old_cache.info.next_cycle / folder.object_heal_prob_div,
                     self.heal_object_select / folder.object_heal_prob_div,
-                ) && (self.should_heal)();
+                ) && self.should_heal().await;
                 item.heal.bitrot = self.scan_mode == HEAL_DEEP_SCAN;
 
                 let (sz, err) = match (self.get_size)(&item).await {
@@ -592,7 +613,7 @@ impl FolderScanner {
                 .replace_hashed(&h, &Some(this_hash.clone()), &DataUsageEntry::default());
             }
         }
-        (self.update_current_path)(&folder.name);
+        (self.update_current_path)(&folder.name).await;
         scan(folder, into, self).await;
         // Add new folders if this is new and we don't have existing.
         if !into.compacted {
@@ -617,12 +638,12 @@ impl FolderScanner {
                     continue;
                 }
             }
-            (self.update_current_path)(&folder.name);
+            (self.update_current_path)(&folder.name).await;
             scan(folder, into, self).await;
         }
 
         // Scan for healing
-        if abandoned_children.is_empty() || !(self.should_heal)() {
+        if abandoned_children.is_empty() || !self.should_heal().await {
             break;
         }
 
@@ -649,12 +670,12 @@ impl FolderScanner {
         };
 
         for k in abandoned_children.iter() {
-            if !(self.should_heal)() {
+            if !self.should_heal().await {
                 break;
            }
 
            let (bucket, prefix) = path_to_bucket_object(k);
-            (self.update_current_path)(k);
+            (self.update_current_path)(k).await;
 
            if bucket != resolver.bucket {
                let _ = bg_seq
@@ -675,11 +696,10 @@ impl FolderScanner {
            let found_objs = Arc::new(RwLock::new(false));
            let found_objs_clone = found_objs.clone();
            let (tx, rx) = broadcast::channel(1);
-            let tx_partial = tx.clone();
+            // let tx_partial = tx.clone();
            let tx_finished = tx.clone();
            let update_current_path_agreed = self.update_current_path.clone();
            let update_current_path_partial = self.update_current_path.clone();
-            let should_heal_clone = self.should_heal.clone();
            let resolver_clone = resolver.clone();
            let bg_seq_clone = bg_seq.clone();
            let lopts = ListPathRawOptions {
@@ -693,24 +713,24 @@ impl FolderScanner {
                Box::pin({
                    let update_current_path_agreed = update_current_path_agreed.clone();
                    async move {
-                        update_current_path_agreed(&entry.name);
+                        update_current_path_agreed(&entry.name).await;
                    }
                })
            })),
            partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<DiskError>]| {
                Box::pin({
                    let update_current_path_partial = update_current_path_partial.clone();
-                    let should_heal_partial = should_heal_clone.clone();
-                    let tx_partial = tx_partial.clone();
+                    // let tx_partial = tx_partial.clone();
                    let resolver_partial = resolver_clone.clone();
                    let bucket_partial = bucket.clone();
                    let found_objs_clone = found_objs_clone.clone();
                    let bg_seq_partial = bg_seq_clone.clone();
                    async move {
-                        if !should_heal_partial() {
-                            let _ = tx_partial.send(true);
-                            return;
-                        }
+                        // TODO: re-enable this early exit once should_heal() (now an async method on FolderScanner) can be called from inside this closure.
+                        // if !fs.should_heal().await {
+                        //     let _ = tx_partial.send(true);
+                        //     return;
+                        // }
                        let entry = match entries.resolve(resolver_partial) {
                            Ok(Some(entry)) => entry,
                            _ => match entries.first_found() {
@@ -719,7 +739,7 @@ impl FolderScanner {
                            },
                        };
-                        update_current_path_partial(&entry.name);
+                        update_current_path_partial(&entry.name).await;
 
                        let mut custom = HashMap::new();
                        if entry.is_dir() {
                            return;
@@ -766,8 +786,8 @@ impl FolderScanner {
                                        .queue_heal_task(
                                            HealSource {
                                                bucket: bucket_partial.clone(),
-                                                object: entry.name.clone(),
-                                                version_id: "".to_string(),
+                                                object: fiv.name.clone(),
+                                                version_id: ver.version_id.map_or("".to_string(), |ver_id| ver_id.to_string()),
                                                ..Default::default()
                                            },
                                            HEAL_ITEM_OBJECT.to_string(),
@@ -787,7 +807,7 @@ impl FolderScanner {
                    }
                })
            })),
-            finished: Some(Box::new(move |err: &[Option<DiskError>]| {
+            finished: Some(Box::new(move |_: &[Option<DiskError>]| {
                Box::pin({
                    let tx_finished = tx_finished.clone();
                    async move {
@@ -820,12 +840,11 @@ impl FolderScanner {
                .size_recursive(&this_hash.key())
                .unwrap_or(DataUsageEntry::default());
            flat.compacted = true;
-            let mut compact = false;
-            if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
-                compact = true;
+            let compact = if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
+                true
            } else {
                // Compact if we only have objects as children...
-                compact = true;
+                let mut compact = true;
                for k in into.children.iter() {
                    if let Some(v) = self.new_cache.cache.get(k) {
                        if !v.children.is_empty() || v.objects > 1 {
@@ -834,7 +853,8 @@ impl FolderScanner {
                        }
                    }
                }
-            }
+                compact
+            };
            if compact {
                self.new_cache.delete_recursive(&this_hash);
                self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
@@ -930,3 +950,60 @@ pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursi
     }
     false
 }
+
+pub async fn scan_data_folder(
+    disks: &[Option<DiskStore>],
+    drive: &DiskStore,
+    cache: &DataUsageCache,
+    get_size_fn: GetSizeFn,
+    heal_scan_mode: HealScanMode,
+) -> Result<DataUsageCache> {
+    if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
+        return Err(Error::from_string("internal error: root scan attempted"));
+    }
+
+    let base_path = drive.to_string();
+    let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name);
+    let mut s = FolderScanner {
+        root: base_path,
+        get_size: get_size_fn,
+        old_cache: cache.clone(),
+        new_cache: DataUsageCache::default(),
+        update_cache: DataUsageCache::default(),
+        data_usage_scanner_debug: false,
+        heal_object_select: 0,
+        scan_mode: heal_scan_mode,
+        updates: cache.info.updates.clone().unwrap(),
+        last_update: SystemTime::now(),
+        update_current_path: update_path,
+        disks: disks.to_vec(),
+        disks_quorum: disks.len() / 2,
+        // Healing is only probed on erasure setups that are not marked skip_healing.
+        skip_heal: if !*GLOBAL_IsErasure.read().await || cache.info.skip_healing {
+            AtomicBool::new(true)
+        } else {
+            AtomicBool::new(false)
+        },
+        drive: drive.clone(),
+    };
+
+    if *GLOBAL_IsErasure.read().await && !cache.info.skip_healing {
+        s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32;
+    }
+
+    let mut root = DataUsageEntry::default();
+    let folder = CachedFolder {
+        name: cache.info.name.clone(),
+        object_heal_prob_div: 1,
+        parent: DataUsageHash("".to_string()),
+    };
+
+    if s.scan_folder(&folder, &mut root).await.is_err() {
+        close_disk().await;
+    }
+    s.new_cache
+        .force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap());
+    s.new_cache.info.last_update = SystemTime::now();
+    s.new_cache.info.next_cycle = cache.info.next_cycle;
+    close_disk().await;
+    Ok(s.new_cache)
+}
diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs
index a5e66efe..4947aa81 100644
--- a/ecstore/src/heal/data_scanner_metric.rs
+++ b/ecstore/src/heal/data_scanner_metric.rs
@@ -1,3 +1,6 @@
+use lazy_static::lazy_static;
+use std::future::Future;
+use std::pin::Pin;
 use std::{
     collections::HashMap,
     sync::{
@@ -6,11 +9,9 @@ use std::{
     },
     time::SystemTime,
 };
-
-use lazy_static::lazy_static;
 use tokio::sync::RwLock;
 
-use super::data_scanner::CurrentScannerCycle;
+use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn};
 lazy_static! {
     pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
 }
@@ -55,6 +56,7 @@ pub enum ScannerMetric {
 pub struct ScannerMetrics {
     operations: Vec<AtomicU32>,
     cycle_info: RwLock<Option<CurrentScannerCycle>>,
+    current_paths: HashMap<String, String>,
 }
 
 impl ScannerMetrics {
@@ -62,6 +64,7 @@ impl ScannerMetrics {
         Self {
            operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(),
            cycle_info: RwLock::new(None),
+            current_paths: HashMap::new(),
        }
    }
 
@@ -75,3 +78,28 @@ impl ScannerMetrics {
        *self.cycle_info.write().await = c;
    }
 }
+
+pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
+pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
+    let disk_1 = disk.to_string();
+    let disk_2 = disk.to_string();
+    (
+        Arc::new(move |path: &str| {
+            let disk_inner = disk_1.clone();
+            let path = path.to_string();
+            Box::pin(async move {
+                globalScannerMetrics
+                    .write()
+                    .await
+                    .current_paths
+                    .insert(disk_inner, path.to_string());
+            })
+        }),
+        Arc::new(move || {
+            let disk_inner = disk_2.clone();
+            Box::pin(async move {
+                globalScannerMetrics.write().await.current_paths.remove(&disk_inner);
+            })
+        }),
+    )
+}
diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs
index 4ebc3ced..820bc858 100644
--- a/ecstore/src/heal/data_usage_cache.rs
+++ b/ecstore/src/heal/data_usage_cache.rs
@@ -22,7 +22,7 @@ use std::u64;
 use tokio::sync::mpsc::Sender;
 use tokio::time::sleep;
 
-use super::data_scanner::SizeSummary;
+use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS};
 use super::data_usage::DATA_USAGE_ROOT;
 
 // DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals
@@ -559,6 +559,33 @@ impl DataUsageCache {
        }
    }
 
+    pub fn force_compact(&mut self, limit: usize) {
+        if self.cache.len() < limit {
+            return;
+        }
+        let top = hash_path(&self.info.name).key();
+        let top_e = match self.find(&top) {
+            Some(e) => e,
+            None => return,
+        };
+        if top_e.children.len() > DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS.try_into().unwrap() {
+            self.reduce_children_of(&hash_path(&self.info.name), limit, true);
+        }
+        if self.cache.len() <= limit {
+            return;
+        }
+
+        let mut found = HashSet::new();
+        found.insert(top);
+        mark(self, &top_e, &mut found);
+        self.cache.retain(|k, _| {
+            if !found.contains(k) {
+                return false;
+            }
+            true
+        });
+    }
+
    pub fn reduce_children_of(&mut self, path: &DataUsageHash, limit: usize, compact_self: bool) {
        let e = match self.cache.get(&path.key()) {
            Some(e) => e,
@@ -669,6 +696,15 @@ fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec
        }
    }
 }
 
+fn mark(duc: &DataUsageCache, entry: &DataUsageEntry, found: &mut HashSet<String>) {
+    for k in entry.children.iter() {
+        found.insert(k.to_string());
+        if let Some(ch) = duc.cache.get(k) {
+            mark(duc, ch, found);
+        }
+    }
+}
+
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct DataUsageHash(pub String);
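
Usage note (editorial, not part of the patch): the sketch below shows how a caller might drive the new scan_data_folder entry point. It is a minimal sketch under stated assumptions: the crate and module paths (ecstore::heal::data_scanner and friends), the availability of DiskStore and the crate's Result alias at those paths, and the Output type of GetSizeFn (Result<u64>, as reconstructed in the type alias above) are assumptions, and the get_size body is a hypothetical stub rather than the project's real size/heal logic.

use std::{future::Future, pin::Pin};

use ecstore::disk::DiskStore; // assumed public path for the disk handle
use ecstore::error::Result; // assumed crate-wide Result alias
use ecstore::heal::data_scanner::{scan_data_folder, GetSizeFn, ScannerItem};
use ecstore::heal::data_usage_cache::DataUsageCache;
use ecstore::heal::heal_commands::HEAL_NORMAL_SCAN;

// Hypothetical driver: scan one drive's cached usage tree and return the
// refreshed cache. `disks`, `drive` and `cache` are supplied by the caller.
async fn scan_one_drive(
    disks: &[Option<DiskStore>],
    drive: &DiskStore,
    cache: &DataUsageCache,
) -> Result<DataUsageCache> {
    // GetSizeFn is a boxed closure returning a pinned, Send future (see the
    // alias in the patch). A real implementation would stat the object and
    // report heal/lifecycle results; this stub just reports zero bytes.
    let get_size: GetSizeFn = Box::new(|_item: &ScannerItem| {
        Box::pin(async move { Ok(0u64) }) as Pin<Box<dyn Future<Output = Result<u64>> + Send>>
    });

    // scan_data_folder registers the drive with the scanner metrics through
    // current_path_updater, walks the cached folder tree, force-compacts the
    // new cache and removes the metrics entry again before returning it.
    scan_data_folder(disks, drive, cache, get_size, HEAL_NORMAL_SCAN).await
}

The explicit cast on the Box::pin result fixes the closure's return type so it coerces cleanly into the boxed dyn Fn alias; if the crate's GetSizeFn output differs from the reconstruction above, the cast and the Ok(...) value would change accordingly.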