Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-11-16 12:57:31 +08:00
parent 2defaa801b
commit 7eed1a6db3
13 changed files with 2289 additions and 614 deletions

View File

@@ -0,0 +1,106 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Clone, Debug, Default)]
pub struct AccElem {
pub total: u64,
pub size: u64,
pub n: u64,
}
impl AccElem {
pub fn add(&mut self, dur: &Duration) {
let dur = dur.as_secs();
self.total += dur;
self.n += 1;
}
pub fn merge(&mut self, b: &AccElem) {
self.n += b.n;
self.total += b.total;
self.size += b.size;
}
pub fn avg(&self) -> Duration {
if self.n >= 1 && self.total > 0 {
return Duration::from_secs(self.total / self.n);
}
Duration::from_secs(0)
}
}
#[derive(Clone)]
pub struct LastMinuteLatency {
pub totals: Vec<AccElem>,
pub last_sec: u64,
}
impl Default for LastMinuteLatency {
fn default() -> Self {
Self {
totals: vec![AccElem::default(); 60],
last_sec: Default::default(),
}
}
}
impl LastMinuteLatency {
pub fn merge(&mut self, o: &mut LastMinuteLatency) -> LastMinuteLatency {
let mut merged = LastMinuteLatency::default();
if self.last_sec > o.last_sec {
o.forward_to(self.last_sec);
merged.last_sec = self.last_sec;
} else {
self.forward_to(o.last_sec);
merged.last_sec = o.last_sec;
}
for i in 0..merged.totals.len() {
merged.totals[i] = AccElem {
total: self.totals[i].total + o.totals[i].total,
n: self.totals[i].n + o.totals[i].n,
size: self.totals[i].size + o.totals[i].size,
}
}
merged
}
pub fn add(&mut self, t: &Duration) {
let sec = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
self.forward_to(sec);
let win_idx = sec % 60;
self.totals[win_idx as usize].add(t);
self.last_sec = sec;
}
pub fn add_all(&mut self, sec: u64, a: &AccElem) {
self.forward_to(sec);
let win_idx = sec % 60;
self.totals[win_idx as usize].merge(a);
self.last_sec = sec;
}
pub fn get_total(&mut self) -> AccElem {
let mut res = AccElem::default();
let sec = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
self.forward_to(sec);
for elem in self.totals.iter() {
res.merge(elem);
}
res
}
pub fn forward_to(&mut self, t: u64) {
if self.last_sec >= t {
return;
}
if t - self.last_sec >= 60 {
self.totals = vec![AccElem::default(); 60];
return;
}
while self.last_sec != t {
let idx = (self.last_sec + 1) % 60;
self.totals[idx as usize] = AccElem::default();
self.last_sec += 1;
}
}
}

View File

