scanner(2)

Signed-off-by: mujunxiang <1948535941@qq.com>
This commit is contained in:
mujunxiang
2024-12-03 17:05:23 +08:00
committed by weisd
parent a7a2ae2781
commit 9533e312b1
4 changed files with 54 additions and 42 deletions

View File

@@ -2,6 +2,7 @@ use std::time::{Duration, Instant};
use rand::Rng;
use tokio::{sync::RwLock, time::sleep};
use tracing::info;
#[derive(Debug, Default)]
pub struct LRWMutex {
@@ -87,14 +88,14 @@ impl LRWMutex {
pub async fn un_lock(&self) {
let is_write = true;
if !self.unlock(is_write).await {
panic!("Trying to un_lock() while no Lock() is active")
info!("Trying to un_lock() while no Lock() is active")
}
}
pub async fn un_r_lock(&self) {
let is_write = false;
if !self.unlock(is_write).await {
panic!("Trying to un_r_lock() while no Lock() is active")
info!("Trying to un_r_lock() while no Lock() is active")
}
}

View File

@@ -12,7 +12,6 @@ use std::{
time::{Duration, SystemTime},
};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use rand::Rng;
@@ -113,33 +112,15 @@ async fn run_data_scanner() {
return;
};
let mut cycle_info = CurrentScannerCycle::default();
let mut buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
.await
.map_or(Vec::new(), |buf| buf);
match buf.len().cmp(&8) {
std::cmp::Ordering::Less => {}
std::cmp::Ordering::Equal => {
cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
}
std::cmp::Ordering::Greater => {
cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
let _ = cycle_info.unmarshal_msg(&buf.split_off(8));
}
}
let mut buf_t = Deserializer::new(Cursor::new(buf));
let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) {
Ok(info) => info,
Err(_) => CurrentScannerCycle::default(),
};
loop {
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
@@ -176,18 +157,12 @@ async fn run_data_scanner() {
cycle_info.current = 0;
cycle_info.cycle_completed.push(Utc::now());
if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
cycle_info.cycle_completed = cycle_info.cycle_completed
[cycle_info.cycle_completed.len() - DATA_USAGE_UPDATE_DIR_CYCLES as usize..]
.to_vec();
let _ = cycle_info.cycle_completed.remove(0);
}
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
let mut tmp = Vec::new();
tmp.write_u64::<LittleEndian>(cycle_info.next).unwrap();
if let Ok(data) = cycle_info.marshal_msg(&tmp) {
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &data).await;
} else {
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await;
}
let mut wr = Vec::new();
cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap();
let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &wr).await;
}
Err(err) => {
info!("ns_scanner failed: {:?}", err);
@@ -261,7 +236,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot
HEAL_NORMAL_SCAN
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CurrentScannerCycle {
pub current: u64,
pub next: u64,
@@ -1081,3 +1056,34 @@ pub async fn scan_data_folder(
}
// pub fn eval_action_from_lifecycle(lc: &BucketLifecycleConfiguration, lr: &ObjectLockConfiguration, rcfg: &ReplicationConfiguration obj: &ObjectInfo)
#[cfg(test)]
mod tests {
use std::io::Cursor;
use chrono::Utc;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use super::CurrentScannerCycle;
#[test]
fn test_current_cycle() {
let cycle_info = CurrentScannerCycle {
current: 0,
next: 1,
started: Utc::now(),
cycle_completed: vec![Utc::now(), Utc::now()],
};
println!("{cycle_info:?}");
let mut wr = Vec::new();
cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap();
let mut buf_t = Deserializer::new(Cursor::new(wr));
let c: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap();
println!("{c:?}");
}
}

View File

@@ -218,6 +218,7 @@ impl ScannerMetrics {
pub async fn report(&self) -> M_ScannerMetrics {
let mut m = M_ScannerMetrics::default();
if let Some(cycle) = self.get_cycle().await {
info!("cycle: {cycle:?}");
m.current_cycle = cycle.current;
m.cycles_completed_at = cycle.cycle_completed;
m.current_started = cycle.started;

View File

@@ -2768,9 +2768,9 @@ impl SetDisks {
let buckets_results_tx_clone = buckets_results_tx.clone();
futures.push(async move {
loop {
match bucket_rx_clone.write().await.recv().await {
None => return,
Some(bucket_info) => {
match bucket_rx_clone.write().await.try_recv() {
Err(_) => return,
Ok(bucket_info) => {
let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME);
let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await {
Ok(cache) => cache,
@@ -2841,10 +2841,14 @@ impl SetDisks {
let _ = cache.save(&cache_name.to_string_lossy()).await;
}
}
info!("continue scanner");
}
});
}
info!("ns_scanner start");
let _ = join_all(futures).await;
drop(buckets_results_tx);
info!("1");
let _ = task.await;
info!("ns_scanner completed");
Ok(())