Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2024-11-13 13:10:51 +00:00
parent fe50ccd39f
commit be7d1ab0cc
15 changed files with 1272 additions and 55 deletions

Cargo.lock generated
View File

@@ -336,6 +336,12 @@ version = "1.3.0"
[[package]]
name = "bytesize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
[[package]]
name = "bytestring"
version = "1.3.1"

View File

@@ -1,2 +1,21 @@
pub mod error;
pub mod globals;
/// Defers evaluation of a block of code until the end of the scope.
#[macro_export]
macro_rules! defer {
($($body:tt)*) => {
let _guard = {
pub struct Guard<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for Guard<F> {
fn drop(&mut self) {
if let Some(f) = self.0.take() { f() }
}
}
Guard(Some(|| {
let _ = { $($body)* };
}))
};
};
}
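A minimal usage sketch (the counter and function are hypothetical). Note that the macro takes the deferred statements directly; wrapping them in a closure would only construct the closure without ever calling it.

use common::defer;
use std::sync::atomic::{AtomicUsize, Ordering};

fn scan_once(scanning: &AtomicUsize) {
    scanning.fetch_add(1, Ordering::SeqCst);
    // Runs when `_guard` is dropped at the end of this scope, even on early return.
    defer!(scanning.fetch_sub(1, Ordering::SeqCst));
    // ... scan work ...
}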

View File

@@ -13,6 +13,7 @@ async-trait.workspace = true
backon.workspace = true
blake2 = "0.10.6"
bytes.workspace = true
bytesize = "1.3.0"
common.workspace = true
reader.workspace = true
glob = "0.3.1"

View File

@@ -28,7 +28,7 @@ use super::target::BucketTargets;
use lazy_static::lazy_static;
lazy_static! {
static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
pub static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
}
pub async fn init_bucket_metadata_sys(api: ECStore, buckets: Vec<String>) {

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
spawn,
@@ -14,6 +14,10 @@ use crate::{
error::{Error, Result},
};
type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
type PartialFn = Box<dyn Fn(MetaCacheEntries, &[Option<Error>]) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
type FinishedFn = Box<dyn Fn(&[Option<Error>]) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
#[derive(Default)]
pub struct ListPathRawOptions {
pub disks: Vec<Option<DiskStore>>,
@@ -26,9 +30,12 @@ pub struct ListPathRawOptions {
pub min_disks: usize,
pub report_not_found: bool,
pub per_disk_limit: i32,
pub agreed: Option<Arc<dyn Fn(MetaCacheEntry) + Send + Sync>>,
pub partial: Option<Arc<dyn Fn(MetaCacheEntries, &[Option<Error>]) + Send + Sync>>,
pub finished: Option<Arc<dyn Fn(&[Option<Error>]) + Send + Sync>>,
pub agreed: Option<AgreedFn>,
pub partial: Option<PartialFn>,
pub finished: Option<FinishedFn>,
}
impl Clone for ListPathRawOptions {
@@ -185,8 +192,8 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
if has_err > 0 && has_err > opts.disks.len() - opts.min_disks {
if let Some(finished_fn) = opts.finished.clone() {
finished_fn(&errs);
if let Some(finished_fn) = opts.finished.as_ref() {
finished_fn(&errs).await;
}
let mut combined_err = Vec::new();
errs.iter().zip(opts.disks.iter()).for_each(|(err, disk)| match (err, disk) {
@@ -205,7 +212,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
// Break if all at EOF or error.
if at_eof + has_err == readers.len() {
if has_err > 0 {
if let Some(finished_fn) = opts.finished.clone() {
if let Some(finished_fn) = opts.finished.as_ref() {
finished_fn(&errs).await;
}
break;
@@ -213,13 +220,13 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
if agree == readers.len() {
if let Some(agreed_fn) = opts.agreed.clone() {
if let Some(agreed_fn) = opts.agreed.as_ref() {
agreed_fn(current).await;
}
continue;
}
if let Some(partial_fn) = opts.partial.clone() {
if let Some(partial_fn) = opts.partial.as_ref() {
partial_fn(MetaCacheEntries(top_entries), &errs).await;
}
}
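As a usage sketch (the `make_agreed` helper is hypothetical), a caller builds one of these boxed async callbacks by cloning captured state before moving it into the async block, mirroring the `agreed` closure constructed in the data scanner further below:

use std::sync::Arc;

// Same shape as the AgreedFn alias above; assumes MetaCacheEntry is Send.
fn make_agreed(update_path: Arc<dyn Fn(&str) + Send + Sync>) -> AgreedFn {
    Box::new(move |entry: MetaCacheEntry| {
        // Clone before the move so the outer Fn stays callable repeatedly.
        let update_path = update_path.clone();
        Box::pin(async move {
            update_path(&entry.name);
        })
    })
}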

View File

@@ -9,6 +9,7 @@ use super::{
UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
};
use crate::bitrot::bitrot_verify;
use crate::bucket::metadata_sys::GLOBAL_BucketMetadataSys;
use crate::cache_value::cache::{Cache, Opts};
use crate::disk::error::{
convert_access_error, is_sys_err_handle_invalid, is_sys_err_invalid_arg, is_sys_err_is_dir, is_sys_err_not_dir,
@@ -18,6 +19,10 @@ use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::error::{Error, Result};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use crate::heal::data_scanner::has_active_rules;
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
use crate::heal::heal_commands::HealScanMode;
use crate::new_object_layer_fn;
use crate::set_disk::{
conv_part_err_to_int, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
CHECK_PART_VOLUME_NOT_FOUND,
@@ -31,7 +36,9 @@ use crate::{
store_api::{FileInfo, RawFileInfo},
utils,
};
use common::defer;
use path_absolutize::Absolutize;
use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus};
use std::collections::HashSet;
use std::fmt::Debug;
use std::io::Cursor;
@@ -47,6 +54,7 @@ use time::OffsetDateTime;
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;
@@ -1868,6 +1876,36 @@ impl DiskAPI for LocalDisk {
Ok(info)
}
async fn ns_scanner(
&self,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
self.scanning.fetch_add(1, Ordering::SeqCst);
// Decrement the scanning counter when this scope exits.
defer!(self.scanning.fetch_sub(1, Ordering::SeqCst));
// Check if the current bucket has replication configuration
if let Ok((rcfg, _)) = GLOBAL_BucketMetadataSys
.read()
.await
.get_replication_config(&cache.info.name)
.await
{
if has_active_rules(&rcfg, "", true) {
// TODO: globalBucketTargetSys
}
}
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::msg("errServerNotInitialized")),
};
todo!()
}
}
async fn get_disk_info(drive_path: PathBuf) -> Result<(Info, bool)> {

View File

@@ -17,6 +17,10 @@ use crate::{
erasure::{ReadAt, Writer},
error::{Error, Result},
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
heal::{
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::HealScanMode,
},
store_api::{FileInfo, RawFileInfo},
};
use endpoint::Endpoint;
@@ -436,6 +440,12 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>>;
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
async fn ns_scanner(
&self,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache>;
}
#[derive(Debug, Default, Serialize, Deserialize)]

View File

@@ -10,6 +10,7 @@ use protos::{
UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use tokio::sync::mpsc::Sender;
use tonic::Request;
use tracing::info;
use uuid::Uuid;
@@ -20,9 +21,7 @@ use super::{
RemoteFileWriter, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
};
use crate::{
disk::error::DiskError,
error::{Error, Result},
store_api::{FileInfo, RawFileInfo},
disk::error::DiskError,
error::{Error, Result},
heal::{data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::HealScanMode},
store_api::{FileInfo, RawFileInfo},
};
use protos::proto_gen::node_service::RenamePartRequst;
@@ -729,4 +728,13 @@ impl DiskAPI for RemoteDisk {
Ok(disk_info)
}
async fn ns_scanner(
&self,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
todo!()
}
}

View File

@@ -1,6 +1,14 @@
use std::{
collections::{HashMap, HashSet},
fs,
future::Future,
io::{Cursor, Read},
sync::{atomic::{AtomicU32, AtomicU64}, Arc},
path::{Path, PathBuf},
pin::Pin,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
@@ -8,20 +16,46 @@ use byteorder::{LittleEndian, ReadBytesExt};
use lazy_static::lazy_static;
use rand::Rng;
use rmp_serde::{Deserializer, Serializer};
use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus};
use serde::{Deserialize, Serialize};
use tokio::{sync::{mpsc, RwLock}, time::sleep};
use tokio::{
sync::{
broadcast,
mpsc::{self, Sender},
RwLock,
},
time::sleep,
};
use tracing::{error, info};
use crate::{
config::{common::{read_config, save_config}, heal::Config},
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{
common::{read_config, save_config},
heal::Config,
},
disk::{error::DiskError, DiskStore, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams},
error::{Error, Result},
global::GLOBAL_IsErasureSD,
heal::data_usage::BACKGROUND_HEAL_INFO_PATH,
global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD},
heal::{
data_usage::BACKGROUND_HEAL_INFO_PATH,
data_usage_cache::{hash_path, DataUsageHashMap},
error::ERR_IGNORE_FILE_CONTRIB,
heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT},
heal_ops::{HealSource, BG_HEALING_UUID},
},
new_object_layer_fn,
store::ECStore,
peer::is_reserved_or_invalid_bucket,
store::{ECStore, ListPathOptions},
utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR},
};
use super::{data_scanner_metric::globalScannerMetrics, data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH}, heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}};
use super::{
data_scanner_metric::globalScannerMetrics,
data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
@@ -103,7 +137,8 @@ async fn run_data_scanner() {
}
let bg_heal_info = read_background_heal_info(store).await;
let scan_mode = get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
let scan_mode =
get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
if bg_heal_info.current_scan_mode != scan_mode {
let mut new_heal_info = bg_heal_info;
new_heal_info.current_scan_mode = scan_mode;
@@ -166,7 +201,7 @@ async fn save_background_heal_info(store: &ECStore, info: &BackgroundHealInfo) {
async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode {
let bitrot_cycle = globalHealConfig.read().await.bitrot_scan_cycle();
let v = bitrot_cycle.as_secs_f64() ;
let v = bitrot_cycle.as_secs_f64();
if v == -1.0 {
return HEAL_NORMAL_SCAN;
} else if v == 0.0 {
@@ -291,3 +326,606 @@ fn system_time_to_timestamp(time: &SystemTime) -> u64 {
fn timestamp_to_system_time(timestamp: u64) -> SystemTime {
UNIX_EPOCH + std::time::Duration::new(timestamp, 0)
}
#[derive(Debug, Default)]
struct Heal {
enabled: bool,
bitrot: bool,
}
struct ScannerItem {
path: String,
bucket: String,
prefix: String,
object_name: String,
replication: Option<ReplicationConfiguration>,
// todo: lifecycle
// typ: fs::Permissions,
heal: Heal,
debug: bool,
}
impl ScannerItem {
pub fn transform_meta_dir(&mut self) {
let split = self
.prefix
.split(SLASH_SEPARATOR)
.map(PathBuf::from)
.collect::<Vec<_>>();
if split.len() > 1 {
self.prefix = path_join(&split[0..split.len() - 1]).to_string_lossy().to_string();
} else {
self.prefix = "".to_string();
}
self.object_name = split.last().map_or("".to_string(), |v| v.to_string_lossy().to_string());
}
pub fn object_path(&self) -> PathBuf {
path_join(&[PathBuf::from(self.prefix.clone()), PathBuf::from(self.object_name.clone())])
}
}
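Concretely (hypothetical values, assuming the meta file is named `xl.meta`), `transform_meta_dir` moves the trailing segment of the prefix into `object_name`:

// Before: item.prefix == "a/b/xl.meta", item.object_name == ""
// After item.transform_meta_dir():
//     item.prefix      == "a/b"
//     item.object_name == "xl.meta"
// item.object_path() then yields "a/b/xl.meta"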
#[derive(Debug, Default)]
pub struct SizeSummary {
pub total_size: usize,
pub versions: usize,
pub delete_markers: usize,
pub replicated_size: usize,
pub replicated_count: usize,
pub pending_size: usize,
pub failed_size: usize,
pub replica_size: usize,
pub replica_count: usize,
pub pending_count: usize,
pub failed_count: usize,
pub repl_target_stats: HashMap<String, ReplTargetSizeSummary>,
// Todo: tires
}
#[derive(Debug, Default)]
pub struct ReplTargetSizeSummary {
pub replicated_size: usize,
pub replicated_count: usize,
pub pending_size: usize,
pub failed_size: usize,
pub pending_count: usize,
pub failed_count: usize,
}
#[derive(Debug, Clone)]
struct CachedFolder {
name: String,
parent: DataUsageHash,
object_heal_prob_div: u32,
}
type GetSizeFn = Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>>>> + Send + 'static>;
struct FolderScanner {
root: String,
get_size: GetSizeFn,
old_cache: DataUsageCache,
new_cache: DataUsageCache,
update_cache: DataUsageCache,
data_usage_scanner_debug: bool,
heal_object_select: u32,
scan_mode: HealScanMode,
should_heal: Arc<dyn Fn() -> bool + Send + Sync>,
disks: Vec<Option<DiskStore>>,
disks_quorum: usize,
updates: Sender<DataUsageEntry>,
last_update: SystemTime,
update_current_path: Arc<dyn Fn(&str) + Send + Sync>,
}
impl FolderScanner {
async fn scan_folder(&mut self, folder: &CachedFolder, into: &mut DataUsageEntry) -> Result<()> {
let this_hash = hash_path(&folder.name);
let was_compacted = into.compacted;
loop {
let mut abandoned_children: DataUsageHashMap = if !into.compacted {
self.old_cache.find_children_copy(this_hash.clone())
} else {
HashSet::new()
};
let (_, prefix) = path_to_bucket_object_with_base_path(&self.root, &folder.name);
// Todo: lifeCycle
let replication_cfg = if self.old_cache.info.replication.is_some()
&& has_active_rules(self.old_cache.info.replication.as_ref().unwrap(), &prefix, true)
{
self.old_cache.info.replication.clone()
} else {
None
};
let mut existing_folders = Vec::new();
let mut new_folders = Vec::new();
let mut found_objects: bool = false;
let path = Path::new(&self.root).join(&folder.name);
if path.is_dir() {
for entry in fs::read_dir(path)? {
let entry = entry?;
let sub_path = entry.path();
let ent_name = Path::new(&folder.name).join(entry.file_name());
let (bucket, prefix) = path_to_bucket_object_with_base_path(&self.root, ent_name.to_str().unwrap());
if bucket.is_empty() {
continue;
}
if is_reserved_or_invalid_bucket(&bucket, false) {
continue;
}
if sub_path.is_dir() {
let h = hash_path(ent_name.to_str().unwrap());
if h == this_hash {
continue;
}
let this = CachedFolder {
name: ent_name.to_string_lossy().to_string(),
parent: this_hash.clone(),
object_heal_prob_div: folder.object_heal_prob_div,
};
abandoned_children.remove(&h.key());
if self.old_cache.cache.contains_key(&h.key()) {
existing_folders.push(this);
self.update_cache
.copy_with_children(&self.old_cache, &h, &Some(this_hash.clone()));
} else {
new_folders.push(this);
}
continue;
}
let mut item = ScannerItem {
path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(),
bucket,
prefix: Path::new(&prefix)
.parent()
.unwrap_or(Path::new(""))
.to_string_lossy()
.to_string(),
object_name: ent_name
.file_name()
.map(|name| name.to_string_lossy().into_owned())
.unwrap_or_default(),
debug: self.data_usage_scanner_debug,
replication: replication_cfg.clone(),
heal: Heal::default(),
};
item.heal.enabled = this_hash.mod_alt(
self.old_cache.info.next_cycle / folder.object_heal_prob_div,
self.heal_object_select / folder.object_heal_prob_div,
) && (self.should_heal)();
item.heal.bitrot = self.scan_mode == HEAL_DEEP_SCAN;
let (sz, err) = match (self.get_size)(&item).await {
Ok(sz) => (sz, None),
Err(err) => {
if err.to_string() != ERR_IGNORE_FILE_CONTRIB {
continue;
}
(SizeSummary::default(), Some(err))
}
};
// successfully read means we have a valid object.
found_objects = true;
// Remove the filename (i.e. the meta file) to construct the object name.
item.transform_meta_dir();
// Object already accounted for, remove from heal map,
// simply because getSize() function already heals the
// object.
abandoned_children.remove(
&path_join(&[PathBuf::from(item.bucket.clone()), item.object_path()])
.to_string_lossy()
.to_string(),
);
if err.is_none() || err.unwrap().to_string() != ERR_IGNORE_FILE_CONTRIB {
into.add_sizes(&sz);
into.objects += 1;
}
}
}
if found_objects && *GLOBAL_IsErasure.read().await {
// If we found an object in erasure mode, we skip subdirs (only datadirs)...
break;
}
let should_compact = self.new_cache.info.name != folder.name
&& existing_folders.len() + new_folders.len() >= DATA_SCANNER_COMPACT_AT_FOLDERS.try_into().unwrap()
|| existing_folders.len() + new_folders.len() >= DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS.try_into().unwrap();
let total_folders = existing_folders.len() + new_folders.len();
if total_folders
> SCANNER_EXCESS_FOLDERS
.load(std::sync::atomic::Ordering::SeqCst)
.try_into()
.unwrap()
{
let _prefix_name = format!("{}/", folder.name.trim_end_matches('/'));
// todo: notification
}
if !into.compacted && should_compact {
into.compacted = true;
new_folders.extend(existing_folders.clone());
existing_folders.clear();
}
// Transfer existing
if !into.compacted {
for folder in existing_folders.iter() {
let h = hash_path(&folder.name);
self.update_cache
.copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
}
}
// Scan new...
for folder in new_folders.iter() {
let h = hash_path(&folder.name);
if !into.compacted {
let mut found_any = false;
let mut parent = this_hash.clone();
while parent != hash_path(&self.update_cache.info.name) {
let e = self.update_cache.find(&parent.key());
if e.is_none() || e.as_ref().unwrap().compacted {
found_any = true;
break;
}
match self.update_cache.search_parent(&parent) {
Some(next) => {
parent = next;
}
None => {
found_any = true;
break;
}
}
}
if !found_any {
self.update_cache
.replace_hashed(&h, &Some(this_hash.clone()), &DataUsageEntry::default());
}
}
(self.update_current_path)(&folder.name);
scan(folder, into, self).await;
// Add new folders if this is new and we don't have existing.
if !into.compacted {
if let Some(parent) = self.update_cache.find(&this_hash.key()) {
if !parent.compacted {
self.update_cache.delete_recursive(&h);
self.update_cache
.copy_with_children(&self.new_cache, &h, &Some(this_hash.clone()));
}
}
}
}
// Scan existing...
for folder in existing_folders.iter() {
let h = hash_path(&folder.name);
if !into.compacted && self.old_cache.is_compacted(&h) {
if !h.mod_(self.old_cache.info.next_cycle, DATA_USAGE_UPDATE_DIR_CYCLES) {
self.new_cache
.copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
into.add_child(&h);
continue;
}
}
(self.update_current_path)(&folder.name);
scan(folder, into, self).await;
}
// Scan for healing
if abandoned_children.is_empty() || !(self.should_heal)() {
break;
}
if self.disks.is_empty() || self.disks_quorum == 0 {
break;
}
let (bg_seq, found) = GLOBAL_BackgroundHealState
.read()
.await
.get_heal_sequence_by_token(BG_HEALING_UUID)
.await;
if !found {
break;
}
let bg_seq = bg_seq.unwrap();
let mut resolver = MetadataResolutionParams {
dir_quorum: self.disks_quorum,
obj_quorum: self.disks_quorum,
bucket: "".to_string(),
strict: false,
..Default::default()
};
for k in abandoned_children.iter() {
if !(self.should_heal)() {
break;
}
let (bucket, prefix) = path_to_bucket_object(k);
(self.update_current_path)(k);
if bucket != resolver.bucket {
let _ = bg_seq
.clone()
.write()
.await
.queue_heal_task(
HealSource {
bucket: bucket.clone(),
..Default::default()
},
HEAL_ITEM_BUCKET.to_owned(),
)
.await?;
}
resolver.bucket = bucket.clone();
let found_objs = Arc::new(RwLock::new(false));
let found_objs_clone = found_objs.clone();
let (tx, rx) = broadcast::channel(1);
let tx_partial = tx.clone();
let tx_finished = tx.clone();
let update_current_path_agreed = self.update_current_path.clone();
let update_current_path_partial = self.update_current_path.clone();
let should_heal_clone = self.should_heal.clone();
let resolver_clone = resolver.clone();
let bg_seq_clone = bg_seq.clone();
let lopts = ListPathRawOptions {
disks: self.disks.clone(),
bucket: bucket.clone(),
path: prefix.clone(),
recursice: true,
report_not_found: true,
min_disks: self.disks_quorum,
agreed: Some(Box::new(move |entry: MetaCacheEntry| {
Box::pin({
let update_current_path_agreed = update_current_path_agreed.clone();
async move {
update_current_path_agreed(&entry.name);
}
})
})),
partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option<Error>]| {
Box::pin({
let update_current_path_partial = update_current_path_partial.clone();
let should_heal_partial = should_heal_clone.clone();
let tx_partial = tx_partial.clone();
let resolver_partial = resolver_clone.clone();
let bucket_partial = bucket.clone();
let found_objs_clone = found_objs_clone.clone();
let bg_seq_partial = bg_seq_clone.clone();
async move {
if !should_heal_partial() {
let _ = tx_partial.send(true);
return;
}
let entry = match entries.resolve(resolver_partial) {
Ok(Some(entry)) => entry,
_ => match entries.first_found() {
(Some(entry), _) => entry,
_ => return,
},
};
update_current_path_partial(&entry.name);
let mut custom = HashMap::new();
if entry.is_dir() {
return;
}
// We got an entry which we should be able to heal.
let fiv = match entry.file_info_versions(&bucket_partial) {
Ok(fiv) => fiv,
Err(_) => {
if let Err(err) = bg_seq_partial
.write()
.await
.queue_heal_task(
HealSource {
bucket: bucket_partial.clone(),
object: entry.name.clone(),
version_id: "".to_string(),
..Default::default()
},
HEAL_ITEM_OBJECT.to_string(),
)
.await
{
match err.downcast_ref() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => {
info!("{}", err.to_string());
}
}
} else {
let mut w = found_objs_clone.write().await;
*w = true;
}
return;
}
};
custom.insert("versions", fiv.versions.len().to_string());
let (mut success_versions, mut fail_versions) = (0, 0);
for ver in fiv.versions.iter() {
match bg_seq_partial
.write()
.await
.queue_heal_task(
HealSource {
bucket: bucket_partial.clone(),
object: entry.name.clone(),
version_id: "".to_string(),
..Default::default()
},
HEAL_ITEM_OBJECT.to_string(),
)
.await
{
Ok(_) => {
success_versions += 1;
let mut w = found_objs_clone.write().await;
*w = true;
}
Err(_) => fail_versions += 1,
}
}
custom.insert("success_versions", success_versions.to_string());
custom.insert("failed_versions", fail_versions.to_string());
}
})
})),
finished: Some(Box::new(move |err: &[Option<Error>]| {
Box::pin({
let tx_finished = tx_finished.clone();
async move {
let _ = tx_finished.send(true);
}
})
})),
..Default::default()
};
let _ = list_path_raw(rx, lopts).await;
if *found_objs.read().await {
let this = CachedFolder {
name: k.clone(),
parent: this_hash.clone(),
object_heal_prob_div: 1,
};
scan(&this, into, self).await;
}
}
break;
}
if !was_compacted {
self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &into);
}
if !into.compacted && self.new_cache.info.name != folder.name {
let mut flat = self
.new_cache
.size_recursive(&this_hash.key())
.unwrap_or(DataUsageEntry::default());
flat.compacted = true;
let mut compact = false;
if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
compact = true;
} else {
// Compact if we only have objects as children...
compact = true;
for k in into.children.iter() {
if let Some(v) = self.new_cache.cache.get(k) {
if !v.children.is_empty() || v.objects > 1 {
compact = false;
break;
}
}
}
}
if compact {
self.new_cache.delete_recursive(&this_hash);
self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
let mut total: HashMap<String, String> = HashMap::new();
total.insert("objects".to_string(), flat.objects.to_string());
total.insert("size".to_string(), flat.size.to_string());
if flat.versions > 0 {
total.insert("versions".to_string(), flat.versions.to_string());
}
}
}
// Compact if too many children...
if !into.compacted {
self.new_cache.reduce_children_of(
&this_hash,
DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap(),
self.new_cache.info.name != folder.name,
);
}
if self.update_cache.cache.contains_key(&this_hash.key()) && !was_compacted {
// Replace if existed before.
if let Some(flat) = self.new_cache.size_recursive(&this_hash.key()) {
self.update_cache.delete_recursive(&this_hash);
self.update_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
}
}
Ok(())
}
async fn send_update(&mut self) {
if SystemTime::now().duration_since(self.last_update).unwrap() < Duration::from_secs(60) {
return;
}
if let Some(flat) = self.update_cache.size_recursive(&self.new_cache.info.name) {
let _ = self.updates.send(flat).await;
self.last_update = SystemTime::now();
}
}
}
async fn scan(folder: &CachedFolder, into: &mut DataUsageEntry, folder_scanner: &mut FolderScanner) {
let mut dst = if !into.compacted {
DataUsageEntry::default()
} else {
into.clone()
};
if Box::pin(folder_scanner.scan_folder(folder, &mut dst)).await.is_err() {
return;
}
if !into.compacted {
let h = DataUsageHash(folder.name.clone());
into.add_child(&h);
folder_scanner.update_cache.delete_recursive(&h);
folder_scanner
.update_cache
.copy_with_children(&folder_scanner.new_cache, &h, &Some(folder.parent.clone()));
folder_scanner.send_update().await;
}
}
pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursive: bool) -> bool {
if config.rules.is_empty() {
return false;
}
for rule in config.rules.iter() {
if rule
.status
.eq(&ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED))
{
continue;
}
if !prefix.is_empty() {
if let Some(filter) = &rule.filter {
if let Some(r_prefix) = &filter.prefix {
if !r_prefix.is_empty() {
// incoming prefix must be in rule prefix
if !recursive && !prefix.starts_with(r_prefix) {
continue;
}
// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
// does not match
if recursive && !r_prefix.starts_with(prefix) && !prefix.starts_with(r_prefix) {
continue;
}
}
}
}
}
return true;
}
false
}
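For intuition, with a hypothetical config holding one enabled rule whose filter prefix is `docs/`:

// has_active_rules(&cfg, "docs/report.txt", false) == true   // tested prefix falls under the rule prefix
// has_active_rules(&cfg, "images/a.png",    false) == false  // no overlap, every rule is skipped
// has_active_rules(&cfg, "",                true)  == true   // empty prefix matches any enabled rule
// has_active_rules(&cfg, "d",               true)  == true   // recursive: "docs/" starts with "d"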

View File

@@ -1,10 +1,16 @@
use bytesize::ByteSize;
use http::HeaderMap;
use path_clean::PathClean;
use rand::Rng;
use rmp_serde::Serializer;
use s3s::dto::ReplicationConfiguration;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::u64;
use s3s::{S3Error, S3ErrorCode};
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
@@ -14,21 +20,162 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::error::{Error, Result};
use crate::new_object_layer_fn;
use crate::set_disk::SetDisks;
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions, StorageAPI};
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions};
use super::data_scanner::SizeSummary;
use super::data_usage::DATA_USAGE_ROOT;
// DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals
pub const DATA_USAGE_BUCKET_LEN: usize = 11;
pub const DATA_USAGE_VERSION_LEN: usize = 7;
type DataUsageHashMap = HashSet<String>;
// sizeHistogram is a size histogram.
type SizeHistogram = Vec<u64>;
// versionsHistogram is a histogram of number of versions in an object.
type VersionsHistogram = Vec<u64>;
pub type DataUsageHashMap = HashSet<String>;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ObjectHistogramInterval {
name: &'static str,
start: u64,
end: u64,
}
const OBJECTS_HISTOGRAM_INTERVALS: [ObjectHistogramInterval; DATA_USAGE_BUCKET_LEN] = [
ObjectHistogramInterval {
name: "LESS_THAN_1024_B",
start: 0,
end: ByteSize::kib(1).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_1024_B_AND_64_KB",
start: ByteSize::kib(1).as_u64(),
end: ByteSize::kib(64).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_64_KB_AND_256_KB",
start: ByteSize::kib(64).as_u64(),
end: ByteSize::kib(256).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_256_KB_AND_512_KB",
start: ByteSize::kib(256).as_u64(),
end: ByteSize::kib(512).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_512_KB_AND_1_MB",
start: ByteSize::kib(512).as_u64(),
end: ByteSize::mib(1).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_1024B_AND_1_MB",
start: ByteSize::kib(1).as_u64(),
end: ByteSize::mib(1).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_1_MB_AND_10_MB",
start: ByteSize::mib(1).as_u64(),
end: ByteSize::mib(10).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_10_MB_AND_64_MB",
start: ByteSize::mib(10).as_u64(),
end: ByteSize::mib(64).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_64_MB_AND_128_MB",
start: ByteSize::mib(64).as_u64(),
end: ByteSize::mib(128).as_u64() - 1,
},
ObjectHistogramInterval {
name: "BETWEEN_128_MB_AND_512_MB",
start: ByteSize::mib(128).as_u64(),
end: ByteSize::mib(512).as_u64() - 1,
},
ObjectHistogramInterval {
name: "GREATER_THAN_512_MB",
start: ByteSize::mib(512).as_u64(),
end: u64::MAX,
},
];
const OBJECTS_VERSION_COUNT_INTERVALS: [ObjectHistogramInterval; DATA_USAGE_VERSION_LEN] = [
ObjectHistogramInterval {
name: "UNVERSIONED",
start: 0,
end: 0,
},
ObjectHistogramInterval {
name: "SINGLE_VERSION",
start: 1,
end: 1,
},
ObjectHistogramInterval {
name: "BETWEEN_2_AND_10",
start: 2,
end: 9,
},
ObjectHistogramInterval {
name: "BETWEEN_10_AND_100",
start: 10,
end: 99,
},
ObjectHistogramInterval {
name: "BETWEEN_100_AND_1000",
start: 100,
end: 999,
},
ObjectHistogramInterval {
name: "BETWEEN_1000_AND_10000",
start: 1000,
end: 9999,
},
ObjectHistogramInterval {
name: "GREATER_THAN_10000",
start: 10000,
end: u64::MAX,
},
];
// sizeHistogram is a size histogram.
#[derive(Clone, Serialize, Deserialize)]
pub struct SizeHistogram(Vec<u64>);
impl Default for SizeHistogram {
fn default() -> Self {
Self(vec![0; DATA_USAGE_BUCKET_LEN])
}
}
impl SizeHistogram {
fn add(&mut self, size: u64) {
for (idx, interval) in OBJECTS_HISTOGRAM_INTERVALS.iter().enumerate() {
if size >= interval.start && size <= interval.end {
self.0[idx] += 1;
break;
}
}
}
}
// versionsHistogram is a histogram of number of versions in an object.
#[derive(Clone, Serialize, Deserialize)]
pub struct VersionsHistogram(Vec<u64>);
impl Default for VersionsHistogram {
fn default() -> Self {
Self(vec![0; DATA_USAGE_VERSION_LEN])
}
}
impl VersionsHistogram {
fn add(&mut self, size: u64) {
for (idx, interval) in OBJECTS_VERSION_COUNT_INTERVALS.iter().enumerate() {
if size >= interval.start && size <= interval.end {
self.0[idx] += 1;
break;
}
}
}
}
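Within this module (the `add` methods are private), recording an observation bumps the first matching interval, as in this sketch:

fn histogram_example() {
    let mut sizes = SizeHistogram::default();
    sizes.add(100 * 1024); // 100 KiB lands in "BETWEEN_64_KB_AND_256_KB"

    let mut versions = VersionsHistogram::default();
    versions.add(3); // 3 versions land in "BETWEEN_2_AND_10"
}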
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
pub pending_size: u64,
pub replicated_size: u64,
@@ -48,7 +195,7 @@ impl ReplicationStats {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ReplicationAllStats {
pub targets: HashMap<String, ReplicationStats>,
pub replica_size: u64,
@@ -70,35 +217,117 @@ impl ReplicationAllStats {
}
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct DataUsageEntry {
pub children: DataUsageHashMap,
// These fields do no include any children.
pub size: i64,
pub objects: u64,
pub versions: u64,
pub delete_markers: u64,
pub size: usize,
pub objects: usize,
pub versions: usize,
pub delete_markers: usize,
pub obj_sizes: SizeHistogram,
pub obj_versions: VersionsHistogram,
pub replication_stats: ReplicationAllStats,
pub replication_stats: Option<ReplicationAllStats>,
// Todo: tier
// pub all_tier_stats: ,
pub compacted: bool,
}
impl DataUsageEntry {
pub fn add_child(&mut self, hash: &DataUsageHash) {
if self.children.contains(&hash.key()) {
return;
}
self.children.insert(hash.key());
}
pub fn add_sizes(&mut self, summary: &SizeSummary) {
self.size += summary.total_size;
self.versions += summary.versions;
self.delete_markers += summary.delete_markers;
self.obj_sizes.add(summary.total_size as u64);
self.obj_versions.add(summary.versions as u64);
let replication_stats = self.replication_stats.get_or_insert_with(ReplicationAllStats::default);
replication_stats.replica_size += summary.replica_size as u64;
replication_stats.replica_count += summary.replica_count as u64;
for (arn, st) in &summary.repl_target_stats {
let tgt_stat = replication_stats.targets.entry(arn.to_string()).or_insert(ReplicationStats::default());
tgt_stat.pending_size += st.pending_size as u64;
tgt_stat.failed_size += st.failed_size as u64;
tgt_stat.replicated_size += st.replicated_size as u64;
tgt_stat.replicated_count += st.replicated_count as u64;
tgt_stat.failed_count += st.failed_count as u64;
tgt_stat.pending_count += st.pending_count as u64;
}
// Todo:: tiers
}
pub fn merge(&mut self, other: &DataUsageEntry) {
self.objects += other.objects;
self.versions += other.versions;
self.delete_markers += other.delete_markers;
self.size += other.size;
if let Some(o_rep) = &other.replication_stats {
if self.replication_stats.is_none() {
self.replication_stats = Some(ReplicationAllStats::default());
}
let s_rep = self.replication_stats.as_mut().unwrap();
s_rep.replica_size += o_rep.replica_size;
s_rep.replica_count += o_rep.replica_count;
for (arn, stat) in o_rep.targets.iter() {
let st = s_rep.targets.entry(arn.clone()).or_insert(ReplicationStats::default());
*st = ReplicationStats {
pending_size: stat.pending_size + st.pending_size,
failed_size: stat.failed_size + st.failed_size,
replicated_size: stat.replicated_size + st.replicated_size,
pending_count: stat.pending_count + st.pending_count,
failed_count: stat.failed_count + st.failed_count,
replicated_count: stat.replicated_count + st.replicated_count,
..Default::default()
};
}
}
for (i, v) in other.obj_sizes.0.iter().enumerate() {
self.obj_sizes.0[i] += v;
}
for (i, v) in other.obj_versions.0.iter().enumerate() {
self.obj_versions.0[i] += v;
}
// todo: tiers
}
}
#[derive(Clone)]
pub struct DataUsageEntryInfo {
pub name: String,
pub parent: String,
pub entry: DataUsageEntry,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct DataUsageCacheInfo {
pub name: String,
pub next_cycle: usize,
pub next_cycle: u32,
pub last_update: SystemTime,
pub skip_healing: bool,
// todo: life_cycle
// pub life_cycle:
#[serde(skip)]
pub updates: Option<Sender<DataUsageEntry>>,
// Todo: replication
// #[serde(skip_serializing)]
// replication:
#[serde(skip)]
pub replication: Option<ReplicationConfiguration>,
}
impl Default for DataUsageCacheInfo {
@@ -109,6 +338,7 @@ impl Default for DataUsageCacheInfo {
last_update: SystemTime::now(),
skip_healing: Default::default(),
updates: Default::default(),
replication: Default::default(),
}
}
}
@@ -202,6 +432,195 @@ impl DataUsageCache {
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
let hash = hash_path(path);
self.cache.insert(hash.key(), e);
if !parent.is_empty() {
let phash = hash_path(parent);
self.cache.entry(phash.key()).or_insert(DataUsageEntry::default()).add_child(&hash);
}
}
pub fn replace_hashed(&mut self, hash: &DataUsageHash, parent: &Option<DataUsageHash>, e: &DataUsageEntry) {
self.cache.insert(hash.key(), e.clone());
if let Some(parent) = parent {
self.cache.entry(parent.key()).or_insert(DataUsageEntry::default()).add_child(hash);
}
}
pub fn find(&self, path: &str) -> Option<DataUsageEntry> {
self.cache.get(&hash_path(path).key()).cloned()
}
pub fn find_children_copy(&mut self, h: DataUsageHash) -> DataUsageHashMap {
self.cache.entry(h.string()).or_insert(DataUsageEntry::default()).children.clone()
}
pub fn flatten(&self, root: &DataUsageEntry) -> DataUsageEntry {
let mut root = root.clone();
for id in root.children.clone().iter() {
if let Some(e) = self.cache.get(id) {
let mut e = e.clone();
if !e.children.is_empty() {
e = self.flatten(&e);
}
root.merge(&e);
}
}
root.children.clear();
root
}
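A small sketch of how these tree operations compose (paths are hypothetical; assumes `DataUsageCache` implements `Default`, its definition sits outside this hunk):

fn flatten_example() {
    let mut cache = DataUsageCache::default();
    cache.replace("bucket", "", DataUsageEntry::default());
    cache.replace("bucket/a", "bucket", DataUsageEntry::default());
    // flatten() merges every descendant into one entry and clears its child set.
    let root = cache.find("bucket").unwrap();
    let flat = cache.flatten(&root);
    assert!(flat.children.is_empty());
}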
pub fn copy_with_children(&mut self, src: &DataUsageCache, hash: &DataUsageHash, parent: &Option<DataUsageHash>) {
match src.cache.get(&hash.string()) {
Some(e) => {
self.cache.insert(hash.key(), e.clone());
for ch in e.children.iter() {
if *ch == hash.key() {
return;
}
self.copy_with_children(src, &DataUsageHash(ch.to_string()), &Some(hash.clone()));
}
if let Some(parent) = parent {
let p = self.cache.entry(parent.key()).or_insert(DataUsageEntry::default());
p.add_child(hash);
}
},
None => return,
}
}
pub fn delete_recursive(&mut self, hash: &DataUsageHash) {
let mut need_remove = Vec::new();
if let Some(v) = self.cache.get(&hash.string()) {
for child in v.children.iter() {
need_remove.push(child.clone());
}
}
self.cache.remove(&hash.string());
need_remove.iter().for_each(|child| {
self.delete_recursive(&DataUsageHash(child.to_string()));
});
}
pub fn size_recursive(&self, path: &str) -> Option<DataUsageEntry> {
match self.find(path) {
Some(root) => {
if root.children.is_empty() {
return None;
}
let mut flat = self.flatten(&root);
if flat.replication_stats.is_some() && flat.replication_stats.as_ref().unwrap().empty() {
flat.replication_stats = None;
}
return Some(flat);
},
None => None
}
}
pub fn search_parent(&self, hash: &DataUsageHash) -> Option<DataUsageHash> {
let want = hash.key();
if let Some(last_index) = want.rfind('/') {
if let Some(v) = self.find(&want[0..last_index]) {
if v.children.contains(&want) {
let found = hash_path(&want[0..last_index]);
return Some(found);
}
}
}
for (k, v) in self.cache.iter() {
if v.children.contains(&want) {
let found = DataUsageHash(k.clone());
return Some(found);
}
}
None
}
pub fn is_compacted(&self, hash: &DataUsageHash) -> bool {
match self.cache.get(&hash.key()) {
Some(due) => {
due.compacted
},
None => false
}
}
pub fn reduce_children_of(&mut self, path: &DataUsageHash, limit: usize, compact_self: bool) {
let e = match self.cache.get(&path.key()) {
Some(e) => e,
None => return,
};
if e.compacted {
return;
}
if e.children.len() > limit && compact_self {
let mut flat = self.size_recursive(&path.key()).unwrap_or(DataUsageEntry::default());
flat.compacted = true;
self.delete_recursive(path);
self.replace_hashed(path, &None, &flat);
return;
}
let total = self.total_children_rec(&path.key());
if total < limit {
return;
}
let mut leaves = Vec::new();
let mut remove = total - limit;
add(self, path, &mut leaves);
leaves.sort_by(|a, b| {
a.objects.cmp(&b.objects)
});
while remove > 0 && leaves.len() > 0 {
let e = leaves.first().unwrap();
let candidate = e.path.clone();
if candidate == *path && !compact_self {
break;
}
let removing = self.total_children_rec(&candidate.key());
let mut flat = match self.size_recursive(&candidate.key()) {
Some(flat) => flat,
None => {
leaves.remove(0);
continue;
}
};
flat.compacted = true;
self.delete_recursive(&candidate);
self.replace_hashed(&candidate, &None, &flat);
remove = remove.saturating_sub(removing);
leaves.remove(0);
}
}
pub fn total_children_rec(&self, path: &str) -> usize {
let root = self.find(path);
if root.is_none() {
return 0;
}
let root = root.unwrap();
if root.children.is_empty() {
return 0;
}
let mut n = root.children.len();
for ch in root.children.iter() {
n += self.total_children_rec(&ch);
}
n
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
@@ -218,10 +637,63 @@ impl DataUsageCache {
}
}
struct DataUsageHash(String);
#[derive(Default, Clone)]
struct Inner {
objects: usize,
path: DataUsageHash,
}
fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec<Inner>) {
let e = match data_usage_cache.cache.get(&path.key()) {
Some(e) => e,
None => return,
};
// Only internal nodes are compaction candidates; plain leaves cannot be reduced further.
if e.children.is_empty() {
return;
}
let sz = data_usage_cache.size_recursive(&path.key()).unwrap_or(DataUsageEntry::default());
leaves.push(Inner { objects: sz.objects, path: path.clone() });
for ch in e.children.iter() {
add(data_usage_cache, &DataUsageHash(ch.clone()), leaves);
}
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DataUsageHash(pub String);
impl DataUsageHash {
pub fn string(&self) -> String {
self.0.clone()
}
pub fn key(&self) -> String {
self.0.clone()
}
pub fn mod_(&self, cycle: u32, cycles: u32) -> bool {
if cycles <= 1 {
return cycles == 1;
}
let hash = self.calculate_hash();
hash as u32 % cycles == cycle % cycles
}
pub fn mod_alt(&self, cycle: u32, cycles: u32) -> bool {
if cycles <= 1 {
return cycles == 1;
}
let hash = self.calculate_hash();
(hash >> 32) as u32 % cycles == cycle % cycles
}
fn calculate_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.0.hash(&mut hasher);
hasher.finish()
}
}
pub fn hash_path(data: &str) -> DataUsageHash {
@@ -229,6 +701,5 @@ pub fn hash_path(data: &str) -> DataUsageHash {
if data != DATA_USAGE_ROOT {
data = data.trim_matches('/');
}
Path::new(&data);
todo!()
DataUsageHash(Path::new(&data).clean().to_string_lossy().to_string())
}

View File

@@ -0,0 +1 @@
pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";

View File

@@ -60,12 +60,13 @@ pub struct HealSequenceStatus {
pub items: Vec<HealResultItem>,
}
#[derive(Debug, Default)]
pub struct HealSource {
pub bucket: String,
pub object: String,
pub version_id: String,
pub no_wait: bool,
opts: Option<HealOpts>,
pub opts: Option<HealOpts>,
}
#[derive(Clone, Debug)]
@@ -245,7 +246,7 @@ impl HealSequence {
Ok(())
}
async fn queue_heal_task(&mut self, source: HealSource, heal_type: HealItemType) -> Result<()> {
pub async fn queue_heal_task(&mut self, source: HealSource, heal_type: HealItemType) -> Result<()> {
let mut task = HealTask::new(&source.bucket, &source.object, &source.version_id, &self.setting);
if let Some(opts) = source.opts {
task.opts = opts;
@@ -348,7 +349,7 @@ pub async fn heal_sequence_start(h: Arc<HealSequence>) {
pub struct AllHealState {
mu: RwLock<bool>,
heal_seq_map: HashMap<String, HealSequence>,
heal_seq_map: HashMap<String, Arc<RwLock<HealSequence>>>,
heal_local_disks: HashMap<Endpoint, bool>,
heal_status: HashMap<String, HealingTracker>,
}
@@ -449,7 +450,8 @@ impl AllHealState {
let mut keys_to_reomve = Vec::new();
for (k, v) in self.heal_seq_map.iter() {
if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
let r = v.read().await;
if r.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(r.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
keys_to_reomve.push(k.clone())
}
}
@@ -458,11 +460,12 @@ impl AllHealState {
}
}
async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<HealSequence>, bool) {
pub async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<Arc<RwLock<HealSequence>>>, bool) {
let _guard = self.mu.read().await;
for v in self.heal_seq_map.values() {
if v.client_token == token {
let r = v.read().await;
if r.client_token == token {
return (Some(v.clone()), true);
}
}
@@ -470,7 +473,7 @@ impl AllHealState {
(None, false)
}
async fn get_heal_sequence(&self, path: &str) -> Option<HealSequence> {
async fn get_heal_sequence(&self, path: &str) -> Option<Arc<RwLock<HealSequence>>> {
let _guard = self.mu.read().await;
self.heal_seq_map.get(path).cloned()
@@ -479,6 +482,7 @@ impl AllHealState {
async fn stop_heal_sequence(&mut self, path: &str) -> Result<Vec<u8>> {
let mut hsp = HealStopSuccess::default();
if let Some(he) = self.get_heal_sequence(path).await {
let he = he.read().await;
let client_token = he.client_token.clone();
if *GLOBAL_IsDistErasure.read().await {
// TODO: proxy
@@ -525,7 +529,7 @@ impl AllHealState {
self.stop_heal_sequence(path_s).await?;
} else {
if let Some(hs) = self.get_heal_sequence(path_s).await {
if !hs.has_ended().await {
if !hs.read().await.has_ended().await {
return Err(Error::from_string(format!("Heal is already running on the given path (use force-start option to stop and start afresh). The heal was started by IP {} at {}, token is {}", heal_sequence.client_address, heal_sequence.start_time, heal_sequence.client_token)));
}
}
@@ -534,7 +538,7 @@ impl AllHealState {
let _guard = self.mu.write().await;
for (k, v) in self.heal_seq_map.iter() {
if !v.has_ended().await && (has_profix(k, path_s) || has_profix(path_s, k)) {
if !v.read().await.has_ended().await && (has_profix(&k, path_s) || has_profix(path_s, &k)) {
return Err(Error::from_string(format!(
"The provided heal sequence path overlaps with an existing heal path: {}",
k
@@ -542,7 +546,7 @@ impl AllHealState {
}
}
self.heal_seq_map.insert(path_s.to_string(), heal_sequence.clone());
self.heal_seq_map.insert(path_s.to_string(), Arc::new(RwLock::new(heal_sequence.clone())));
let client_token = heal_sequence.client_token.clone();
if *GLOBAL_IsDistErasure.read().await {

View File

@@ -2,6 +2,7 @@ pub mod background_heal_ops;
pub mod data_scanner;
pub mod data_scanner_metric;
pub mod data_usage;
pub mod error;
pub mod heal_commands;
pub mod heal_ops;
pub mod data_usage_cache;

View File

@@ -528,7 +528,7 @@ fn is_reserved_bucket(bucket_name: &str) -> bool {
}
// Check whether the bucket name is reserved or invalid.
fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
if bucket_entry.is_empty() {
return true;
}

View File

@@ -69,6 +69,19 @@ pub fn path_join(elem: &[PathBuf]) -> PathBuf {
joined_path
}
pub fn path_to_bucket_object_with_base_path(base_path: &str, path: &str) -> (String, String) {
let path = path.strip_prefix(base_path).unwrap_or(path).trim_start_matches(SLASH_SEPARATOR);
if let Some(m) = path.find(SLASH_SEPARATOR) {
return (path[..m].to_string(), path[m + SLASH_SEPARATOR.len()..].to_string());
}
(path.to_string(), "".to_string())
}
pub fn path_to_bucket_object(s: &str) -> (String, String) {
path_to_bucket_object_with_base_path("", s)
}
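For example (hypothetical paths), the base path is stripped once, then the remainder splits on the first separator:

// path_to_bucket_object_with_base_path("/data/disk1", "/data/disk1/mybucket/a/b.txt")
//     -> ("mybucket", "a/b.txt")
// path_to_bucket_object("mybucket") -> ("mybucket", "")   // no object part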
pub fn base_dir_from_prefix(prefix: &str) -> String {
let mut base_dir = dir(prefix).to_owned();
if base_dir == "." || base_dir == "./" || base_dir == "/" {