@@ -1,5 +1,6 @@
pub mod error;
pub mod globals;
pub mod last_minute;
/// Defers evaluation of a block of code until the end of the scope.
#[macro_export]

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -19,15 +19,17 @@ use crate::disk::os::{check_path_length, is_empty_dir};
use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::error::{Error, Result};
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use crate::heal::data_scanner::has_active_rules;
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, SizeSummary};
use crate::heal::data_scanner_metric::{globalScannerMetrics, ScannerMetric, ScannerMetrics};
use crate::heal::data_usage_cache::{self, DataUsageCache, DataUsageEntry};
use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
use crate::heal::heal_commands::HealScanMode;
use crate::new_object_layer_fn;
use crate::set_disk::{
conv_part_err_to_int, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
CHECK_PART_VOLUME_NOT_FOUND,
};
use crate::store_api::BitrotAlgorithm;
use crate::store_api::{BitrotAlgorithm, StorageAPI};
use crate::utils::fs::{access, lstat, O_APPEND, O_CREATE, O_RDONLY, O_WRONLY};
use crate::utils::os::get_info;
use crate::utils::path::{clean, has_suffix, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR};
@@ -39,13 +41,13 @@ use crate::{
use common::defer;
use path_absolutize::Absolutize;
use s3s::dto::{ReplicationConfiguration, ReplicationRuleStatus};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::io::Cursor;
use std::os::unix::fs::MetadataExt;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use std::{
fs::Metadata,
path::{Path, PathBuf},
@@ -1878,7 +1880,7 @@ impl DiskAPI for LocalDisk {
}
async fn ns_scanner(
&self,
self: Arc<Self>,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
@@ -1904,7 +1906,112 @@ impl DiskAPI for LocalDisk {
Some(s) => s,
None => return Err(Error::msg("errServerNotInitialized")),
};
todo!()
let loc = self.get_disk_location();
let disks = store.get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap()).await?;
let disk = self.clone();
let disk_clone = disk.clone();
let mut cache = cache.clone();
cache.info.updates = Some(updates.clone());
let mut data_usage_info = scan_data_folder(
&disks,
disk,
&cache,
Box::new(move |item: &ScannerItem| {
let mut item = item.clone();
let disk = disk_clone.clone();
Box::pin(async move {
if item.path.ends_with(&format!("{}{}", SLASH_SEPARATOR, STORAGE_FORMAT_FILE)) {
return Err(Error::from_string(ERR_SKIP_FILE));
}
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject);
let mut res = HashMap::new();
let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata);
let buf = match disk.read_metadata(item.path.clone()).await {
Ok(buf) => buf,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
done_sz(buf.len() as u64).await;
res.insert("metasize".to_string(), buf.len().to_string());
item.transform_meda_dir();
let meta_cache = MetaCacheEntry {
name: item.object_path().to_string_lossy().to_string(),
metadata: buf,
..Default::default()
};
let fivs = match meta_cache.file_info_versions(&item.bucket) {
Ok(fivs) => fivs,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
let mut size_s = SizeSummary::default();
let done = ScannerMetrics::time(ScannerMetric::ApplyAll);
let obj_infos = match item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(err) => {
res.insert("err".to_string(), err.to_string());
stop_fn(&res).await;
return Err(Error::from_string(ERR_SKIP_FILE));
}
};
let versioned = false;
let mut obj_deleted = false;
for info in obj_infos.iter() {
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
let mut sz = 0;
(obj_deleted, sz) = item.apply_actions(&info, &size_s).await;
done().await;
if obj_deleted {
break;
}
let actual_sz = match info.get_actual_size() {
Ok(size) => size,
Err(_) => continue,
};
if info.delete_marker {
size_s.delete_markers += 1;
}
if info.version_id.is_some() && sz == actual_sz {
size_s.versions += 1;
}
size_s.total_size += sz;
if info.delete_marker {
continue;
}
}
for frer_version in fivs.free_versions.iter() {
let _obj_info = frer_version.to_object_info(&item.bucket, &item.object_path().to_string_lossy(), versioned);
let done = ScannerMetrics::time(ScannerMetric::TierObjSweep);
done().await;
}
// todo: global trace
if obj_deleted {
return Err(Error::from_string(ERR_IGNORE_FILE_CONTRIB));
}
done().await;
Ok(size_s)
})
}),
scan_mode,
)
.await?;
data_usage_info.info.last_update = Some(SystemTime::now());
Ok(data_usage_info)
}
}

View File

