From 9533e312b1e7d64363554a2615a690a3cd437e8f Mon Sep 17 00:00:00 2001 From: mujunxiang <1948535941@qq.com> Date: Tue, 3 Dec 2024 17:05:23 +0800 Subject: [PATCH] scanner(2) Signed-off-by: mujunxiang <1948535941@qq.com> --- common/lock/src/lrwmutex.rs | 5 +- ecstore/src/heal/data_scanner.rs | 80 +++++++++++++------------ ecstore/src/heal/data_scanner_metric.rs | 1 + ecstore/src/set_disk.rs | 10 +++- 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/common/lock/src/lrwmutex.rs b/common/lock/src/lrwmutex.rs index fa65f1a7..9bc3415e 100644 --- a/common/lock/src/lrwmutex.rs +++ b/common/lock/src/lrwmutex.rs @@ -2,6 +2,7 @@ use std::time::{Duration, Instant}; use rand::Rng; use tokio::{sync::RwLock, time::sleep}; +use tracing::info; #[derive(Debug, Default)] pub struct LRWMutex { @@ -87,14 +88,14 @@ impl LRWMutex { pub async fn un_lock(&self) { let is_write = true; if !self.unlock(is_write).await { - panic!("Trying to un_lock() while no Lock() is active") + info!("Trying to un_lock() while no Lock() is active") } } pub async fn un_r_lock(&self) { let is_write = false; if !self.unlock(is_write).await { - panic!("Trying to un_r_lock() while no Lock() is active") + info!("Trying to un_r_lock() while no Lock() is active") } } diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 16ff8cc0..721a09f8 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -12,7 +12,6 @@ use std::{ time::{Duration, SystemTime}, }; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use chrono::{DateTime, Utc}; use lazy_static::lazy_static; use rand::Rng; @@ -113,33 +112,15 @@ async fn run_data_scanner() { return; }; - let mut cycle_info = CurrentScannerCycle::default(); - - let mut buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) + let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) .await .map_or(Vec::new(), |buf| buf); - match buf.len().cmp(&8) { - std::cmp::Ordering::Less => {} - std::cmp::Ordering::Equal => { - cycle_info.next = match Cursor::new(buf).read_u64::() { - Ok(buf) => buf, - Err(_) => { - error!("can not decode DATA_USAGE_BLOOM_NAME_PATH"); - return; - } - }; - } - std::cmp::Ordering::Greater => { - cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::() { - Ok(buf) => buf, - Err(_) => { - error!("can not decode DATA_USAGE_BLOOM_NAME_PATH"); - return; - } - }; - let _ = cycle_info.unmarshal_msg(&buf.split_off(8)); - } - } + + let mut buf_t = Deserializer::new(Cursor::new(buf)); + let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) { + Ok(info) => info, + Err(_) => CurrentScannerCycle::default(), + }; loop { let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); @@ -176,18 +157,12 @@ async fn run_data_scanner() { cycle_info.current = 0; cycle_info.cycle_completed.push(Utc::now()); if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { - cycle_info.cycle_completed = cycle_info.cycle_completed - [cycle_info.cycle_completed.len() - DATA_USAGE_UPDATE_DIR_CYCLES as usize..] - .to_vec(); + let _ = cycle_info.cycle_completed.remove(0); } globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - let mut tmp = Vec::new(); - tmp.write_u64::(cycle_info.next).unwrap(); - if let Ok(data) = cycle_info.marshal_msg(&tmp) { - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &data).await; - } else { - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await; - } + let mut wr = Vec::new(); + cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); + let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &wr).await; } Err(err) => { info!("ns_scanner failed: {:?}", err); @@ -261,7 +236,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot HEAL_NORMAL_SCAN } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct CurrentScannerCycle { pub current: u64, pub next: u64, @@ -1081,3 +1056,34 @@ pub async fn scan_data_folder( } // pub fn eval_action_from_lifecycle(lc: &BucketLifecycleConfiguration, lr: &ObjectLockConfiguration, rcfg: &ReplicationConfiguration, obj: &ObjectInfo) + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use chrono::Utc; + use rmp_serde::{Deserializer, Serializer}; + use serde::{Deserialize, Serialize}; + + use super::CurrentScannerCycle; + + #[test] + fn test_current_cycle() { + let cycle_info = CurrentScannerCycle { + current: 0, + next: 1, + started: Utc::now(), + cycle_completed: vec![Utc::now(), Utc::now()], + }; + + println!("{cycle_info:?}"); + + let mut wr = Vec::new(); + cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); + + let mut buf_t = Deserializer::new(Cursor::new(wr)); + let c: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap(); + + println!("{c:?}"); + } +} diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs index 77b97c1d..d96ad89c 100644 --- a/ecstore/src/heal/data_scanner_metric.rs +++ b/ecstore/src/heal/data_scanner_metric.rs @@ -218,6 +218,7 @@ impl ScannerMetrics { pub async fn report(&self) -> M_ScannerMetrics { let mut m = M_ScannerMetrics::default(); if let Some(cycle) = self.get_cycle().await { + info!("cycle: {cycle:?}"); m.current_cycle = cycle.current; m.cycles_completed_at = cycle.cycle_completed; m.current_started = cycle.started; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 31c23def..152f2063 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -2768,9 +2768,9 @@ impl SetDisks { let buckets_results_tx_clone = buckets_results_tx.clone(); futures.push(async move { loop { - match bucket_rx_clone.write().await.recv().await { - None => return, - Some(bucket_info) => { + match bucket_rx_clone.write().await.try_recv() { + Err(_) => return, + Ok(bucket_info) => { let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME); let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await { Ok(cache) => cache, @@ -2841,10 +2841,14 @@ impl SetDisks { let _ = cache.save(&cache_name.to_string_lossy()).await; } } + info!("continue scanner"); } }); } + info!("ns_scanner start"); let _ = join_all(futures).await; + drop(buckets_results_tx); + info!("1"); let _ = task.await; info!("ns_scanner completed"); Ok(())