From be7d1ab0cce40622b6544affd244bca778b154cb Mon Sep 17 00:00:00 2001
From: junxiang Mu <1948535941@qq.com>
Date: Wed, 13 Nov 2024 13:10:51 +0000
Subject: [PATCH] tmp(2)

Signed-off-by: junxiang Mu <1948535941@qq.com>
---
 Cargo.lock                               |   6 +
 common/common/src/lib.rs                 |  19 +
 ecstore/Cargo.toml                       |   1 +
 ecstore/src/bucket/metadata_sys.rs       |   2 +-
 ecstore/src/cache_value/metacache_set.rs |  25 +-
 ecstore/src/disk/local.rs                |  38 ++
 ecstore/src/disk/mod.rs                  |  10 +
 ecstore/src/disk/remote.rs               |  14 +-
 ecstore/src/heal/data_scanner.rs         | 656 ++++++++++++++++++++++-
 ecstore/src/heal/data_usage_cache.rs     | 515 +++++++++++++++++-
 ecstore/src/heal/error.rs                |   1 +
 ecstore/src/heal/heal_ops.rs             |  24 +-
 ecstore/src/heal/mod.rs                  |   1 +
 ecstore/src/peer.rs                      |   2 +-
 ecstore/src/utils/path.rs                |  13 +
 15 files changed, 1272 insertions(+), 55 deletions(-)
 create mode 100644 ecstore/src/heal/error.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5a24b8a1..880e5d49 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -336,6 +336,12 @@
 version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"

+[[package]]
+name = "bytesize"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
+
 [[package]]
 name = "bytestring"
 version = "1.3.1"

diff --git a/common/common/src/lib.rs b/common/common/src/lib.rs
index b7a0605d..f3b5db32 100644
--- a/common/common/src/lib.rs
+++ b/common/common/src/lib.rs
@@ -1,2 +1,21 @@
 pub mod error;
 pub mod globals;
+
+/// Defers evaluation of a block of code until the end of the scope.
+#[macro_export]
+macro_rules! defer {
+    ($($body:tt)*) => {
+        let _guard = {
+            pub struct Guard<F: FnOnce()>(Option<F>);
+
+            impl<F: FnOnce()> Drop for Guard<F> {
+                fn drop(&mut self) {
+                    if let Some(f) = self.0.take() {
+                        f();
+                    }
+                }
+            }
+
+            Guard(Some(|| {
+                let _ = { $($body)* };
+            }))
+        };
+    };
+}
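Because the guard's closure evaluates the deferred tokens itself, call sites pass the expression directly rather than wrapping it in another closure. A minimal usage sketch (the counter and function are illustrative only):

    use common::defer;
    use std::sync::atomic::{AtomicUsize, Ordering};

    fn scan_one(active: &AtomicUsize) {
        active.fetch_add(1, Ordering::SeqCst);
        // Runs when the hidden guard drops at scope exit, even on early return.
        defer!(active.fetch_sub(1, Ordering::SeqCst));
        // ... scanning work ...
    }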
diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml
index 1208304d..8b802890 100644
--- a/ecstore/Cargo.toml
+++ b/ecstore/Cargo.toml
@@ -13,6 +13,7 @@
 async-trait.workspace = true
 backon.workspace = true
 blake2 = "0.10.6"
 bytes.workspace = true
+bytesize = "1.3.0"
 common.workspace = true
 reader.workspace = true
 glob = "0.3.1"

diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs
index 764a57b5..03ece0d3 100644
--- a/ecstore/src/bucket/metadata_sys.rs
+++ b/ecstore/src/bucket/metadata_sys.rs
@@ -28,7 +28,7 @@
 use super::target::BucketTargets;
 use lazy_static::lazy_static;

 lazy_static! {
-    static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
+    pub static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
 }

 pub async fn init_bucket_metadata_sys(api: ECStore, buckets: Vec) {

diff --git a/ecstore/src/cache_value/metacache_set.rs b/ecstore/src/cache_value/metacache_set.rs
index 42b81a20..18b698a9 100644
--- a/ecstore/src/cache_value/metacache_set.rs
+++ b/ecstore/src/cache_value/metacache_set.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{future::Future, pin::Pin, sync::Arc};

 use tokio::{
     spawn,
@@ -14,6 +14,10 @@
 use crate::{
     error::{Error, Result},
 };

+type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
+type PartialFn = Box<dyn Fn(MetaCacheEntries, &[Option<Error>]) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
+type FinishedFn = Box<dyn Fn(&[Option<Error>]) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
+
 #[derive(Default)]
 pub struct ListPathRawOptions {
     pub disks: Vec<Option<DiskStore>>,
@@ -26,9 +30,9 @@
     pub min_disks: usize,
     pub report_not_found: bool,
     pub per_disk_limit: i32,
-    pub agreed: Option<Box<dyn Fn(MetaCacheEntry) + Send + Sync>>,
-    pub partial: Option<Box<dyn Fn(MetaCacheEntries, &[Option<Error>]) + Send + Sync>>,
-    pub finished: Option<Box<dyn Fn(&[Option<Error>]) + Send + Sync>>,
+    pub agreed: Option<AgreedFn>,
+    pub partial: Option<PartialFn>,
+    pub finished: Option<FinishedFn>,
 }

 impl Clone for ListPathRawOptions {
@@ -185,8 +192,8 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions)
     }

     if has_err > 0 && has_err > opts.disks.len() - opts.min_disks {
-        if let Some(finished_fn) = opts.finished.clone() {
-            finished_fn(&errs);
+        if let Some(finished_fn) = opts.finished.as_ref() {
+            finished_fn(&errs).await;
         }
         let mut combined_err = Vec::new();
         errs.iter().zip(opts.disks.iter()).for_each(|(err, disk)| match (err, disk) {
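Each alias boxes an async closure, so a call site wraps its body in `Box::pin(async move { ... })`. A minimal sketch of wiring one callback (the field value and log line are illustrative):

    let opts = ListPathRawOptions {
        agreed: Some(Box::new(|entry: MetaCacheEntry| {
            Box::pin(async move {
                // Invoked when every disk returned the same entry.
                tracing::info!("agreed: {}", entry.name);
            })
        })),
        ..Default::default()
    };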
@@ -205,7 +212,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions)
         // Break if all at EOF or error.
         if at_eof + has_err == readers.len() {
             if has_err > 0 {
-                if let Some(finished_fn) = opts.finished.clone() {
-                    finished_fn(&errs);
+                if let Some(finished_fn) = opts.finished.as_ref() {
+                    finished_fn(&errs).await;
                 }
                 break;
             }
         }
@@ -213,13 +220,13 @@
         }

         if agree == readers.len() {
-            if let Some(agreed_fn) = opts.agreed.clone() {
-                agreed_fn(current);
+            if let Some(agreed_fn) = opts.agreed.as_ref() {
+                agreed_fn(current).await;
             }
             continue;
         }

-        if let Some(partial_fn) = opts.partial.clone() {
-            partial_fn(MetaCacheEntries(top_entries), &errs);
+        if let Some(partial_fn) = opts.partial.as_ref() {
+            partial_fn(MetaCacheEntries(top_entries), &errs).await;
         }
     }

diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs
index d34687c0..75ebc264 100644
--- a/ecstore/src/disk/local.rs
+++ b/ecstore/src/disk/local.rs
@@ -9,6 +9,7 @@
 use super::{
     UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
 };
 use crate::bitrot::bitrot_verify;
+use crate::bucket::metadata_sys::GLOBAL_BucketMetadataSys;
 use crate::cache_value::cache::{Cache, Opts};
 use crate::disk::error::{
     convert_access_error, is_sys_err_handle_invalid, is_sys_err_invalid_arg, is_sys_err_is_dir, is_sys_err_not_dir,
@@ -18,6 +19,10 @@
 use crate::disk::os::{check_path_length, is_empty_dir};
 use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
 use crate::error::{Error, Result};
 use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
+use crate::heal::data_scanner::has_active_rules;
+use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
+use crate::heal::heal_commands::HealScanMode;
+use crate::new_object_layer_fn;
 use crate::set_disk::{
     conv_part_err_to_int, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
     CHECK_PART_VOLUME_NOT_FOUND,
@@ -31,7 +36,9 @@
 use crate::{
     store_api::{FileInfo, RawFileInfo},
     utils,
 };
+use common::defer;
 use path_absolutize::Absolutize;
+use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus};
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::io::Cursor;
@@ -47,6 +54,7 @@
 use time::OffsetDateTime;
 use tokio::fs::{self, File};
 use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind};
 use tokio::runtime::Runtime;
+use tokio::sync::mpsc::Sender;
 use tokio::sync::RwLock;
 use tracing::{error, info, warn};
 use uuid::Uuid;
@@ -1868,6 +1876,36 @@
         Ok(info)
     }
+
+    async fn ns_scanner(
+        &self,
+        cache: &DataUsageCache,
+        updates: Sender<DataUsageEntry>,
+        scan_mode: HealScanMode,
+    ) -> Result<DataUsageCache> {
+        self.scanning.fetch_add(1, Ordering::SeqCst);
+        // Balance the gauge when this function returns, even on early error.
+        defer!(self.scanning.fetch_sub(1, Ordering::SeqCst));
+
+        // Check if the current bucket has a replication configuration.
+        if let Ok((rcfg, _)) = GLOBAL_BucketMetadataSys
+            .read()
+            .await
+            .get_replication_config(&cache.info.name)
+            .await
+        {
+            if has_active_rules(&rcfg, "", true) {
+                // TODO: globalBucketTargetSys
+            }
+        }
+
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(Error::msg("errServerNotInitialized")),
+        };
+        todo!()
+    }
 }

 async fn get_disk_info(drive_path: PathBuf) -> Result<(Info, bool)> {
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index 3a59fa25..920ca072 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -17,6 +17,10 @@
 use crate::{
     erasure::{ReadAt, Writer},
     error::{Error, Result},
     file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
+    heal::{
+        data_usage_cache::{DataUsageCache, DataUsageEntry},
+        heal_commands::HealScanMode,
+    },
     store_api::{FileInfo, RawFileInfo},
 };
 use endpoint::Endpoint;
@@ -436,6 +440,12 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
     async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
     async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>>;
     async fn disk_info(&self, opts: &DiskInfoOptions) -> Result;
+    async fn ns_scanner(
+        &self,
+        cache: &DataUsageCache,
+        updates: Sender<DataUsageEntry>,
+        scan_mode: HealScanMode,
+    ) -> Result<DataUsageCache>;
 }

 #[derive(Debug, Default, Serialize, Deserialize)]

diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs
index 0eefe1c5..aff54235 100644
--- a/ecstore/src/disk/remote.rs
+++ b/ecstore/src/disk/remote.rs
@@ -10,6 +10,7 @@
 use protos::{
         UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
     },
 };
+use tokio::sync::mpsc::Sender;
 use tonic::Request;
 use tracing::info;
 use uuid::Uuid;
@@ -20,9 +21,7 @@
 use super::{
     RemoteFileWriter, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
 };
 use crate::{
     disk::error::DiskError,
     error::{Error, Result},
+    heal::{
+        data_usage_cache::{DataUsageCache, DataUsageEntry},
+        heal_commands::HealScanMode,
+    },
     store_api::{FileInfo, RawFileInfo},
 };
 use protos::proto_gen::node_service::RenamePartRequst;
@@ -729,4 +728,13 @@
         Ok(disk_info)
     }
+
+    async fn ns_scanner(
+        &self,
+        _cache: &DataUsageCache,
+        _updates: Sender<DataUsageEntry>,
+        _scan_mode: HealScanMode,
+    ) -> Result<DataUsageCache> {
+        todo!()
+    }
 }
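On the caller side, the new trait method streams incremental `DataUsageEntry` totals through the channel while the final cache arrives as the return value. A hedged sketch of a driver, assuming a `DiskAPI` implementation is at hand (`drive_scan` and the channel size are illustrative):

    use tokio::sync::mpsc;

    async fn drive_scan<D: DiskAPI>(disk: &D, seed: &DataUsageCache) -> Result<DataUsageCache> {
        let (tx, mut rx) = mpsc::channel::<DataUsageEntry>(16);

        // Consume progress updates while the scan runs.
        let progress = tokio::spawn(async move {
            while let Some(entry) = rx.recv().await {
                tracing::info!("scanned so far: {} objects", entry.objects);
            }
        });

        let cache = disk.ns_scanner(seed, tx, HEAL_NORMAL_SCAN).await?;
        let _ = progress.await;
        Ok(cache)
    }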
diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs
index ed290818..06dc702e 100644
--- a/ecstore/src/heal/data_scanner.rs
+++ b/ecstore/src/heal/data_scanner.rs
@@ -1,6 +1,14 @@
 use std::{
+    collections::{HashMap, HashSet},
+    fs,
+    future::Future,
     io::{Cursor, Read},
-    sync::{atomic::{AtomicU32, AtomicU64}, Arc},
+    path::{Path, PathBuf},
+    pin::Pin,
+    sync::{
+        atomic::{AtomicU32, AtomicU64},
+        Arc,
+    },
     time::{Duration, SystemTime, UNIX_EPOCH},
 };

@@ -8,20 +16,46 @@
 use byteorder::{LittleEndian, ReadBytesExt};
 use lazy_static::lazy_static;
 use rand::Rng;
 use rmp_serde::{Deserializer, Serializer};
+use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus};
 use serde::{Deserialize, Serialize};
-use tokio::{sync::{mpsc, RwLock}, time::sleep};
+use tokio::{
+    sync::{
+        broadcast,
+        mpsc::{self, Sender},
+        RwLock,
+    },
+    time::sleep,
+};
 use tracing::{error, info};

 use crate::{
-    config::{common::{read_config, save_config}, heal::Config},
+    cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
+    config::{
+        common::{read_config, save_config},
+        heal::Config,
+    },
+    disk::{error::DiskError, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
     error::{Error, Result},
-    global::GLOBAL_IsErasureSD,
-    heal::data_usage::BACKGROUND_HEAL_INFO_PATH,
+    global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD},
+    heal::{
+        data_usage::BACKGROUND_HEAL_INFO_PATH,
+        data_usage_cache::{hash_path, DataUsageHashMap},
+        error::ERR_IGNORE_FILE_CONTRIB,
+        heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT},
+        heal_ops::{HealSource, BG_HEALING_UUID},
+    },
     new_object_layer_fn,
-    store::ECStore,
+    peer::is_reserved_or_invalid_bucket,
+    store::{ECStore, ListPathOptions},
+    utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR},
 };
-use super::{data_scanner_metric::globalScannerMetrics, data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
-    heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}};
+use super::{
+    data_scanner_metric::globalScannerMetrics,
+    data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
+    data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
+    heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
+};

 const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
 const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
@@ -103,7 +137,8 @@ async fn run_data_scanner() {
     }

     let bg_heal_info = read_background_heal_info(store).await;
-    let scan_mode = get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
+    let scan_mode =
+        get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
     if bg_heal_info.current_scan_mode != scan_mode {
         let mut new_heal_info = bg_heal_info;
         new_heal_info.current_scan_mode = scan_mode;
@@ -166,7 +201,7 @@ async fn save_background_heal_info(store: &ECStore, info: &BackgroundHealInfo) {

 async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode {
     let bitrot_cycle = globalHealConfig.read().await.bitrot_scan_cycle();
-    let v = bitrot_cycle.as_secs_f64() ;
+    let v = bitrot_cycle.as_secs_f64();
     if v == -1.0 {
         return HEAL_NORMAL_SCAN;
     } else if v == 0.0 {
@@ -291,3 +326,606 @@ fn system_time_to_timestamp(time: &SystemTime) -> u64 {

 fn timestamp_to_system_time(timestamp: u64) -> SystemTime {
     UNIX_EPOCH + std::time::Duration::new(timestamp, 0)
 }

#[derive(Debug, Default)]
struct Heal {
    enabled: bool,
    bitrot: bool,
}

struct ScannerItem {
    path: String,
    bucket: String,
    prefix: String,
    object_name: String,
    replication: Option<ReplicationConfiguration>,
    // todo: lifecycle
    // typ: fs::Permissions,
    heal: Heal,
    debug: bool,
}

impl ScannerItem {
    pub fn transform_meta_dir(&mut self) {
        let split = self
            .prefix
            .split(SLASH_SEPARATOR)
            .map(PathBuf::from)
            .collect::<Vec<PathBuf>>();
        if split.len() > 1 {
            self.prefix = path_join(&split[0..split.len() - 1]).to_string_lossy().to_string();
        } else {
            self.prefix = "".to_string();
        }
        self.object_name = split.last().map_or("".to_string(), |v| v.to_string_lossy().to_string());
    }

    pub fn object_path(&self) -> PathBuf {
        path_join(&[PathBuf::from(self.prefix.clone()), PathBuf::from(self.object_name.clone())])
    }
}

#[derive(Debug, Default)]
pub struct SizeSummary {
    pub total_size: usize,
    pub versions: usize,
    pub delete_markers: usize,
    pub replicated_size: usize,
    pub replicated_count: usize,
    pub pending_size: usize,
    pub failed_size: usize,
    pub replica_size: usize,
    pub replica_count: usize,
    pub pending_count: usize,
    pub failed_count: usize,
    pub repl_target_stats: HashMap<String, ReplTargetSizeSummary>,
    // Todo: tiers
}

#[derive(Debug, Default)]
pub struct ReplTargetSizeSummary {
    pub replicated_size: usize,
    pub replicated_count: usize,
    pub pending_size: usize,
    pub failed_size: usize,
    pub pending_count: usize,
    pub failed_count: usize,
}

#[derive(Debug, Clone)]
struct CachedFolder {
    name: String,
    parent: DataUsageHash,
    object_heal_prob_div: u32,
}

type GetSizeFn = Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>> + Send>> + Send + 'static>;

struct FolderScanner {
    root: String,
    get_size: GetSizeFn,
    old_cache: DataUsageCache,
    new_cache: DataUsageCache,
    update_cache: DataUsageCache,
    data_usage_scanner_debug: bool,
    heal_object_select: u32,
    scan_mode: HealScanMode,

    should_heal: Arc<dyn Fn() -> bool + Send + Sync>,

    disks: Vec<Option<DiskStore>>,
    disks_quorum: usize,

    updates: Sender<DataUsageEntry>,
    last_update: SystemTime,

    update_current_path: Arc<dyn Fn(&str) + Send + Sync>,
}
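`get_size` follows the same boxed-async-closure shape as the list callbacks. A stand-in implementation for illustration only (the fixed 1 KiB size is obviously fake):

    let get_size: GetSizeFn = Box::new(|item: &ScannerItem| {
        let debug = item.debug;
        Box::pin(async move {
            if debug {
                // A real implementation would read the object's metadata here.
            }
            Ok(SizeSummary {
                total_size: 1024,
                versions: 1,
                ..Default::default()
            })
        })
    });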
impl FolderScanner {
    async fn scan_folder(&mut self, folder: &CachedFolder, into: &mut DataUsageEntry) -> Result<()> {
        let this_hash = hash_path(&folder.name);
        let was_compacted = into.compacted;

        loop {
            let mut abandoned_children: DataUsageHashMap = if !into.compacted {
                self.old_cache.find_children_copy(this_hash.clone())
            } else {
                HashSet::new()
            };

            let (_, prefix) = path_to_bucket_object_with_base_path(&self.root, &folder.name);
            // Todo: lifeCycle
            let replication_cfg = if self.old_cache.info.replication.is_some()
                && has_active_rules(self.old_cache.info.replication.as_ref().unwrap(), &prefix, true)
            {
                self.old_cache.info.replication.clone()
            } else {
                None
            };

            let mut existing_folders = Vec::new();
            let mut new_folders = Vec::new();
            let mut found_objects = false;

            let path = Path::new(&self.root).join(&folder.name);
            if path.is_dir() {
                for entry in fs::read_dir(path)? {
                    let entry = entry?;
                    let sub_path = entry.path();
                    // Keep cache names relative: join the folder name with the entry's base name.
                    let ent_name = Path::new(&folder.name).join(entry.file_name());
                    let (bucket, prefix) = path_to_bucket_object_with_base_path(&self.root, ent_name.to_str().unwrap());
                    if bucket.is_empty() {
                        continue;
                    }
                    if is_reserved_or_invalid_bucket(&bucket, false) {
                        continue;
                    }

                    // Directories become cached folders; everything else is a candidate object.
                    if sub_path.is_dir() {
                        let h = hash_path(ent_name.to_str().unwrap());
                        if h == this_hash {
                            continue;
                        }
                        let this = CachedFolder {
                            name: ent_name.to_string_lossy().to_string(),
                            parent: this_hash.clone(),
                            object_heal_prob_div: folder.object_heal_prob_div,
                        };
                        abandoned_children.remove(&h.key());
                        if self.old_cache.cache.contains_key(&h.key()) {
                            existing_folders.push(this);
                            self.update_cache
                                .copy_with_children(&self.old_cache, &h, &Some(this_hash.clone()));
                        } else {
                            new_folders.push(this);
                        }
                        continue;
                    }

                    let mut item = ScannerItem {
                        path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(),
                        bucket,
                        prefix: Path::new(&prefix)
                            .parent()
                            .unwrap_or(Path::new(""))
                            .to_string_lossy()
                            .to_string(),
                        object_name: ent_name
                            .file_name()
                            .map(|name| name.to_string_lossy().into_owned())
                            .unwrap_or_default(),
                        debug: self.data_usage_scanner_debug,
                        replication: replication_cfg.clone(),
                        heal: Heal::default(),
                    };

                    item.heal.enabled = this_hash.mod_alt(
                        self.old_cache.info.next_cycle / folder.object_heal_prob_div,
                        self.heal_object_select / folder.object_heal_prob_div,
                    ) && (self.should_heal)();
                    item.heal.bitrot = self.scan_mode == HEAL_DEEP_SCAN;

                    let (sz, err) = match (self.get_size)(&item).await {
                        Ok(sz) => (sz, None),
                        Err(err) => {
                            if err.to_string() != ERR_IGNORE_FILE_CONTRIB {
                                continue;
                            }
                            (SizeSummary::default(), Some(err))
                        }
                    };
                    // A successful read means we have a valid object.
                    found_objects = true;

                    // Remove the filename, i.e. the meta file, to construct the object name.
                    item.transform_meta_dir();

                    // Object already accounted for, remove from heal map,
                    // simply because the get_size() callback already heals the
                    // object.
                    abandoned_children.remove(
                        &path_join(&[PathBuf::from(item.bucket.clone()), item.object_path()])
                            .to_string_lossy()
                            .to_string(),
                    );

                    if err.is_none() || err.unwrap().to_string() != ERR_IGNORE_FILE_CONTRIB {
                        into.add_sizes(&sz);
                        into.objects += 1;
                    }
                }
            }

            if found_objects && *GLOBAL_IsErasure.read().await {
                // If we found an object in erasure mode, we skip subdirs (only datadirs)...
                break;
            }

            let should_compact = (self.new_cache.info.name != folder.name
                && existing_folders.len() + new_folders.len() >= DATA_SCANNER_COMPACT_AT_FOLDERS.try_into().unwrap())
                || existing_folders.len() + new_folders.len() >= DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS.try_into().unwrap();

            let total_folders = existing_folders.len() + new_folders.len();
            if total_folders
                > SCANNER_EXCESS_FOLDERS
                    .load(std::sync::atomic::Ordering::SeqCst)
                    .try_into()
                    .unwrap()
            {
                let _prefix_name = format!("{}/", folder.name.trim_end_matches('/'));
                // todo: notification
            }

            if !into.compacted && should_compact {
                into.compacted = true;
                new_folders.extend(existing_folders.clone());
                existing_folders.clear();
            }

            // Transfer existing
            if !into.compacted {
                for folder in existing_folders.iter() {
                    let h = hash_path(&folder.name);
                    self.update_cache
                        .copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
                }
            }

            // Scan new...
            for folder in new_folders.iter() {
                let h = hash_path(&folder.name);
                if !into.compacted {
                    let mut found_any = false;
                    let mut parent = this_hash.clone();
                    while parent != hash_path(&self.update_cache.info.name) {
                        let e = self.update_cache.find(&parent.key());
                        if e.is_none() || e.as_ref().unwrap().compacted {
                            found_any = true;
                            break;
                        }
                        match self.update_cache.search_parent(&parent) {
                            Some(next) => {
                                parent = next;
                            }
                            None => {
                                found_any = true;
                                break;
                            }
                        }
                    }
                    if !found_any {
                        self.update_cache
                            .replace_hashed(&h, &Some(this_hash.clone()), &DataUsageEntry::default());
                    }
                }
                (self.update_current_path)(&folder.name);
                scan(folder, into, self).await;
                // Add new folders if this is new and we don't have existing.
                if !into.compacted {
                    if let Some(parent) = self.update_cache.find(&this_hash.key()) {
                        if !parent.compacted {
                            self.update_cache.delete_recursive(&h);
                            self.update_cache
                                .copy_with_children(&self.new_cache, &h, &Some(this_hash.clone()));
                        }
                    }
                }
            }

            // Scan existing...
            for folder in existing_folders.iter() {
                let h = hash_path(&folder.name);
                if !into.compacted && self.old_cache.is_compacted(&h) {
                    if !h.mod_(self.old_cache.info.next_cycle, DATA_USAGE_UPDATE_DIR_CYCLES) {
                        self.new_cache
                            .copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
                        into.add_child(&h);
                        continue;
                    }
                }
                (self.update_current_path)(&folder.name);
                scan(folder, into, self).await;
            }

            // Scan for healing
            if abandoned_children.is_empty() || !(self.should_heal)() {
                break;
            }

            if self.disks.is_empty() || self.disks_quorum == 0 {
                break;
            }

            let (bg_seq, found) = GLOBAL_BackgroundHealState
                .read()
                .await
                .get_heal_sequence_by_token(BG_HEALING_UUID)
                .await;
            if !found {
                break;
            }
            let bg_seq = bg_seq.unwrap();

            let mut resolver = MetadataResolutionParams {
                dir_quorum: self.disks_quorum,
                obj_quorum: self.disks_quorum,
                bucket: "".to_string(),
                strict: false,
                ..Default::default()
            };

            for k in abandoned_children.iter() {
                if !(self.should_heal)() {
                    break;
                }

                let (bucket, prefix) = path_to_bucket_object(k);
                (self.update_current_path)(k);

                if bucket != resolver.bucket {
                    let _ = bg_seq
                        .clone()
                        .write()
                        .await
                        .queue_heal_task(
                            HealSource {
                                bucket: bucket.clone(),
                                ..Default::default()
                            },
                            HEAL_ITEM_BUCKET.to_owned(),
                        )
                        .await?;
                }

                resolver.bucket = bucket.clone();
                let found_objs = Arc::new(RwLock::new(false));
                let found_objs_clone = found_objs.clone();
                let (tx, rx) = broadcast::channel(1);
                let tx_partial = tx.clone();
                let tx_finished = tx.clone();
                let update_current_path_agreed = self.update_current_path.clone();
                let update_current_path_partial = self.update_current_path.clone();
                let should_heal_clone = self.should_heal.clone();
                let resolver_clone = resolver.clone();
                let bg_seq_clone = bg_seq.clone();
                let lopts = ListPathRawOptions {
                    disks: self.disks.clone(),
                    bucket: bucket.clone(),
                    path: prefix.clone(),
                    recursice: true,
                    report_not_found: true,
                    min_disks: self.disks_quorum,
                    agreed: Some(Box::new(move |entry: MetaCacheEntry| {
                        Box::pin({
                            let update_current_path_agreed = update_current_path_agreed.clone();
                            async move {
                                update_current_path_agreed(&entry.name);
                            }
                        })
                    })),
                    partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
                        Box::pin({
                            let update_current_path_partial = update_current_path_partial.clone();
                            let should_heal_partial = should_heal_clone.clone();
                            let tx_partial = tx_partial.clone();
                            let resolver_partial = resolver_clone.clone();
                            let bucket_partial = bucket.clone();
                            let found_objs_clone = found_objs_clone.clone();
                            let bg_seq_partial = bg_seq_clone.clone();
                            async move {
                                if !should_heal_partial() {
                                    let _ = tx_partial.send(true);
                                    return;
                                }
                                let entry = match entries.resolve(resolver_partial) {
                                    Ok(Some(entry)) => entry,
                                    _ => match entries.first_found() {
                                        (Some(entry), _) => entry,
                                        _ => return,
                                    },
                                };

                                update_current_path_partial(&entry.name);
                                let mut custom = HashMap::new();
                                if entry.is_dir() {
                                    return;
                                }

                                // We got an entry which we should be able to heal.
                                let fiv = match entry.file_info_versions(&bucket_partial) {
                                    Ok(fiv) => fiv,
                                    Err(_) => {
                                        if let Err(err) = bg_seq_partial
                                            .write()
                                            .await
                                            .queue_heal_task(
                                                HealSource {
                                                    bucket: bucket_partial.clone(),
                                                    object: entry.name.clone(),
                                                    version_id: "".to_string(),
                                                    ..Default::default()
                                                },
                                                HEAL_ITEM_OBJECT.to_string(),
                                            )
                                            .await
                                        {
                                            match err.downcast_ref() {
                                                Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
                                                _ => {
                                                    info!("{}", err.to_string());
                                                }
                                            }
                                        } else {
                                            let mut w = found_objs_clone.write().await;
                                            *w = true;
                                        }
                                        return;
                                    }
                                };

                                custom.insert("versions", fiv.versions.len().to_string());
                                let (mut success_versions, mut fail_versions) = (0, 0);
                                for _ver in fiv.versions.iter() {
                                    match bg_seq_partial
                                        .write()
                                        .await
                                        .queue_heal_task(
                                            HealSource {
                                                bucket: bucket_partial.clone(),
                                                object: entry.name.clone(),
                                                version_id: "".to_string(),
                                                ..Default::default()
                                            },
                                            HEAL_ITEM_OBJECT.to_string(),
                                        )
                                        .await
                                    {
                                        Ok(_) => {
                                            success_versions += 1;
                                            let mut w = found_objs_clone.write().await;
                                            *w = true;
                                        }
                                        Err(_) => fail_versions += 1,
                                    }
                                }
                                custom.insert("success_versions", success_versions.to_string());
                                custom.insert("failed_versions", fail_versions.to_string());
                            }
                        })
                    })),
                    finished: Some(Box::new(move |_errs: &[Option<Error>]| {
                        Box::pin({
                            let tx_finished = tx_finished.clone();
                            async move {
                                let _ = tx_finished.send(true);
                            }
                        })
                    })),
                    ..Default::default()
                };
                let _ = list_path_raw(rx, lopts).await;

                if *found_objs.read().await {
                    let this = CachedFolder {
                        name: k.clone(),
                        parent: this_hash.clone(),
                        object_heal_prob_div: 1,
                    };
                    scan(&this, into, self).await;
                }
            }
            break;
        }
        if !was_compacted {
            self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), into);
        }

        if !into.compacted && self.new_cache.info.name != folder.name {
            let mut flat = self
                .new_cache
                .size_recursive(&this_hash.key())
                .unwrap_or_default();
            flat.compacted = true;
            let mut compact = false;
            if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
                compact = true;
            } else {
                // Compact if we only have objects as children...
                compact = true;
                for k in into.children.iter() {
                    if let Some(v) = self.new_cache.cache.get(k) {
                        if !v.children.is_empty() || v.objects > 1 {
                            compact = false;
                            break;
                        }
                    }
                }
            }
            if compact {
                self.new_cache.delete_recursive(&this_hash);
                self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
                let mut total: HashMap<String, String> = HashMap::new();
                total.insert("objects".to_string(), flat.objects.to_string());
                total.insert("size".to_string(), flat.size.to_string());
                if flat.versions > 0 {
                    total.insert("versions".to_string(), flat.versions.to_string());
                }
            }
        }
        // Compact if too many children...
        if !into.compacted {
            self.new_cache.reduce_children_of(
                &this_hash,
                DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap(),
                self.new_cache.info.name != folder.name,
            );
        }
        if self.update_cache.cache.contains_key(&this_hash.key()) && !was_compacted {
            // Replace if existed before.
            if let Some(flat) = self.new_cache.size_recursive(&this_hash.key()) {
                self.update_cache.delete_recursive(&this_hash);
                self.update_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
            }
        }
        Ok(())
    }

    async fn send_update(&mut self) {
        if SystemTime::now().duration_since(self.last_update).unwrap_or_default() < Duration::from_secs(60) {
            return;
        }
        if let Some(flat) = self.update_cache.size_recursive(&self.new_cache.info.name) {
            let _ = self.updates.send(flat).await;
            self.last_update = SystemTime::now();
        }
    }
}

async fn scan(folder: &CachedFolder, into: &mut DataUsageEntry, folder_scanner: &mut FolderScanner) {
    if into.compacted {
        // A compacted parent accumulates results directly.
        let _ = Box::pin(folder_scanner.scan_folder(folder, into)).await;
        return;
    }

    let mut dst = DataUsageEntry::default();
    if Box::pin(folder_scanner.scan_folder(folder, &mut dst)).await.is_err() {
        return;
    }

    let h = DataUsageHash(folder.name.clone());
    into.add_child(&h);
    folder_scanner.update_cache.delete_recursive(&h);
    folder_scanner
        .update_cache
        .copy_with_children(&folder_scanner.new_cache, &h, &Some(folder.parent.clone()));
    folder_scanner.send_update().await;
}

pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursive: bool) -> bool {
    if config.rules.is_empty() {
        return false;
    }

    for rule in config.rules.iter() {
        if rule
            .status
            .eq(&ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED))
        {
            continue;
        }
        if !prefix.is_empty() {
            if let Some(filter) = &rule.filter {
                if let Some(r_prefix) = &filter.prefix {
                    if !r_prefix.is_empty() {
                        // If not recursive, the incoming prefix must be within the rule prefix.
                        if !recursive && !prefix.starts_with(r_prefix) {
                            continue;
                        }
                        // If recursive, skip this rule when neither the rule prefix nor the
                        // tested prefix contains the other.
                        if recursive && !r_prefix.starts_with(prefix) && !prefix.starts_with(r_prefix) {
                            continue;
                        }
                    }
                }
            }
        }
        return true;
    }
    false
}
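`list_path_raw` is handed a broadcast receiver so any of the boxed callbacks can cancel the walk. The wiring pattern, reduced to its essentials (names are illustrative):

    use tokio::sync::broadcast;

    fn cancellation_wiring() {
        // One receiver goes to the listing loop; senders are cloned into callbacks.
        let (tx, mut rx) = broadcast::channel::<bool>(1);
        let tx_finished = tx.clone();

        // A callback (e.g. `finished`) signals that the walk should stop.
        let _ = tx_finished.send(true);

        // The listing loop polls the channel and breaks once signalled.
        assert!(rx.try_recv().is_ok());
    }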
diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs
index 3bbf1f50..037f503b 100644
--- a/ecstore/src/heal/data_usage_cache.rs
+++ b/ecstore/src/heal/data_usage_cache.rs
@@ -1,10 +1,16 @@
+use bytesize::ByteSize;
 use http::HeaderMap;
+use path_clean::PathClean;
 use rand::Rng;
 use rmp_serde::Serializer;
+use s3s::dto::ReplicationConfiguration;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
+use std::hash::{DefaultHasher, Hash, Hasher};
 use std::path::Path;
+use std::sync::Arc;
 use std::time::{Duration, SystemTime};

 use s3s::{S3Error, S3ErrorCode};
 use tokio::sync::mpsc::Sender;
 use tokio::time::sleep;
@@ -14,21 +20,162 @@
 use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
 use crate::error::{Error, Result};
 use crate::new_object_layer_fn;
 use crate::set_disk::SetDisks;
-use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions, StorageAPI};
+use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions};

+use super::data_scanner::SizeSummary;
 use super::data_usage::DATA_USAGE_ROOT;

 // DATA_USAGE_BUCKET_LEN must be the length of OBJECTS_HISTOGRAM_INTERVALS.
 pub const DATA_USAGE_BUCKET_LEN: usize = 11;
 pub const DATA_USAGE_VERSION_LEN: usize = 7;

-type DataUsageHashMap = HashSet<String>;
-// sizeHistogram is a size histogram.
-type SizeHistogram = Vec<u64>;
-// versionsHistogram is a histogram of number of versions in an object.
-type VersionsHistogram = Vec<u64>;
+pub type DataUsageHashMap = HashSet<String>;

-#[derive(Debug, Clone, Serialize, Deserialize)]
+struct ObjectHistogramInterval {
+    name: &'static str,
+    start: u64,
+    end: u64,
+}
+
+const OBJECTS_HISTOGRAM_INTERVALS: [ObjectHistogramInterval; DATA_USAGE_BUCKET_LEN] = [
+    ObjectHistogramInterval {
+        name: "LESS_THAN_1024_B",
+        start: 0,
+        end: ByteSize::kib(1).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_1024_B_AND_64_KB",
+        start: ByteSize::kib(1).as_u64(),
+        end: ByteSize::kib(64).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_64_KB_AND_256_KB",
+        start: ByteSize::kib(64).as_u64(),
+        end: ByteSize::kib(256).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_256_KB_AND_512_KB",
+        start: ByteSize::kib(256).as_u64(),
+        end: ByteSize::kib(512).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_512_KB_AND_1_MB",
+        start: ByteSize::kib(512).as_u64(),
+        end: ByteSize::mib(1).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_1024B_AND_1_MB",
+        start: ByteSize::kib(1).as_u64(),
+        end: ByteSize::mib(1).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_1_MB_AND_10_MB",
+        start: ByteSize::mib(1).as_u64(),
+        end: ByteSize::mib(10).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_10_MB_AND_64_MB",
+        start: ByteSize::mib(10).as_u64(),
+        end: ByteSize::mib(64).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_64_MB_AND_128_MB",
+        start: ByteSize::mib(64).as_u64(),
+        end: ByteSize::mib(128).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_128_MB_AND_512_MB",
+        start: ByteSize::mib(128).as_u64(),
+        end: ByteSize::mib(512).as_u64() - 1,
+    },
+    ObjectHistogramInterval {
+        name: "GREATER_THAN_512_MB",
+        start: ByteSize::mib(512).as_u64(),
+        end: u64::MAX,
+    },
+];
+
+const OBJECTS_VERSION_COUNT_INTERVALS: [ObjectHistogramInterval; DATA_USAGE_VERSION_LEN] = [
+    ObjectHistogramInterval {
+        name: "UNVERSIONED",
+        start: 0,
+        end: 0,
+    },
+    ObjectHistogramInterval {
+        name: "SINGLE_VERSION",
+        start: 1,
+        end: 1,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_2_AND_10",
+        start: 2,
+        end: 9,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_10_AND_100",
+        start: 10,
+        end: 99,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_100_AND_1000",
+        start: 100,
+        end: 999,
+    },
+    ObjectHistogramInterval {
+        name: "BETWEEN_1000_AND_10000",
+        start: 1000,
+        end: 9999,
+    },
+    ObjectHistogramInterval {
+        name: "GREATER_THAN_10000",
+        start: 10000,
+        end: u64::MAX,
+    },
+];
+
+// sizeHistogram is a size histogram.
+#[derive(Clone, Serialize, Deserialize)]
+pub struct SizeHistogram(Vec<u64>);
+
+impl Default for SizeHistogram {
+    fn default() -> Self {
+        Self(vec![0; DATA_USAGE_BUCKET_LEN])
+    }
+}
+
+impl SizeHistogram {
+    fn add(&mut self, size: u64) {
+        for (idx, interval) in OBJECTS_HISTOGRAM_INTERVALS.iter().enumerate() {
+            if size >= interval.start && size <= interval.end {
+                self.0[idx] += 1;
+                break;
+            }
+        }
+    }
+}
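`add` counts a value into the first interval that contains it, so the cumulative `BETWEEN_1024B_AND_1_MB` bucket is shadowed by the finer-grained buckets listed before it. A quick illustration (sizes are arbitrary):

    let mut h = SizeHistogram::default();
    h.add(512);             // "LESS_THAN_1024_B"
    h.add(100 * 1024);      // "BETWEEN_64_KB_AND_256_KB"
    h.add(2 * 1024 * 1024); // "BETWEEN_1_MB_AND_10_MB"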
+// versionsHistogram is a histogram of the number of versions in an object.
+#[derive(Clone, Serialize, Deserialize)]
+pub struct VersionsHistogram(Vec<u64>);
+
+impl Default for VersionsHistogram {
+    fn default() -> Self {
+        Self(vec![0; DATA_USAGE_VERSION_LEN])
+    }
+}
+
+impl VersionsHistogram {
+    fn add(&mut self, count: u64) {
+        for (idx, interval) in OBJECTS_VERSION_COUNT_INTERVALS.iter().enumerate() {
+            if count >= interval.start && count <= interval.end {
+                self.0[idx] += 1;
+                break;
+            }
+        }
+    }
+}

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
 pub struct ReplicationStats {
     pub pending_size: u64,
     pub replicated_size: u64,
@@ -48,7 +195,7 @@
 impl ReplicationStats {
     }
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
 pub struct ReplicationAllStats {
     pub targets: HashMap<String, ReplicationStats>,
     pub replica_size: u64,
@@ -70,35 +217,117 @@
 impl ReplicationAllStats {
     }
 }

-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Default, Serialize, Deserialize)]
 pub struct DataUsageEntry {
     pub children: DataUsageHashMap,
-    // These fields do not include any children.
-    pub size: i64,
-    pub objects: u64,
-    pub versions: u64,
-    pub delete_markers: u64,
+    // These fields do not include any children.
+    pub size: usize,
+    pub objects: usize,
+    pub versions: usize,
+    pub delete_markers: usize,
     pub obj_sizes: SizeHistogram,
     pub obj_versions: VersionsHistogram,
-    pub replication_stats: ReplicationAllStats,
+    pub replication_stats: Option<ReplicationAllStats>,
     // Todo: tier
     // pub all_tier_stats: ,
     pub compacted: bool,
 }

+impl DataUsageEntry {
+    pub fn add_child(&mut self, hash: &DataUsageHash) {
+        if self.children.contains(&hash.key()) {
+            return;
+        }
+
+        self.children.insert(hash.key());
+    }
+
+    pub fn add_sizes(&mut self, summary: &SizeSummary) {
+        self.size += summary.total_size;
+        self.versions += summary.versions;
+        self.delete_markers += summary.delete_markers;
+        self.obj_sizes.add(summary.total_size as u64);
+        self.obj_versions.add(summary.versions as u64);
+
+        let replication_stats = self.replication_stats.get_or_insert_with(ReplicationAllStats::default);
+        replication_stats.replica_size += summary.replica_size as u64;
+        replication_stats.replica_count += summary.replica_count as u64;
+
+        for (arn, st) in &summary.repl_target_stats {
+            let tgt_stat = replication_stats.targets.entry(arn.to_string()).or_default();
+            tgt_stat.pending_size += st.pending_size as u64;
+            tgt_stat.failed_size += st.failed_size as u64;
+            tgt_stat.replicated_size += st.replicated_size as u64;
+            tgt_stat.replicated_count += st.replicated_count as u64;
+            tgt_stat.failed_count += st.failed_count as u64;
+            tgt_stat.pending_count += st.pending_count as u64;
+        }
+        // Todo: tiers
+    }
+
+    pub fn merge(&mut self, other: &DataUsageEntry) {
+        self.objects += other.objects;
+        self.versions += other.versions;
+        self.delete_markers += other.delete_markers;
+        self.size += other.size;
+        if let Some(o_rep) = &other.replication_stats {
+            if self.replication_stats.is_none() {
+                self.replication_stats = Some(ReplicationAllStats::default());
+            }
+            let s_rep = self.replication_stats.as_mut().unwrap();
+            s_rep.targets.clear();
+            s_rep.replica_size += o_rep.replica_size;
+            s_rep.replica_count += o_rep.replica_count;
+            for (arn, stat) in o_rep.targets.iter() {
+                let st = s_rep.targets.entry(arn.clone()).or_default();
+                *st = ReplicationStats {
+                    pending_size: stat.pending_size + st.pending_size,
+                    failed_size: stat.failed_size + st.failed_size,
                    replicated_size: stat.replicated_size + st.replicated_size,
+                    pending_count: stat.pending_count + st.pending_count,
+                    failed_count: stat.failed_count + st.failed_count,
+                    replicated_count: stat.replicated_count + st.replicated_count,
+                    ..Default::default()
+                };
+            }
+        }
+
+        for (i, v) in other.obj_sizes.0.iter().enumerate() {
+            self.obj_sizes.0[i] += v;
+        }
+
+        for (i, v) in other.obj_versions.0.iter().enumerate() {
+            self.obj_versions.0[i] += v;
+        }
+
+        // todo: tiers
+    }
+}
+
+#[derive(Clone)]
+pub struct DataUsageEntryInfo {
+    pub name: String,
+    pub parent: String,
+    pub entry: DataUsageEntry,
+}

 #[derive(Clone, Serialize, Deserialize)]
 pub struct DataUsageCacheInfo {
     pub name: String,
-    pub next_cycle: usize,
+    pub next_cycle: u32,
     pub last_update: SystemTime,
     pub skip_healing: bool,
     // todo: life_cycle
     // pub life_cycle:
     #[serde(skip)]
     pub updates: Option<Sender<DataUsageEntry>>,
-    // Todo: replication
-    // #[serde(skip_serializing)]
-    // replication:
+    #[serde(skip)]
+    pub replication: Option<ReplicationConfiguration>,
 }

 impl Default for DataUsageCacheInfo {
@@ -109,6 +338,7 @@
             last_update: SystemTime::now(),
             skip_healing: Default::default(),
             updates: Default::default(),
+            replication: Default::default(),
         }
     }
 }
@@ -202,6 +432,195 @@ impl DataUsageCache {

     pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
         let hash = hash_path(path);
+        self.cache.insert(hash.key(), e);
+        if !parent.is_empty() {
+            let phash = hash_path(parent);
+            self.cache
+                .entry(phash.key())
+                .or_default()
+                .add_child(&hash);
+        }
+    }
+
+    pub fn replace_hashed(&mut self, hash: &DataUsageHash, parent: &Option<DataUsageHash>, e: &DataUsageEntry) {
+        self.cache.insert(hash.key(), e.clone());
+        if let Some(parent) = parent {
+            self.cache.entry(parent.key()).or_default().add_child(hash);
+        }
+    }
+
+    pub fn find(&self, path: &str) -> Option<DataUsageEntry> {
+        self.cache.get(&hash_path(path).key()).cloned()
+    }
+
+    pub fn find_children_copy(&mut self, h: DataUsageHash) -> DataUsageHashMap {
+        self.cache.entry(h.string()).or_default().children.clone()
+    }
+
+    pub fn flatten(&self, root: &DataUsageEntry) -> DataUsageEntry {
+        let mut root = root.clone();
+        for id in root.children.clone().iter() {
+            if let Some(e) = self.cache.get(id) {
+                let mut e = e.clone();
+                if !e.children.is_empty() {
+                    e = self.flatten(&e);
+                }
+                root.merge(&e);
+            }
+        }
+        root.children.clear();
+        root
+    }
+
+    pub fn copy_with_children(&mut self, src: &DataUsageCache, hash: &DataUsageHash, parent: &Option<DataUsageHash>) {
+        let Some(e) = src.cache.get(&hash.string()) else {
+            return;
+        };
+        self.cache.insert(hash.key(), e.clone());
+        for ch in e.children.iter() {
+            if *ch == hash.key() {
+                return;
+            }
+            self.copy_with_children(src, &DataUsageHash(ch.to_string()), &Some(hash.clone()));
+        }
+        if let Some(parent) = parent {
+            self.cache.entry(parent.key()).or_default().add_child(hash);
+        }
+    }
+
+    pub fn delete_recursive(&mut self, hash: &DataUsageHash) {
+        let mut need_remove = Vec::new();
+        if let Some(v) = self.cache.get(&hash.string()) {
+            for child in v.children.iter() {
+                need_remove.push(child.clone());
+            }
+        }
+        self.cache.remove(&hash.string());
+        for child in need_remove {
+            self.delete_recursive(&DataUsageHash(child));
+        }
+    }
+
+    pub fn size_recursive(&self, path: &str) -> Option<DataUsageEntry> {
+        match self.find(path) {
+            Some(root) => {
+                if root.children.is_empty() {
                    return Some(root);
                }
                let mut flat = self.flatten(&root);
                if flat.replication_stats.is_some() && flat.replication_stats.as_ref().unwrap().empty() {
                    flat.replication_stats = None;
                }
                Some(flat)
            }
            None => None,
        }
    }

    pub fn search_parent(&self, hash: &DataUsageHash) -> Option<DataUsageHash> {
        let want = hash.key();
        if let Some(last_index) = want.rfind('/') {
            if let Some(v) = self.find(&want[0..last_index]) {
                if v.children.contains(&want) {
                    return Some(hash_path(&want[0..last_index]));
                }
            }
        }

        for (k, v) in self.cache.iter() {
            if v.children.contains(&want) {
                return Some(DataUsageHash(k.clone()));
            }
        }
        None
    }

    pub fn is_compacted(&self, hash: &DataUsageHash) -> bool {
        match self.cache.get(&hash.key()) {
            Some(due) => due.compacted,
            None => false,
        }
    }

    pub fn reduce_children_of(&mut self, path: &DataUsageHash, limit: usize, compact_self: bool) {
        let e = match self.cache.get(&path.key()) {
            Some(e) => e,
            None => return,
        };

        if e.compacted {
            return;
        }

        if e.children.len() > limit && compact_self {
            let mut flat = self.size_recursive(&path.key()).unwrap_or_default();
            flat.compacted = true;
            self.delete_recursive(path);
            self.replace_hashed(path, &None, &flat);
            return;
        }
        let total = self.total_children_rec(&path.key());
        if total < limit {
            return;
        }

        let mut leaves = Vec::new();
        let mut remove = total - limit;
        add(self, path, &mut leaves);
        leaves.sort_by(|a, b| a.objects.cmp(&b.objects));

        while remove > 0 && !leaves.is_empty() {
            let e = leaves.first().unwrap();
            let candidate = e.path.clone();
            if candidate == *path && !compact_self {
                break;
            }
            let removing = self.total_children_rec(&candidate.key());
            let mut flat = match self.size_recursive(&candidate.key()) {
                Some(flat) => flat,
                None => {
                    leaves.remove(0);
                    continue;
                }
            };

            flat.compacted = true;
            self.delete_recursive(&candidate);
            self.replace_hashed(&candidate, &None, &flat);

            // `removing` may exceed the remaining count; avoid usize underflow.
            remove = remove.saturating_sub(removing);
            leaves.remove(0);
        }
    }

    pub fn total_children_rec(&self, path: &str) -> usize {
        let Some(root) = self.find(path) else {
            return 0;
        };
        if root.children.is_empty() {
            return 0;
        }

        let mut n = root.children.len();
        for ch in root.children.iter() {
            n += self.total_children_rec(ch);
        }
        n
    }

    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
@@ -218,10 +637,63 @@
    }
}

-struct DataUsageHash(String);
+#[derive(Default, Clone)]
+struct Inner {
+    objects: usize,
+    path: DataUsageHash,
+}
+
+fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec<Inner>) {
+    let e = match data_usage_cache.cache.get(&path.key()) {
+        Some(e) => e,
+        None => return,
+    };
+    // Leaves are the entries without children; record them and recurse otherwise.
+    if e.children.is_empty() {
+        let sz = data_usage_cache.size_recursive(&path.key()).unwrap_or_default();
+        leaves.push(Inner {
+            objects: sz.objects,
+            path: path.clone(),
+        });
+        return;
+    }
+    for ch in e.children.iter() {
+        add(data_usage_cache, &DataUsageHash(ch.clone()), leaves);
+    }
+}
+
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
+pub struct DataUsageHash(pub String);

 impl DataUsageHash {
+    pub fn string(&self) -> String {
+        self.0.clone()
+    }
+
+    pub fn key(&self) -> String {
+        self.0.clone()
+    }
+
+    pub fn mod_(&self, cycle: u32, cycles: u32) -> bool {
+        if cycles <= 1 {
+            return cycles == 1;
+        }
+
+        let hash = self.calculate_hash();
+        hash as u32 % cycles == cycle % cycles
+    }
+
+    pub fn mod_alt(&self, cycle: u32, cycles: u32) -> bool {
+        if cycles <= 1 {
+            return cycles == 1;
+        }

+        let hash = self.calculate_hash();
+        (hash >> 32) as u32 % cycles == cycle % cycles
+    }
+
+    fn calculate_hash(&self) -> u64 {
+        let mut hasher = DefaultHasher::new();
+        self.0.hash(&mut hasher);
+        hasher.finish()
+    }
 }

 pub fn hash_path(data: &str) -> DataUsageHash {
@@ -229,6 +701,5 @@
     if data != DATA_USAGE_ROOT {
         data = data.trim_matches('/');
     }
-    Path::new(&data);
-    todo!()
+    DataUsageHash(Path::new(&data).clean().to_string_lossy().to_string())
 }
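`hash_path` keys are path-cleaned, slash-trimmed strings, and `mod_`/`mod_alt` then spread those keys across scan cycles using the low and high hash bits respectively. A small illustration (the booleans depend on `DefaultHasher`, so treat them as sample outputs):

    let h = hash_path("/bucket/prefix/");
    assert_eq!(h.key(), "bucket/prefix");

    // Visit this folder only on the cycles its hash maps to
    // (16 mirrors DATA_USAGE_UPDATE_DIR_CYCLES in data_scanner.rs).
    let scan_due = h.mod_(3, 16);
    let heal_due = h.mod_alt(3, 16);
    println!("scan: {scan_due}, heal: {heal_due}");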
diff --git a/ecstore/src/heal/error.rs b/ecstore/src/heal/error.rs
new file mode 100644
index 00000000..9b5e9d0c
--- /dev/null
+++ b/ecstore/src/heal/error.rs
@@ -0,0 +1 @@
+pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";
\ No newline at end of file

diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs
index 0c9415d2..de79a66f 100644
--- a/ecstore/src/heal/heal_ops.rs
+++ b/ecstore/src/heal/heal_ops.rs
@@ -60,12 +60,13 @@
 pub struct HealSequenceStatus {
     pub items: Vec,
 }

+#[derive(Debug, Default)]
 pub struct HealSource {
     pub bucket: String,
     pub object: String,
     pub version_id: String,
     pub no_wait: bool,
-    opts: Option,
+    pub opts: Option,
 }

 #[derive(Clone, Debug)]
@@ -245,7 +246,7 @@
         Ok(())
     }

-    async fn queue_heal_task(&mut self, source: HealSource, heal_type: HealItemType) -> Result<()> {
+    pub async fn queue_heal_task(&mut self, source: HealSource, heal_type: HealItemType) -> Result<()> {
         let mut task = HealTask::new(&source.bucket, &source.object, &source.version_id, &self.setting);
         if let Some(opts) = source.opts {
             task.opts = opts;
@@ -348,7 +349,7 @@
 pub async fn heal_sequence_start(h: Arc) {

 pub struct AllHealState {
     mu: RwLock,
-    heal_seq_map: HashMap<String, HealSequence>,
+    heal_seq_map: HashMap<String, Arc<RwLock<HealSequence>>>,
     heal_local_disks: HashMap,
     heal_status: HashMap,
 }
@@ -449,7 +450,8 @@
         let mut keys_to_remove = Vec::new();

         for (k, v) in self.heal_seq_map.iter() {
-            if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
+            let r = v.read().await;
+            if r.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(r.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
                 keys_to_remove.push(k.clone())
             }
         }
@@ -458,11 +460,12 @@
         }
     }

-    async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<HealSequence>, bool) {
-        let _ = self.mu.read().await;
+    pub async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<Arc<RwLock<HealSequence>>>, bool) {
+        let _guard = self.mu.read().await;

         for v in self.heal_seq_map.values() {
-            if v.client_token == token {
+            let r = v.read().await;
+            if r.client_token == token {
                 return (Some(v.clone()), true);
             }
         }

         (None, false)
     }

-    async fn get_heal_sequence(&self, path: &str) -> Option<HealSequence> {
-        let _ = self.mu.read().await;
+    async fn get_heal_sequence(&self, path: &str) -> Option<Arc<RwLock<HealSequence>>> {
+        let _guard = self.mu.read().await;

         self.heal_seq_map.get(path).cloned()
     }

     async fn stop_heal_sequence(&mut self, path: &str) -> Result> {
         let mut hsp = HealStopSuccess::default();
         if let Some(he) = self.get_heal_sequence(path).await {
+            let he = he.read().await;
             let client_token = he.client_token.clone();
             if *GLOBAL_IsDistErasure.read().await {
                 // TODO: proxy
@@ -525,7 +529,7 @@
             self.stop_heal_sequence(path_s).await?;
         } else {
             if let Some(hs) = self.get_heal_sequence(path_s).await {
-                if !hs.has_ended().await {
+                if !hs.read().await.has_ended().await {
                     return Err(Error::from_string(format!("Heal is already running on the given path (use force-start option to stop and start afresh). The heal was started by IP {} at {}, token is {}", heal_sequence.client_address, heal_sequence.start_time, heal_sequence.client_token)));
                 }
             }
         }

-        let _ = self.mu.write().await;
+        let _guard = self.mu.write().await;

         for (k, v) in self.heal_seq_map.iter() {
-            if !v.has_ended().await && (has_profix(k, path_s) || has_profix(path_s, k)) {
+            if !v.read().await.has_ended().await && (has_profix(k, path_s) || has_profix(path_s, k)) {
                 return Err(Error::from_string(format!(
                     "The provided heal sequence path overlaps with an existing heal path: {}",
                     k
                 )));
             }
         }

-        self.heal_seq_map.insert(path_s.to_string(), heal_sequence.clone());
+        self.heal_seq_map.insert(path_s.to_string(), Arc::new(RwLock::new(heal_sequence.clone())));
         let client_token = heal_sequence.client_token.clone();

         if *GLOBAL_IsDistErasure.read().await {
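With `heal_seq_map` holding `Arc<RwLock<HealSequence>>`, lookups hand out a cloned `Arc` and callers take the write lock only around the queueing call. A hedged sketch of a consumer (the function itself is illustrative):

    async fn queue_bucket_heal(state: &AllHealState, bucket: &str) -> Result<()> {
        let (seq, found) = state.get_heal_sequence_by_token(BG_HEALING_UUID).await;
        if !found {
            return Ok(());
        }
        seq.unwrap()
            .write()
            .await
            .queue_heal_task(
                HealSource {
                    bucket: bucket.to_string(),
                    ..Default::default()
                },
                HEAL_ITEM_BUCKET.to_owned(),
            )
            .await
    }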
diff --git a/ecstore/src/heal/mod.rs b/ecstore/src/heal/mod.rs
index f477be10..9a3c2827 100644
--- a/ecstore/src/heal/mod.rs
+++ b/ecstore/src/heal/mod.rs
@@ -2,6 +2,7 @@
 pub mod background_heal_ops;
 pub mod data_scanner;
 pub mod data_scanner_metric;
 pub mod data_usage;
+pub mod error;
 pub mod heal_commands;
 pub mod heal_ops;
 pub mod data_usage_cache;

diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs
index a56fdb42..e76cbf76 100644
--- a/ecstore/src/peer.rs
+++ b/ecstore/src/peer.rs
@@ -528,7 +528,7 @@ fn is_reserved_bucket(bucket_name: &str) -> bool {
 }

 // Check whether a bucket name is reserved or invalid.
-fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
+pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
     if bucket_entry.is_empty() {
         return true;
     }

diff --git a/ecstore/src/utils/path.rs b/ecstore/src/utils/path.rs
index b505bd72..654333ec 100644
--- a/ecstore/src/utils/path.rs
+++ b/ecstore/src/utils/path.rs
@@ -69,6 +69,19 @@ pub fn path_join(elem: &[PathBuf]) -> PathBuf {
     joined_path
 }

+pub fn path_to_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
+    let path = path.trim_start_matches(base_path).trim_start_matches(SLASH_SEPARATOR);
+    if let Some(m) = path.find(SLASH_SEPARATOR) {
+        return (path[..m].to_string(), path[m + SLASH_SEPARATOR.len()..].to_string());
+    }
+
+    (path.to_string(), "".to_string())
+}
+
+pub fn path_to_bucket_object(s: &str) -> (String, String) {
+    path_to_bucket_object_with_base_path("", s)
+}
+
 pub fn base_dir_from_prefix(prefix: &str) -> String {
     let mut base_dir = dir(prefix).to_owned();
     if base_dir == "." || base_dir == "./" || base_dir == "/" {
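The bucket/object split is purely lexical, which makes the helpers easy to sanity-check:

    let (bucket, object) = path_to_bucket_object_with_base_path("/data", "/data/mybucket/dir/file.txt");
    assert_eq!(bucket, "mybucket");
    assert_eq!(object, "dir/file.txt");

    let (bucket, prefix) = path_to_bucket_object("mybucket");
    assert_eq!(bucket, "mybucket");
    assert_eq!(prefix, "");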