@@ -16,7 +16,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta";
use crate::{
erasure::Writer,
error::{Error, Result},
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion, FileMetaVersion},
heal::{
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::HealScanMode,
@@ -344,14 +344,14 @@ impl DiskAPI for Disk {
}
async fn ns_scanner(
&self,
self: Arc<Self>,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
match self {
Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await,
Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await,
match &*self {
Disk::Local(local_disk) => Arc::new(local_disk).ns_scanner(cache, updates, scan_mode).await,
Disk::Remote(remote_disk) => Arc::new(remote_disk).ns_scanner(cache, updates, scan_mode).await,
}
}
}
@@ -453,7 +453,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>>;
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
async fn ns_scanner(
&self,
self: Arc<Self>,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use futures::lock::Mutex;
use protos::{
@@ -736,7 +736,7 @@ impl DiskAPI for RemoteDisk {
}
async fn ns_scanner(
&self,
self: Arc<Self>,
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,

View File

@@ -12,7 +12,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use byteorder::{LittleEndian, ReadBytesExt};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use lazy_static::lazy_static;
use rand::Rng;
use rmp_serde::{Deserializer, Serializer};
@@ -29,12 +29,11 @@ use tokio::{
use tracing::{error, info};
use super::{
data_scanner_metric::globalScannerMetrics,
data_scanner_metric::{globalScannerMetrics, ScannerMetric, ScannerMetrics},
data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
use crate::disk::DiskAPI;
use crate::heal::data_scanner_metric::current_path_updater;
use crate::heal::data_usage::DATA_USAGE_ROOT;
use crate::{
@@ -58,6 +57,10 @@ use crate::{
store::{ECStore, ListPathOptions},
utils::path::{path_join, path_to_bucket_object, path_to_bucket_object_with_base_path, SLASH_SEPARATOR},
};
use crate::{
disk::DiskAPI,
store_api::{FileInfo, ObjectInfo},
};
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
@@ -132,6 +135,7 @@ async fn run_data_scanner() {
}
loop {
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
cycle_info.current = cycle_info.next;
cycle_info.started = SystemTime::now();
{
@@ -155,6 +159,26 @@ async fn run_data_scanner() {
tokio::spawn(async {
store_data_usage_in_backend(rx).await;
});
let mut res = HashMap::new();
res.insert("cycle".to_string(), cycle_info.current.to_string());
match store.ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
Ok(_) => {
cycle_info.next += 1;
cycle_info.current = 0;
cycle_info.cycle_completed.push(SystemTime::now());
if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
cycle_info.cycle_completed = cycle_info.cycle_completed[cycle_info.cycle_completed.len() - DATA_USAGE_UPDATE_DIR_CYCLES as usize..].to_vec();
}
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
let mut tmp = Vec::new();
tmp.write_u64::<LittleEndian>(cycle_info.next).unwrap();
let _ = save_config(store, &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await;
},
Err(err) => {
res.insert("error".to_string(), err.to_string());
},
}
stop_fn(&res).await;
sleep(Duration::from_secs(SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst))).await;
}
}
@@ -241,7 +265,7 @@ impl Default for CurrentScannerCycle {
}
impl CurrentScannerCycle {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
pub fn marshal_msg(&self, buf: &[u8]) -> Result<Vec<u8>> {
let len: u32 = 4;
let mut wr = Vec::new();
@@ -267,8 +291,9 @@ impl CurrentScannerCycle {
.serialize(&mut Serializer::new(&mut buf))
.expect("Serialization failed");
rmp::encode::write_bin(&mut wr, &buf)?;
Ok(wr)
let mut result = buf.to_vec();
result.extend(wr.iter());
Ok(result)
}
#[tracing::instrument]
@@ -295,10 +320,10 @@ impl CurrentScannerCycle {
self.current = u;
}
"next" => {
let u: u64 = rmp::decode::read_int(&mut cur)?;
self.next = u;
}
// "next" => {
// let u: u64 = rmp::decode::read_int(&mut cur)?;
// self.next = u;
// }
"started" => {
let u: u64 = rmp::decode::read_int(&mut cur)?;
let started = timestamp_to_system_time(u);
@@ -329,22 +354,23 @@ fn timestamp_to_system_time(timestamp: u64) -> SystemTime {
UNIX_EPOCH + std::time::Duration::new(timestamp, 0)
}
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
struct Heal {
enabled: bool,
bitrot: bool,
}
#[derive(Clone)]
pub struct ScannerItem {
path: String,
bucket: String,
prefix: String,
object_name: String,
replication: Option<ReplicationConfiguration>,
pub path: String,
pub bucket: String,
pub prefix: String,
pub object_name: String,
pub replication: Option<ReplicationConfiguration>,
// todo: lifecycle
// typ: fs::Permissions,
heal: Heal,
debug: bool,
pub heal: Heal,
pub debug: bool,
}
impl ScannerItem {
@@ -365,6 +391,43 @@ impl ScannerItem {
pub fn object_path(&self) -> PathBuf {
path_join(&[PathBuf::from(self.prefix.clone()), PathBuf::from(self.object_name.clone())])
}
pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?;
if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst).try_into().unwrap() {
// todo
}
let mut cumulative_size = 0;
for obj_info in obj_infos.iter() {
cumulative_size += obj_info.size;
}
if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst).try_into().unwrap() {
//todo
}
Ok(obj_infos)
}
pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let done = ScannerMetrics::time(ScannerMetric::ApplyNonCurrent);
let mut object_infos = Vec::new();
for info in fivs.iter() {
object_infos.push(info.to_object_info(&self.bucket, &self.object_path().to_string_lossy(), false));
}
done().await;
Ok(object_infos)
}
pub async fn apply_actions(&self, oi: &ObjectInfo, size_s: &SizeSummary) -> (bool, usize) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
//todo: lifecycle
done().await;
(false, 0)
}
}
#[derive(Debug, Default)]
@@ -420,7 +483,7 @@ struct FolderScanner {
last_update: SystemTime,
update_current_path: UpdateCurrentPathFn,
skip_heal: AtomicBool,
drive: DiskStore,
drive: LocalDrive,
}
impl FolderScanner {
@@ -951,9 +1014,10 @@ pub fn has_active_rules(config: &ReplicationConfiguration, prefix: &str, recursi
false
}
pub type LocalDrive = Arc<dyn DiskAPI>;
pub async fn scan_data_folder(
disks: &[Option<DiskStore>],
drive: &DiskStore,
drive: LocalDrive,
cache: &DataUsageCache,
get_size_fn: GetSizeFn,
heal_scan_mode: HealScanMode,
@@ -964,6 +1028,11 @@ pub async fn scan_data_folder(
let base_path = drive.to_string();
let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name);
let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
};
let mut s = FolderScanner {
root: base_path,
get_size: get_size_fn,
@@ -978,11 +1047,7 @@ pub async fn scan_data_folder(
update_current_path: update_path,
disks: disks.to_vec(),
disks_quorum: disks.len() / 2,
skip_heal: if *GLOBAL_IsErasure.read().await || cache.info.skip_healing {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
},
skip_heal,
drive: drive.clone(),
};
@@ -1002,7 +1067,7 @@ pub async fn scan_data_folder(
}
s.new_cache
.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN.try_into().unwrap());
s.new_cache.info.last_update = SystemTime::now();
s.new_cache.info.last_update = Some(SystemTime::now());
s.new_cache.info.next_cycle = cache.info.next_cycle;
close_disk().await;
Ok(s.new_cache)

View File

@@ -1,6 +1,10 @@
use common::last_minute::{AccElem, LastMinuteLatency};
use lazy_static::lazy_static;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::Once;
use std::time::{Duration, UNIX_EPOCH};
use std::{
collections::HashMap,
sync::{
@@ -53,8 +57,75 @@ pub enum ScannerMetric {
Last,
}
static INIT: Once = Once::new();
#[derive(Default)]
pub struct LockedLastMinuteLatency {
cached_sec: AtomicU64,
cached: AccElem,
mu: RwLock<bool>,
latency: LastMinuteLatency,
}
impl Clone for LockedLastMinuteLatency {
fn clone(&self) -> Self {
Self {
cached_sec: AtomicU64::new(0),
cached: self.cached.clone(),
mu: RwLock::new(true),
latency: self.latency.clone(),
}
}
}
impl LockedLastMinuteLatency {
pub fn add(&mut self, value: &Duration) {
self.add_size(value, 0);
}
pub async fn add_size(&mut self, value: &Duration, sz: u64) {
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
INIT.call_once(|| {
self.cached = AccElem::default();
self.cached_sec.store(t, Ordering::SeqCst);
});
let last_t = self.cached_sec.load(Ordering::SeqCst);
if last_t != t
&& self
.cached_sec
.compare_exchange(last_t, t, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let old = self.cached.clone();
self.cached = AccElem::default();
let mut a = AccElem::default();
a.size = old.size;
a.total = old.total;
a.n = old.n;
self.mu.write().await;
self.latency.add_all(t - 1, &a);
}
self.cached.n += 1;
self.cached.total += value.as_secs();
self.cached.size != sz;
}
pub async fn total(&mut self) -> AccElem {
self.mu.read().await;
self.latency.get_total()
}
}
pub type LogFn = Arc<dyn Fn(&HashMap<String, String>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type TimeSizeFn = Arc<dyn Fn(u64) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type TimeFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub struct ScannerMetrics {
operations: Vec<AtomicU32>,
latency: Vec<LockedLastMinuteLatency>,
cycle_info: RwLock<Option<CurrentScannerCycle>>,
current_paths: HashMap<String, String>,
}
@@ -63,23 +134,63 @@ impl ScannerMetrics {
pub fn new() -> Self {
Self {
operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(),
latency: vec![LockedLastMinuteLatency::default(); ScannerMetric::LastRealtime as usize],
cycle_info: RwLock::new(None),
current_paths: HashMap::new(),
}
}
pub fn log(&mut self, s: ScannerMetric, _paths: &[String], _custom: &HashMap<String, String>, _start_time: SystemTime) {
// let duration = start_time.duration_since(start_time);
self.operations[s.clone() as usize].fetch_add(1, Ordering::SeqCst);
// Dodo
}
pub async fn set_cycle(&mut self, c: Option<CurrentScannerCycle>) {
*self.cycle_info.write().await = c;
}
pub fn log(s: ScannerMetric) -> LogFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move |custom: &HashMap<String, String>| {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add(&duration);
}
})
})
}
pub fn time_size(s: ScannerMetric) -> TimeSizeFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move |sz: u64| {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add_size(&duration, sz);
}
})
})
}
pub fn time(s: ScannerMetric) -> TimeFn {
let start = SystemTime::now();
let s_clone = s as usize;
Arc::new(move || {
Box::pin(async move {
let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
let mut sm_w = globalScannerMetrics.write().await;
sm_w.operations[s_clone].fetch_add(1, Ordering::SeqCst);
if s_clone < ScannerMetric::LastRealtime as usize {
sm_w.latency[s_clone].add(&duration);
}
})
})
}
}
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let disk_1 = disk.to_string();
let disk_2 = disk.to_string();

View File

@@ -34,13 +34,13 @@ lazy_static! {
// - replica failed count
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketTargetUsageInfo {
replication_pending_size: u64,
replication_failed_size: u64,
replicated_size: u64,
replica_size: u64,
replication_pending_count: u64,
replication_failed_count: u64,
replicated_count: u64,
pub replication_pending_size: u64,
pub replication_failed_size: u64,
pub replicated_size: u64,
pub replica_size: u64,
pub replication_pending_count: u64,
pub replication_failed_count: u64,
pub replicated_count: u64,
}
// BucketUsageInfo - bucket usage info provides
@@ -49,82 +49,63 @@ pub struct BucketTargetUsageInfo {
// - object size histogram per bucket
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketUsageInfo {
size: u64,
pub size: u64,
// Following five fields suffixed with V1 are here for backward compatibility
// Total Size for objects that have not yet been replicated
replication_pending_size_v1: u64,
pub replication_pending_size_v1: u64,
// Total size for objects that have witness one or more failures and will be retried
replication_failed_size_v1: u64,
pub replication_failed_size_v1: u64,
// Total size for objects that have been replicated to destination
replicated_size_v1: u64,
pub replicated_size_v1: u64,
// Total number of objects pending replication
replication_pending_count_v1: u64,
pub replication_pending_count_v1: u64,
// Total number of objects that failed replication
replication_failed_count_v1: u64,
pub replication_failed_count_v1: u64,
objects_count: u64,
object_size_histogram: HashMap<String, u64>,
object_versions_histogram: HashMap<String, u64>,
versions_count: u64,
delete_markers_count: u64,
replica_size: u64,
replica_count: u64,
replication_info: HashMap<String, BucketTargetUsageInfo>,
pub objects_count: u64,
pub object_size_histogram: HashMap<String, u64>,
pub object_versions_histogram: HashMap<String, u64>,
pub versions_count: u64,
pub delete_markers_count: u64,
pub replica_size: u64,
pub replica_count: u64,
pub replication_info: HashMap<String, BucketTargetUsageInfo>,
}
// DataUsageInfo represents data usage stats of the underlying Object API
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DataUsageInfo {
total_capacity: u64,
total_used_capacity: u64,
total_free_capacity: u64,
pub total_capacity: u64,
pub total_used_capacity: u64,
pub total_free_capacity: u64,
// LastUpdate is the timestamp of when the data usage info was last updated.
// This does not indicate a full scan.
last_update: SystemTime,
pub last_update: Option<SystemTime>,
// Objects total count across all buckets
objects_total_count: u64,
pub objects_total_count: u64,
// Versions total count across all buckets
versions_total_count: u64,
pub versions_total_count: u64,
// Delete markers total count across all buckets
delete_markers_total_count: u64,
pub delete_markers_total_count: u64,
// Objects total size across all buckets
objects_total_size: u64,
replication_info: HashMap<String, BucketTargetUsageInfo>,
pub objects_total_size: u64,
pub replication_info: HashMap<String, BucketTargetUsageInfo>,
// Total number of buckets in this cluster
buckets_count: u64,
pub buckets_count: u64,
// Buckets usage info provides following information across all buckets
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
buckets_usage: HashMap<String, BucketUsageInfo>,
pub buckets_usage: HashMap<String, BucketUsageInfo>,
// Deprecated kept here for backward compatibility reasons.
bucket_sizes: HashMap<String, u64>,
pub bucket_sizes: HashMap<String, u64>,
// Todo: TierStats
// TierStats contains per-tier stats of all configured remote tiers
}
impl Default for DataUsageInfo {
fn default() -> Self {
Self {
total_capacity: Default::default(),
total_used_capacity: Default::default(),
total_free_capacity: Default::default(),
last_update: SystemTime::now(),
objects_total_count: Default::default(),
versions_total_count: Default::default(),
delete_markers_total_count: Default::default(),
objects_total_size: Default::default(),
replication_info: Default::default(),
buckets_count: Default::default(),
buckets_usage: Default::default(),
bucket_sizes: Default::default(),
}
}
}
pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
let layer = new_object_layer_fn();
let lock = layer.read().await;

View File

@@ -4,7 +4,8 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::error::{Error, Result};
use crate::new_object_layer_fn;
use crate::set_disk::SetDisks;
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions};
use crate::store_api::{BucketInfo, HTTPRangeSpec, ObjectIO, ObjectOptions};
use bytes::Bytes;
use bytesize::ByteSize;
use http::HeaderMap;
use path_clean::PathClean;
@@ -23,7 +24,7 @@ use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS};
use super::data_usage::DATA_USAGE_ROOT;
use super::data_usage::{BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, DATA_USAGE_ROOT};
// DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals
pub const DATA_USAGE_BUCKET_LEN: usize = 11;
@@ -152,6 +153,22 @@ impl SizeHistogram {
}
}
}
pub fn to_map(&self) -> HashMap<String, u64> {
let mut res = HashMap::new();
let mut spl_count = 0;
for (count, oh) in self.0.iter().zip(OBJECTS_HISTOGRAM_INTERVALS.iter()) {
if ByteSize::kib(1).as_u64() == oh.start && oh.end == ByteSize::mib(1).as_u64() - 1 {
res.insert(oh.name.to_string(), spl_count);
} else if ByteSize::kib(1).as_u64() <= oh.start && oh.end <= ByteSize::mib(1).as_u64() - 1 {
spl_count += count;
res.insert(oh.name.to_string(), *count);
} else {
res.insert(oh.name.to_string(), *count);
}
}
res
}
}
// versionsHistogram is a histogram of number of versions in an object.
@@ -173,6 +190,14 @@ impl VersionsHistogram {
}
}
}
pub fn to_map(&self) -> HashMap<String, u64> {
let mut res = HashMap::new();
for (count, ov) in self.0.iter().zip(OBJECTS_VERSION_COUNT_INTERVALS.iter()) {
res.insert(ov.name.to_string(), *count);
}
res
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -319,11 +344,11 @@ pub struct DataUsageEntryInfo {
pub entry: DataUsageEntry,
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct DataUsageCacheInfo {
pub name: String,
pub next_cycle: u32,
pub last_update: SystemTime,
pub last_update: Option<SystemTime>,
pub skip_healing: bool,
// todo: life_cycle
// pub life_cycle:
@@ -333,18 +358,18 @@ pub struct DataUsageCacheInfo {
pub replication: Option<ReplicationConfiguration>,
}
impl Default for DataUsageCacheInfo {
fn default() -> Self {
Self {
name: Default::default(),
next_cycle: Default::default(),
last_update: SystemTime::now(),
skip_healing: Default::default(),
updates: Default::default(),
replication: Default::default(),
}
}
}
// impl Default for DataUsageCacheInfo {
// fn default() -> Self {
// Self {
// name: Default::default(),
// next_cycle: Default::default(),
// last_update: SystemTime::now(),
// skip_healing: Default::default(),
// updates: Default::default(),
// replication: Default::default(),
// }
// }
// }
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct DataUsageCache {
@@ -410,8 +435,11 @@ impl DataUsageCache {
},
}
retries += 1;
let mut rng = rand::thread_rng();
sleep(Duration::from_millis(rng.gen_range(0..1_000))).await;
let dur = {
let mut rng = rand::thread_rng();
rng.gen_range(0..1_000)
};
sleep(Duration::from_millis(dur)).await;
}
Ok(d)
}
@@ -655,6 +683,100 @@ impl DataUsageCache {
n
}
pub fn merge(&mut self, o: &DataUsageCache) {
let mut existing_root = self.root();
let other_root = o.root();
if existing_root.is_none() && other_root.is_none() {
return;
}
if other_root.is_none() {
return;
}
if existing_root.is_none() {
*self = o.clone();
return;
}
if o.info.last_update.gt(&self.info.last_update) {
self.info.last_update = o.info.last_update;
}
existing_root.as_mut().unwrap().merge(other_root.as_ref().unwrap());
self.cache.insert(hash_path(&self.info.name).key(), existing_root.unwrap());
let e_hash = self.root_hash();
for key in other_root.as_ref().unwrap().children.iter() {
let entry = &o.cache[key];
let flat = o.flatten(entry);
let mut existing = self.cache[key].clone();
existing.merge(&flat);
self.replace_hashed(&DataUsageHash(key.clone()), &Some(e_hash.clone()), &existing);
}
}
pub fn root_hash(&self) -> DataUsageHash {
hash_path(&self.info.name)
}
pub fn root(&self) -> Option<DataUsageEntry> {
self.find(&self.info.name)
}
pub fn dui(&self, path: &str, buckets: &[BucketInfo]) -> DataUsageInfo {
let e = match self.find(path) {
Some(e) => e,
None => return DataUsageInfo::default(),
};
let flat = self.flatten(&e);
let dui = DataUsageInfo {
last_update: self.info.last_update,
objects_total_count: flat.objects as u64,
versions_total_count: flat.versions as u64,
delete_markers_total_count: flat.delete_markers as u64,
objects_total_size: flat.size as u64,
buckets_count: e.children.len() as u64,
buckets_usage: self.buckets_usage_info(buckets),
..Default::default()
};
dui
}
pub fn buckets_usage_info(&self, buckets: &[BucketInfo]) -> HashMap<String, BucketUsageInfo> {
let mut dst = HashMap::new();
for bucket in buckets.iter() {
let e = match self.find(&bucket.name) {
Some(e) => e,
None => continue,
};
let flat = self.flatten(&e);
let mut bui = BucketUsageInfo {
size: flat.size as u64,
versions_count: flat.versions as u64,
objects_count: flat.objects as u64,
delete_markers_count: flat.delete_markers as u64,
object_size_histogram: flat.obj_sizes.to_map(),
object_versions_histogram: flat.obj_versions.to_map(),
..Default::default()
};
if let Some(rs) = &flat.replication_stats {
bui.replica_size = rs.replica_size;
bui.replica_count = rs.replica_count;
for (arn, stat) in rs.targets.iter() {
bui.replication_info.insert(arn.clone(), BucketTargetUsageInfo {
replication_pending_size: stat.pending_size,
replicated_size: stat.replicated_size,
replication_failed_size: stat.failed_size,
replication_pending_count: stat.pending_count,
replication_failed_count: stat.failed_count,
replicated_count: stat.replicated_count,
..Default::default()
});
}
}
dst.insert(bucket.name.clone(), bui);
}
dst
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();

View File

@@ -1 +1,2 @@
pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";
pub const ERR_SKIP_FILE: &str = "skip this file";

View File

@@ -10,7 +10,7 @@ use crate::global::{
is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION,
DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
};
use crate::heal::data_usage::DataUsageInfo;
use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_ops::HealObjectFn;
use crate::new_object_layer_fn;
@@ -44,19 +44,21 @@ use http::HeaderMap;
use lazy_static::lazy_static;
use rand::Rng;
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
use tokio::time::interval;
use std::cmp::Ordering;
use std::slice::Iter;
use std::time::SystemTime;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use time::OffsetDateTime;
use tokio::fs;
use tokio::{fs, select};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, RwLock, Semaphore};
use tokio::sync::{broadcast, mpsc, RwLock, Semaphore};
use crate::heal::data_usage_cache::DataUsageCache;
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
use tracing::{debug, info, warn};
use uuid::Uuid;
@@ -509,11 +511,16 @@ impl ECStore {
total_results += pool.disk_set.len();
});
let mut results = Arc::new(RwLock::new(vec![DataUsageCache::default(); total_results]));
let (cancel, _) = broadcast::channel(100);
let first_err = Arc::new(RwLock::new(None));
let mut futures = Vec::new();
for pool in self.pools.iter() {
for set in pool.disk_set.iter() {
let index = result_index;
let results_clone = results.clone();
let first_err_clone = first_err.clone();
let cancel_clone = cancel.clone();
let all_buckets_clone = all_buckets.clone();
futures.push(async move {
let (tx, mut rx) = mpsc::channel(100);
let task = tokio::spawn(async move {
@@ -528,12 +535,61 @@ impl ECStore {
}
}
});
if let Err(err) = set.ns_scanner(&all_buckets_clone, want_cycle.try_into().unwrap(), tx, heal_scan_mode).await {
let mut f_w = first_err_clone.write().await;
if f_w.is_none() {
*f_w = Some(err);
}
let _ = cancel_clone.send(true);
return ;
}
let _ = task.await;
});
result_index += 1;
}
}
let (update_closer_tx, mut update_close_rx) = mpsc::channel(10);
let mut ctx_clone = cancel.subscribe();
let all_buckets_clone = all_buckets.clone();
let task = tokio::spawn(async move {
let mut last_update = None;
let mut interval = interval(Duration::from_secs(30));
let all_merged = Arc::new(RwLock::new(DataUsageCache::default()));
loop {
select! {
_ = ctx_clone.recv() => {
return;
}
_ = update_close_rx.recv() => {
last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await {
Ok(v) => v,
Err(_) => return,
};
return;
}
_ = interval.tick() => {
last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await {
Ok(v) => v,
Err(_) => return,
};
}
}
}
});
let _ = join_all(futures).await;
let mut ctx_closer = cancel.subscribe();
select! {
_ = update_closer_tx.send(true) => {
}
_ = ctx_closer.recv() => {
}
}
let _ = task.await;
if let Some(err) = first_err.read().await.as_ref() {
return Err(err.clone());
}
Ok(())
}
@@ -632,6 +688,28 @@ impl ECStore {
}
}
async fn update_scan(all_merged: Arc<RwLock<DataUsageCache>>, results: Arc<RwLock<Vec<DataUsageCache>>>, last_update: Option<SystemTime>, all_buckets: Vec<BucketInfo>, updates: Sender<DataUsageInfo>) -> Option<SystemTime> {
let mut w = all_merged.write().await;
*w = DataUsageCache {
info: DataUsageCacheInfo {
name: DATA_USAGE_ROOT.to_string(),
..Default::default()
},
..Default::default()
};
for info in results.read().await.iter() {
if info.info.last_update.is_none() {
return last_update;
}
w.merge(info);
}
if w.info.last_update > last_update && w.root().is_none() {
let _ = updates.send(w.dui(&w.info.name, &all_buckets)).await;
return w.info.last_update;
}
last_update
}
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
let disk_path = match fs::canonicalize(disk_path).await {
Ok(disk_path) => disk_path,