From fe50ccd39f53b697908cc4e5ddac2fdfdad49f3d Mon Sep 17 00:00:00 2001
From: junxiang Mu <1948535941@qq.com>
Date: Mon, 11 Nov 2024 17:33:25 +0800
Subject: [PATCH] ecstore: add background heal routine, data scanner, and data
 usage tracking

---
 ecstore/Cargo.toml                      |   1 +
 ecstore/src/config/heal.rs              |  65 ++++++
 ecstore/src/config/mod.rs               |   1 +
 ecstore/src/disk/error.rs               |   4 +
 ecstore/src/disk/format.rs              |  12 +-
 ecstore/src/disk/mod.rs                 |   6 +-
 ecstore/src/endpoints.rs                |  10 +
 ecstore/src/erasure.rs                  |  45 ++++
 ecstore/src/global.rs                   |   6 +-
 ecstore/src/heal/background_heal_ops.rs | 101 ++++++--
 ecstore/src/heal/data_scanner.rs        | 293 ++++++++++++++++++++++++
 ecstore/src/heal/data_scanner_metric.rs |  74 ++++++
 ecstore/src/heal/data_usage.rs          | 158 +++++++++++++
 ecstore/src/heal/data_usage_cache.rs    | 234 +++++++++++++++++++
 ecstore/src/heal/heal_commands.rs       |  15 +-
 ecstore/src/heal/heal_ops.rs            |  63 ++---
 ecstore/src/heal/mod.rs                 |   4 +
 ecstore/src/lib.rs                      |   2 +-
 ecstore/src/sets.rs                     | 222 +++++++++++++++++-
 ecstore/src/store.rs                    | 134 +++++++++--
 ecstore/src/store_api.rs                |  22 +-
 ecstore/src/store_init.rs               |  32 ++-
 ecstore/src/utils/bool_flag.rs          |  15 ++
 ecstore/src/utils/mod.rs                |   1 +
 rustfs/src/main.rs                      |   5 +
 25 files changed, 1413 insertions(+), 112 deletions(-)
 create mode 100644 ecstore/src/config/heal.rs
 create mode 100644 ecstore/src/heal/data_scanner.rs
 create mode 100644 ecstore/src/heal/data_scanner_metric.rs
 create mode 100644 ecstore/src/heal/data_usage.rs
 create mode 100644 ecstore/src/heal/data_usage_cache.rs
 create mode 100644 ecstore/src/utils/bool_flag.rs

diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml
index 79370b32..1208304d 100644
--- a/ecstore/Cargo.toml
+++ b/ecstore/Cargo.toml
@@ -41,6 +41,7 @@ protos.workspace = true
 rmp-serde = "1.3.0"
 tokio-util = { version = "0.7.12", features = ["io", "compat"] }
 crc32fast = "1.4.2"
+rand = "0.8.5"
 siphasher = "1.0.1"
 base64-simd = "0.8.0"
 sha2 = { version = "0.11.0-pre.4" }
diff --git a/ecstore/src/config/heal.rs b/ecstore/src/config/heal.rs
new file mode 100644
index 00000000..20e910a6
--- /dev/null
+++ b/ecstore/src/config/heal.rs
@@ -0,0 +1,65 @@
+use std::time::Duration;
+
+use crate::{
+    error::{Error, Result},
+    utils::bool_flag::parse_bool,
+};
+
+#[derive(Debug, Default)]
+pub struct Config {
+    pub bitrot: String,
+    pub sleep: Duration,
+    pub io_count: usize,
+    pub drive_workers: usize,
+    pub cache: Duration,
+}
+
+impl Config {
+    pub fn bitrot_scan_cycle(&self) -> Duration {
+        self.cache
+    }
+
+    pub fn get_workers(&self) -> usize {
+        self.drive_workers
+    }
+
+    pub fn update(&mut self, nopts: &Config) {
+        self.bitrot = nopts.bitrot.clone();
+        self.io_count = nopts.io_count;
+        self.sleep = nopts.sleep;
+        self.drive_workers = nopts.drive_workers;
+    }
+}
+
+const RUSTFS_BITROT_CYCLE_IN_MONTHS: u64 = 1;
+
+// `Duration` cannot be negative, so the Go-style float sentinels are replaced
+// (Duration::from_secs_f64(-1.0) panics at runtime): Duration::ZERO means
+// "deep-scan every cycle", Duration::MAX means "bitrot scanning disabled".
+// get_cycle_scan_mode checks the same two values.
+fn parse_bitrot_config(s: &str) -> Result<Duration> {
+    match parse_bool(s) {
+        Ok(enabled) => {
+            if enabled {
+                Ok(Duration::ZERO)
+            } else {
+                Ok(Duration::MAX)
+            }
+        }
+        Err(_) => {
+            if !s.ends_with('m') {
+                return Err(Error::from_string("unknown format"));
+            }
+
+            match s.trim_end_matches('m').parse::<u64>() {
+                Ok(months) => {
+                    if months < RUSTFS_BITROT_CYCLE_IN_MONTHS {
+                        return Err(Error::from_string(format!(
+                            "minimum bitrot cycle is {} month(s)",
+                            RUSTFS_BITROT_CYCLE_IN_MONTHS
+                        )));
+                    }
+
+                    // months -> seconds (a month is taken as 30 days; the
+                    // original dropped the final `* 60` and produced minutes).
+                    Ok(Duration::from_secs(months * 30 * 24 * 60 * 60))
+                }
+                Err(err) => Err(err.into()),
+            }
+        }
+    }
+}
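+
+// A minimal sanity test for the parser above (illustrative; it assumes the
+// Duration::ZERO / Duration::MAX sentinels documented there and the token set
+// accepted by parse_bool).
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parse_bitrot_config_inputs() {
+        assert_eq!(parse_bitrot_config("on").unwrap(), Duration::ZERO);
+        assert_eq!(parse_bitrot_config("off").unwrap(), Duration::MAX);
+        // "3m" = three 30-day months, expressed in seconds.
+        assert_eq!(parse_bitrot_config("3m").unwrap(), Duration::from_secs(3 * 30 * 24 * 60 * 60));
+        assert!(parse_bitrot_config("3h").is_err());
+        assert!(parse_bitrot_config("0m").is_err());
+    }
+}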
diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs
index 7da3f846..030d15a1 100644
--- a/ecstore/src/config/mod.rs
+++ b/ecstore/src/config/mod.rs
@@ -1,5 +1,6 @@
 pub mod common;
 pub mod error;
+pub mod heal;
 pub mod storageclass;
 
 use crate::error::Result;
diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs
index fd20a91d..f114210c 100644
--- a/ecstore/src/disk/error.rs
+++ b/ecstore/src/disk/error.rs
@@ -106,6 +106,9 @@ pub enum DiskError {
 
     #[error("part missing or corrupt")]
     PartMissingOrCorrupt,
+
+    #[error("no healing is required")]
+    NoHealRequired,
 }
 
 impl DiskError {
@@ -210,6 +213,7 @@ pub fn clone_disk_err(e: &DiskError) -> Error {
         DiskError::MoreData => Error::new(DiskError::MoreData),
         DiskError::OutdatedXLMeta => Error::new(DiskError::OutdatedXLMeta),
         DiskError::PartMissingOrCorrupt => Error::new(DiskError::PartMissingOrCorrupt),
+        DiskError::NoHealRequired => Error::new(DiskError::NoHealRequired),
     }
 }
diff --git a/ecstore/src/disk/format.rs b/ecstore/src/disk/format.rs
index f2a9775c..602ea629 100644
--- a/ecstore/src/disk/format.rs
+++ b/ecstore/src/disk/format.rs
@@ -1,4 +1,4 @@
-use super::error::DiskError;
+use super::{error::DiskError, DiskInfo};
 use crate::error::{Error, Result};
 use serde::{Deserialize, Serialize};
 use serde_json::Error as JsonError;
@@ -31,7 +31,7 @@ pub enum FormatBackend {
 ///
 /// The V3 format to support "large bucket" support where a bucket
 /// can span multiple erasure sets.
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct FormatErasureV3 {
     /// Version of 'xl' format.
     pub version: FormatErasureVersion,
@@ -88,7 +88,7 @@ pub enum DistributionAlgoVersion {
 ///
 /// Ideally we will never have a situation where we will have to change the
 /// fields of this struct and deal with related migration.
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct FormatV3 {
     /// Version of the format config.
     pub version: FormatMetaVersion,
@@ -103,8 +103,8 @@ pub struct FormatV3 {
     pub erasure: FormatErasureV3,
     // /// DiskInfo is an extended type which returns current
     // /// disk usage per path.
-    // #[serde(skip)]
-    // pub disk_info: Option<DiskInfo>,
+    #[serde(skip)]
+    pub disk_info: Option<DiskInfo>,
 }
 
 impl TryFrom<&[u8]> for FormatV3 {
@@ -146,7 +146,7 @@ impl FormatV3 {
             format,
             id: Uuid::new_v4(),
             erasure,
-            // disk_info: None,
+            disk_info: None,
         }
     }
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index e5754305..3a59fa25 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -467,7 +467,7 @@ pub struct DiskInfoOptions {
     pub noop: bool,
 }
 
-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DiskInfo {
     pub total: u64,
     pub free: u64,
@@ -489,7 +489,7 @@ pub struct DiskInfo {
     pub error: String,
 }
 
-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DiskMetrics {
     api_calls: HashMap<String, u64>,
     total_waiting: u32,
@@ -835,7 +835,7 @@ impl MetaCacheEntries {
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 pub struct DiskOption {
     pub cleanup: bool,
     pub health_check: bool,
diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs
index c5ff9cbf..0329f88f 100644
--- a/ecstore/src/endpoints.rs
+++ b/ecstore/src/endpoints.rs
@@ -109,6 +109,16 @@ impl Endpoints {
     pub fn into_inner(self) -> Vec<Endpoint> {
         self.0
     }
+
+    // GetString - returns the endpoint string of the i-th endpoint (0-based),
+    // and an empty string for invalid indexes.
+    pub fn get_string(&self, i: usize) -> String {
+        // `i` is unsigned, so only the upper bound needs checking
+        // (the original `i < 0` comparison was always false).
+        if i >= self.0.len() {
+            return "".to_string();
+        }
+
+        self.0[i].to_string()
+    }
 }
 
 #[derive(Debug)]
diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index f0983aac..769995d6 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -19,6 +19,7 @@ use uuid::Uuid;
 // use crate::chunk_stream::ChunkedStream;
 use crate::disk::error::DiskError;
 
+#[derive(Default)]
 pub struct Erasure {
     data_shards: usize,
     parity_shards: usize,
@@ -417,6 +418,50 @@ impl Erasure {
         till_offset
     }
+
+    // NOTE: the writer/reader element types were garbled in transit; the
+    // bitrot-verifying writer/reader pair used elsewhere in this module is
+    // the likely intent.
+    pub async fn heal(
+        &self,
+        writers: &mut [Option<BitrotWriter>],
+        readers: Vec<Option<BitrotReader>>,
+        total_length: usize,
+        _prefer: &[bool], // TODO: use to prefer certain shards when reading
+    ) -> Result<()> {
+        if writers.len() != self.parity_shards + self.data_shards {
+            return Err(Error::from_string("invalid argument"));
+        }
+        let mut reader = ShardReader::new(readers, self, 0, total_length);
+
+        // ceil(total_length / block_size) blocks in total; the original
+        // `..=` inclusive range read one block too many.
+        let mut end_block = total_length / self.block_size;
+        if total_length % self.block_size != 0 {
+            end_block += 1;
+        }
+
+        let mut errs = Vec::new();
+        for _ in 0..end_block {
+            let mut bufs = reader.read().await?;
+
+            if self.parity_shards > 0 {
+                self.encoder.as_ref().unwrap().reconstruct(&mut bufs)?;
+            }
+
+            let shards = bufs.into_iter().flatten().collect::<Vec<_>>();
+            if shards.len() != self.parity_shards + self.data_shards {
+                return Err(Error::from_string("can not reconstruct data"));
+            }
+
+            for (i, w) in writers.iter_mut().enumerate() {
+                let Some(w) = w else { continue };
+                if let Err(e) = w.write(shards[i].as_ref()).await {
+                    errs.push(e);
+                }
+            }
+        }
+        if !errs.is_empty() {
+            return Err(errs[0].clone());
+        }
+
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs
index 16ac35be..a602fc72 100644
--- a/ecstore/src/global.rs
+++ b/ecstore/src/global.rs
@@ -4,9 +4,7 @@ use tokio::sync::RwLock;
 use uuid::Uuid;
 
 use crate::{
-    disk::DiskStore,
-    endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
-    store::ECStore,
+    disk::DiskStore,
+    endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
+    heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState},
+    store::ECStore,
 };
 
 pub const DISK_ASSUME_UNKNOWN_SIZE: u64 = 1 << 30;
@@ -24,6 +22,8 @@ lazy_static! {
     pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<Vec<Vec<Vec<Option<DiskStore>>>>>> = Arc::new(RwLock::new(Vec::new()));
     pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
     pub static ref GLOBAL_RootDiskThreshold: RwLock<u64> = RwLock::new(0);
+    pub static ref GLOBAL_BackgroundHealRoutine: Arc<RwLock<HealRoutine>> = HealRoutine::new();
+    pub static ref GLOBAL_BackgroundHealState: Arc<RwLock<AllHealState>> = AllHealState::new(false);
     static ref globalDeploymentIDPtr: RwLock<Uuid> = RwLock::new(Uuid::nil());
 }
diff --git a/ecstore/src/heal/background_heal_ops.rs b/ecstore/src/heal/background_heal_ops.rs
index 30f9e056..f0de4732 100644
--- a/ecstore/src/heal/background_heal_ops.rs
+++ b/ecstore/src/heal/background_heal_ops.rs
@@ -1,14 +1,16 @@
-use std::sync::Arc;
+use std::{env, sync::Arc};
 
 use tokio::{
     select,
     sync::{
         broadcast::Receiver as B_Receiver,
-        mpsc::{self, Receiver, Sender},
+        mpsc::{self, Receiver, Sender},
+        RwLock,
     },
 };
 
-use crate::{error::Error, heal::heal_ops::NOP_HEAL, utils::path::SLASH_SEPARATOR};
+use crate::{
+    disk::error::DiskError,
+    error::{Error, Result},
+    heal::heal_ops::NOP_HEAL,
+    new_object_layer_fn,
+    store_api::StorageAPI,
+    utils::path::SLASH_SEPARATOR,
+};
 
 use super::{
     heal_commands::{HealOpts, HealResultItem},
@@ -21,8 +23,8 @@ pub struct HealTask {
     pub object: String,
     pub version_id: String,
     pub opts: HealOpts,
-    pub resp_tx: Arc<Sender<HealResult>>,
-    pub resp_rx: Arc<Receiver<HealResult>>,
+    pub resp_tx: Option<Arc<Sender<HealResult>>>,
+    pub resp_rx: Option<Arc<Receiver<HealResult>>>,
 }
 
 impl HealTask {
@@ -33,15 +35,15 @@ impl HealTask {
             object: object.to_string(),
             version_id: version_id.to_string(),
             opts: opts.clone(),
-            resp_tx: tx.into(),
-            resp_rx: rx.into(),
+            resp_tx: Some(tx.into()),
+            resp_rx: Some(rx.into()),
         }
     }
 }
 
 pub struct HealResult {
     pub result: HealResultItem,
-    err: Error,
+    err: Option<Error>,
 }
 
 pub struct HealRoutine {
@@ -51,18 +53,78 @@ impl HealRoutine {
-    pub async fn add_worker(&mut self, mut ctx: B_Receiver<bool>, bgseq: &HealSequence) {
+    pub fn new() -> Arc<RwLock<Self>> {
+        let mut workers = num_cpus::get() / 2;
+        if let Ok(env_heal_workers) = env::var("_RUSTFS_HEAL_WORKERS") {
+            if let Ok(num_healers) = env_heal_workers.parse::<usize>() {
+                workers = num_healers;
+            }
+        }
+
+        if workers == 0 {
+            workers = 4;
+        }
+
+        let (tx, rx) = mpsc::channel(100);
+        Arc::new(RwLock::new(Self {
+            tasks_tx: tx,
+            tasks_rx: rx,
+            workers,
+        }))
+    }
+
+    pub async fn add_worker(&mut self, mut ctx: B_Receiver<bool>, bgseq: &mut HealSequence) {
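+        // Worker protocol (descriptive note): each HealTask received below is
+        // dispatched on its bucket field -- NOP_HEAL is a "skip" sentinel,
+        // SLASH_SEPARATOR means "heal the disk format", anything else heals a
+        // bucket or object. The outcome is either sent back on resp_tx or,
+        // when no responder is attached, folded into the sequence counters.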
         loop {
             select! {
                 task = self.tasks_rx.recv() => {
-                    let mut res = HealResultItem::default();
-                    let mut err: Error;
+                    let mut d_res = HealResultItem::default();
+                    let d_err: Option<Error>;
                     match task {
                         Some(task) => {
                             if task.bucket == NOP_HEAL {
-                                err = Error::from_string("skip file");
+                                d_err = Some(Error::from_string("skip file"));
                             } else if task.bucket == SLASH_SEPARATOR {
-                                (res, err) = heal_disk_format(task.opts).await;
+                                match heal_disk_format(task.opts).await {
+                                    Ok((res, err)) => {
+                                        d_res = res;
+                                        d_err = err;
+                                    },
+                                    Err(err) => d_err = Some(err),
+                                }
+                            } else {
+                                let layer = new_object_layer_fn();
+                                let lock = layer.read().await;
+                                let store = lock.as_ref().expect("Not init");
+                                if task.object.is_empty() {
+                                    // An empty object name marks a bucket heal
+                                    // (the original called heal_object in both
+                                    // branches, making the split pointless).
+                                    match store.heal_bucket(&task.bucket, &task.opts).await {
+                                        Ok(res) => {
+                                            d_res = res;
+                                            d_err = None;
+                                        },
+                                        Err(err) => d_err = Some(err),
+                                    }
+                                } else {
+                                    match store.heal_object(&task.bucket, &task.object, &task.version_id, &task.opts).await {
+                                        Ok((res, err)) => {
+                                            d_res = res;
+                                            d_err = err;
+                                        },
+                                        Err(err) => d_err = Some(err),
+                                    }
+                                }
+                            }
+                            if let Some(resp_tx) = task.resp_tx {
+                                let _ = resp_tx.send(HealResult { result: d_res, err: d_err }).await;
+                            } else {
+                                // When resp_tx is not set the caller is not
+                                // waiting, but we still update the relevant
+                                // metrics for them.
+                                if d_err.is_none() {
+                                    bgseq.count_healed(d_res.heal_item_type);
+                                } else {
+                                    bgseq.count_failed(d_res.heal_item_type);
+                                }
                             }
                         },
                         None => return,
                     }
                 }
             }
         }
     }
 }
 
 // }
 
-async fn heal_disk_format(opts: HealOpts) -> (HealResultItem, Error) {
-    todo!()
+async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Error>)> {
+    let layer = new_object_layer_fn();
+    let lock = layer.read().await;
+    let store = lock.as_ref().expect("Not init");
+    let (res, err) = store.heal_format(opts.dry_run).await?;
+    // Pass any error up; "already healed" is reported with a default result
+    // item so the caller can tell nothing was done.
+    if err.is_some() {
+        return Ok((HealResultItem::default(), err));
+    }
+    Ok((res, err))
+}
diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs
new file mode 100644
index 00000000..ed290818
--- /dev/null
+++ b/ecstore/src/heal/data_scanner.rs
@@ -0,0 +1,293 @@
+use std::{
+    io::{Cursor, Read},
+    sync::{
+        atomic::{AtomicU32, AtomicU64},
+        Arc,
+    },
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+use byteorder::{LittleEndian, ReadBytesExt};
+use lazy_static::lazy_static;
+use rand::Rng;
+use rmp_serde::{Deserializer, Serializer};
+use serde::{Deserialize, Serialize};
+use tokio::{
+    sync::{mpsc, RwLock},
+    time::sleep,
+};
+use tracing::{error, info};
+
+use crate::{
+    config::{
+        common::{read_config, save_config},
+        heal::Config,
+    },
+    error::{Error, Result},
+    global::GLOBAL_IsErasureSD,
+    heal::data_usage::BACKGROUND_HEAL_INFO_PATH,
+    new_object_layer_fn,
+    store::ECStore,
+};
+
+use super::{
+    data_scanner_metric::globalScannerMetrics,
+    data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH},
+    heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
+};
+
+const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
+const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
+const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are fewer than this many objects in a branch.
+const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
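+// How the compaction thresholds interact (summary note): LEAST_OBJECT keeps
+// small branches merged into their parent, AT_CHILDREN and AT_FOLDERS bound
+// how wide a branch may grow before it is folded, and FORCE_COMPACT_AT_FOLDERS
+// below is the hard cap that applies even at the top level.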
+const DATA_SCANNER_COMPACT_AT_FOLDERS: u64 = DATA_SCANNER_COMPACT_AT_CHILDREN / 4; // Compact when this many subfolders in a single folder.
+const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
+const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to wait on startup and between cycles.
+
+const HEAL_DELETE_DANGLING: bool = true;
+const HEAL_OBJECT_SELECT_PROB: u64 = 1024; // Overall probability of a file being scanned; one in n.
+
+// static SCANNER_SLEEPER: () = new_dynamic_sleeper(2, Duration::from_secs(1), true); // Keep defaults same as config defaults
+static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(DATA_SCANNER_START_DELAY.as_secs());
+static SCANNER_IDLE_MODE: AtomicU32 = AtomicU32::new(0); // default is throttled when idle
+static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
+static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
+static SCANNER_EXCESS_FOLDERS: AtomicU64 = AtomicU64::new(50_000);
+
+lazy_static! {
+    pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
+}
+
+pub async fn init_data_scanner() {
+    tokio::spawn(async move {
+        loop {
+            run_data_scanner().await;
+            // Jitter the delay (re-rolled each cycle; the original rolled it
+            // once) so nodes do not scan in lock-step; sleep at least 1s.
+            let random: f64 = rand::thread_rng().gen_range(0.0..1.0);
+            let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst) as f64));
+            let sleep_duration = if duration < Duration::new(1, 0) {
+                Duration::new(1, 0)
+            } else {
+                duration
+            };
+            sleep(sleep_duration).await;
+        }
+    });
+}
+
+async fn run_data_scanner() {
+    let mut cycle_info = CurrentScannerCycle::default();
+    let layer = new_object_layer_fn();
+    let lock = layer.read().await;
+    let store = match lock.as_ref() {
+        Some(s) => s,
+        None => {
+            info!("errServerNotInitialized");
+            return;
+        }
+    };
+    // Persisted state: the next cycle number (first 8 bytes, little endian),
+    // optionally followed by a serialized CurrentScannerCycle.
+    let mut buf = read_config(store, &DATA_USAGE_BLOOM_NAME_PATH).await.unwrap_or_default();
+    if buf.len() == 8 {
+        cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() {
+            Ok(v) => v,
+            Err(_) => {
+                error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
+                return;
+            }
+        };
+    } else if buf.len() > 8 {
+        cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() {
+            Ok(v) => v,
+            Err(_) => {
+                error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
+                return;
+            }
+        };
+        let _ = cycle_info.unmarshal_msg(&buf.split_off(8));
+    }
+
+    loop {
+        cycle_info.current = cycle_info.next;
+        cycle_info.started = SystemTime::now();
+        {
+            globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
+        }
+
+        let bg_heal_info = read_background_heal_info(store).await;
+        let scan_mode = get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
+        if bg_heal_info.current_scan_mode != scan_mode {
+            let mut new_heal_info = bg_heal_info;
+            new_heal_info.current_scan_mode = scan_mode;
+            if scan_mode == HEAL_DEEP_SCAN {
+                new_heal_info.bitrot_start_time = SystemTime::now();
+                new_heal_info.bitrot_start_cycle = cycle_info.current;
+            }
+            save_background_heal_info(store, &new_heal_info).await;
+        }
+        // Persist usage updates in the background. TODO: hand `tx` to the
+        // per-pool scanner (ECStore::ns_scanner) once it is wired up; until
+        // then nothing is sent and the task exits when `tx` is dropped.
+        let (tx, rx) = mpsc::channel(100);
+        tokio::spawn(async {
+            store_data_usage_in_backend(rx).await;
+        });
+        // Wait before starting the next cycle (also the startup delay).
+        sleep(Duration::from_secs(SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst))).await;
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct BackgroundHealInfo {
+    bitrot_start_time: SystemTime,
+    bitrot_start_cycle: u64,
+    current_scan_mode: HealScanMode,
+}
+
+impl Default for BackgroundHealInfo {
+    fn default() -> Self {
+        Self {
+            bitrot_start_time: SystemTime::now(),
+            bitrot_start_cycle: Default::default(),
+            current_scan_mode: Default::default(),
+        }
+    }
+}
+
+async fn read_background_heal_info(store: &ECStore) -> BackgroundHealInfo {
+    if *GLOBAL_IsErasureSD.read().await {
+        return BackgroundHealInfo::default();
+    }
+
+    let buf = read_config(store, &BACKGROUND_HEAL_INFO_PATH).await.unwrap_or_default();
+    if buf.is_empty() {
+        return BackgroundHealInfo::default();
+    }
+    serde_json::from_slice::<BackgroundHealInfo>(&buf).unwrap_or_default()
+}
+
+async fn save_background_heal_info(store: &ECStore, info: &BackgroundHealInfo) {
+    if *GLOBAL_IsErasureSD.read().await {
+        return;
+    }
+    let b = match serde_json::to_vec(info) {
+        Ok(b) => b,
+        Err(_) => return,
+    };
+    let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, &b).await;
+}
+
+async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode {
+    let bitrot_cycle = globalHealConfig.read().await.bitrot_scan_cycle();
+    // Sentinels from parse_bitrot_config: MAX = disabled, ZERO = always deep.
+    // (The original compared as_secs_f64() against -1.0, which can never be
+    // negative for a Duration.)
+    if bitrot_cycle == Duration::MAX {
+        return HEAL_NORMAL_SCAN;
+    } else if bitrot_cycle == Duration::ZERO {
+        return HEAL_DEEP_SCAN;
+    }
+
+    if current_cycle - bitrot_start_cycle < HEAL_OBJECT_SELECT_PROB {
+        return HEAL_DEEP_SCAN;
+    }
+
+    // Deep-scan again once a full bitrot cycle has elapsed since the last one
+    // started (the original measured the elapsed time backwards and unwrapped).
+    if SystemTime::now().duration_since(bitrot_start_time).unwrap_or(Duration::ZERO) > bitrot_cycle {
+        return HEAL_DEEP_SCAN;
+    }
+
+    HEAL_NORMAL_SCAN
+}
+
+#[derive(Clone, Debug)]
+pub struct CurrentScannerCycle {
+    pub current: u64,
+    pub next: u64,
+    pub started: SystemTime,
+    // NOTE: the element type was garbled in transit; completion timestamps
+    // are the likely intent.
+    pub cycle_completed: Vec<SystemTime>,
+}
+
+impl Default for CurrentScannerCycle {
+    fn default() -> Self {
+        Self {
+            current: Default::default(),
+            next: Default::default(),
+            started: SystemTime::now(),
+            cycle_completed: Default::default(),
+        }
+    }
+}
+
+impl CurrentScannerCycle {
+    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
+        let len: u32 = 4;
+        let mut wr = Vec::new();
+
+        // number of fields in the map
+        rmp::encode::write_map_len(&mut wr, len)?;
+
+        // write "current"
+        rmp::encode::write_str(&mut wr, "current")?;
+        rmp::encode::write_uint(&mut wr, self.current)?;
+
+        // write "next"
+        rmp::encode::write_str(&mut wr, "next")?;
+        rmp::encode::write_uint(&mut wr, self.next)?;
+
+        // write "started"
+        rmp::encode::write_str(&mut wr, "started")?;
+        rmp::encode::write_uint(&mut wr, system_time_to_timestamp(&self.started))?;
+
+        // write "cycle_completed"
+        rmp::encode::write_str(&mut wr, "cycle_completed")?;
+        let mut buf = Vec::new();
+        self.cycle_completed
+            .serialize(&mut Serializer::new(&mut buf))
+            .expect("Serialization failed");
+        rmp::encode::write_bin(&mut wr, &buf)?;
+
+        Ok(wr)
+    }
+
+    #[tracing::instrument]
+    pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
+        let mut cur = Cursor::new(buf);
+
+        let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
+
+        while fields_len > 0 {
+            fields_len -= 1;
+
+            let str_len = rmp::decode::read_str_len(&mut cur)?;
+
+            // NOTE: Vec::with_capacity(str_len) only reserves capacity and
+            // leaves the length at 0, so read_exact would read nothing;
+            // vec![0u8; n] allocates an initialized, correctly sized buffer.
+            let mut field_buff = vec![0u8; str_len as usize];
+
+            cur.read_exact(&mut field_buff)?;
+
+            let field = String::from_utf8(field_buff)?;
+
+            match field.as_str() {
+                "current" => {
+                    let u: u64 = rmp::decode::read_int(&mut cur)?;
+                    self.current = u;
+                }
+                "next" => {
+                    let u: u64 = rmp::decode::read_int(&mut cur)?;
+                    self.next = u;
+                }
+                "started" => {
+                    let u: u64 = rmp::decode::read_int(&mut cur)?;
+                    self.started = timestamp_to_system_time(u);
+                }
+                // Must match the key written by marshal_msg ("cycle_completed";
+                // the original matched the camelCase "cycleCompleted" and so
+                // never decoded this field).
+                "cycle_completed" => {
+                    let mut buf = Vec::new();
+                    let _ = cur.read_to_end(&mut buf)?;
+                    let u: Vec<SystemTime> =
+                        Deserialize::deserialize(&mut Deserializer::new(&buf[..])).expect("Deserialization failed");
+                    self.cycle_completed = u;
+                }
+                name => return Err(Error::msg(format!("unsupported field name {}", name))),
+            }
+        }
+
+        Ok(cur.position())
+    }
+}
+
+// Convert a SystemTime to a Unix timestamp (seconds).
+fn system_time_to_timestamp(time: &SystemTime) -> u64 {
+    time.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs()
+}
+
+// Convert a Unix timestamp (seconds) back to a SystemTime.
+fn timestamp_to_system_time(timestamp: u64) -> SystemTime {
+    UNIX_EPOCH + std::time::Duration::new(timestamp, 0)
+}
diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs
new file mode 100644
index 00000000..93af8310
--- /dev/null
+++ b/ecstore/src/heal/data_scanner_metric.rs
@@ -0,0 +1,74 @@
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicU32, Ordering},
+        Arc,
+    },
+    time::SystemTime,
+};
+
+use lazy_static::lazy_static;
+use tokio::sync::RwLock;
+
+use super::data_scanner::CurrentScannerCycle;
+
+lazy_static! {
+    pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
+}
+
+#[derive(Clone, Debug, PartialEq, PartialOrd)]
+pub enum ScannerMetric {
+    // START Realtime metrics, which only record last-minute latencies and
+    // total operation counts.
+    ReadMetadata = 0,
+    CheckMissing,
+    SaveUsage,
+    ApplyAll,
+    ApplyVersion,
+    TierObjSweep,
+    HealCheck,
+    Ilm,
+    CheckReplication,
+    Yield,
+    CleanAbandoned,
+    ApplyNonCurrent,
+    HealAbandonedVersion,
+
+    // START Trace metrics:
+    StartTrace,
+    ScanObject, // Scan object. All operations included.
+    HealAbandonedObject,
+
+    // END realtime metrics:
+    LastRealtime,
+
+    // Trace only metrics:
+    ScanFolder,      // Scan a folder on disk, recursively.
+    ScanCycle,       // Full cycle, cluster global.
+    ScanBucketDrive, // Single bucket on one drive.
+    CompactFolder,   // Folder compacted.
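+
+    // `Last` below doubles as the length of the per-metric counter vector in
+    // ScannerMetrics::new(), which is why it must stay the final variant.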
+    // Must be last:
+    Last,
+}
+
+pub struct ScannerMetrics {
+    operations: Vec<AtomicU32>,
+    cycle_info: RwLock<Option<CurrentScannerCycle>>,
+}
+
+impl ScannerMetrics {
+    pub fn new() -> Self {
+        Self {
+            operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(),
+            cycle_info: RwLock::new(None),
+        }
+    }
+
+    pub fn log(&mut self, s: ScannerMetric, _paths: &[String], _custom: &HashMap<String, String>, _start_time: SystemTime) {
+        // let duration = start_time.duration_since(start_time);
+        self.operations[s as usize].fetch_add(1, Ordering::SeqCst);
+        // TODO: record latencies as well
+    }
+
+    pub async fn set_cycle(&mut self, c: Option<CurrentScannerCycle>) {
+        *self.cycle_info.write().await = c;
+    }
+}
diff --git a/ecstore/src/heal/data_usage.rs b/ecstore/src/heal/data_usage.rs
new file mode 100644
index 00000000..3ffa3453
--- /dev/null
+++ b/ecstore/src/heal/data_usage.rs
@@ -0,0 +1,158 @@
+use std::{collections::HashMap, time::SystemTime};
+
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc::Receiver;
+use tracing::info;
+
+use crate::{
+    config::common::save_config,
+    disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
+    new_object_layer_fn,
+    utils::path::SLASH_SEPARATOR,
+};
+
+pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
+const DATA_USAGE_OBJ_NAME: &str = ".usage.json";
+const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin";
+pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin";
+
+lazy_static! {
+    pub static ref DATA_USAGE_BUCKET: String = format!("{}{}{}", RUSTFS_META_BUCKET, SLASH_SEPARATOR, BUCKET_META_PREFIX);
+    pub static ref DATA_USAGE_OBJ_NAME_PATH: String = format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, DATA_USAGE_OBJ_NAME);
+    pub static ref DATA_USAGE_BLOOM_NAME_PATH: String =
+        format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, DATA_USAGE_BLOOM_NAME);
+    pub static ref BACKGROUND_HEAL_INFO_PATH: String =
+        format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, ".background-heal.json");
+}
+
+// BucketTargetUsageInfo - bucket target usage info provides
+// - replicated size for all objects sent to this target
+// - replica size for all objects received from this target
+// - replication pending size for all objects pending replication to this target
+// - replication failed size for all objects failed replication to this target
+// - replica pending count
+// - replica failed count
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct BucketTargetUsageInfo {
+    replication_pending_size: u64,
+    replication_failed_size: u64,
+    replicated_size: u64,
+    replica_size: u64,
+    replication_pending_count: u64,
+    replication_failed_count: u64,
+    replicated_count: u64,
+}
+
+// BucketUsageInfo - bucket usage info provides
+// - total size of the bucket
+// - total objects in a bucket
+// - object size histogram per bucket
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct BucketUsageInfo {
+    size: u64,
+    // The following five fields, suffixed with V1, are here for backward compatibility.
+    // Total size for objects that have not yet been replicated
+    replication_pending_size_v1: u64,
+    // Total size for objects that have witnessed one or more failures and will be retried
+    replication_failed_size_v1: u64,
+    // Total size for objects that have been replicated to the destination
+    replicated_size_v1: u64,
+    // Total number of objects pending replication
+    replication_pending_count_v1: u64,
+    // Total number of objects that failed replication
+    replication_failed_count_v1: u64,
+
+    objects_count: u64,
+    object_size_histogram: HashMap<String, u64>,
+    object_versions_histogram: HashMap<String, u64>,
+    versions_count: u64,
+    delete_markers_count: u64,
+    replica_size: u64,
+    replica_count: u64,
+    replication_info: HashMap<String, BucketTargetUsageInfo>,
+}
+
+// DataUsageInfo represents data usage stats of the underlying Object API
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DataUsageInfo {
+    total_capacity: u64,
+    total_used_capacity: u64,
+    total_free_capacity: u64,
+
+    // last_update is the timestamp of when the data usage info was last
+    // updated. This does not indicate a full scan.
+    last_update: SystemTime,
+
+    // Objects total count across all buckets
+    objects_total_count: u64,
+    // Versions total count across all buckets
+    versions_total_count: u64,
+    // Delete markers total count across all buckets
+    delete_markers_total_count: u64,
+    // Objects total size across all buckets
+    objects_total_size: u64,
+    replication_info: HashMap<String, BucketTargetUsageInfo>,
+
+    // Total number of buckets in this cluster
+    buckets_count: u64,
+    // Buckets usage info provides the following information across all buckets:
+    // - total size of the bucket
+    // - total objects in a bucket
+    // - object size histogram per bucket
+    buckets_usage: HashMap<String, BucketUsageInfo>,
+    // Deprecated; kept here for backward compatibility reasons.
+    bucket_sizes: HashMap<String, u64>,
+    // TODO: TierStats
+    // TierStats contains per-tier stats of all configured remote tiers
+}
+
+impl Default for DataUsageInfo {
+    fn default() -> Self {
+        Self {
+            total_capacity: Default::default(),
+            total_used_capacity: Default::default(),
+            total_free_capacity: Default::default(),
+            last_update: SystemTime::now(),
+            objects_total_count: Default::default(),
+            versions_total_count: Default::default(),
+            delete_markers_total_count: Default::default(),
+            objects_total_size: Default::default(),
+            replication_info: Default::default(),
+            buckets_count: Default::default(),
+            buckets_usage: Default::default(),
+            bucket_sizes: Default::default(),
+        }
+    }
+}
+
+pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
+    let layer = new_object_layer_fn();
+    let lock = layer.read().await;
+    let store = match lock.as_ref() {
+        Some(s) => s,
+        None => {
+            info!("errServerNotInitialized");
+            return;
+        }
+    };
+    let mut attempts = 1;
+    while let Some(data_usage_info) = rx.recv().await {
+        let Ok(data) = serde_json::to_vec(&data_usage_info) else {
+            continue;
+        };
+        // Write a backup copy every tenth attempt and reset the counter (the
+        // original incremented `attempts` twice per iteration, so the backup
+        // branch ran on every iteration after the tenth).
+        if attempts > 10 {
+            let _ = save_config(store, &format!("{}.bkp", *DATA_USAGE_OBJ_NAME_PATH), &data).await;
+            attempts = 1;
+        }
+        let _ = save_config(store, &DATA_USAGE_OBJ_NAME_PATH, &data).await;
+        attempts += 1;
+    }
+}
diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs
new file mode 100644
index 00000000..3bbf1f50
--- /dev/null
+++ b/ecstore/src/heal/data_usage_cache.rs
@@ -0,0 +1,234 @@
+use std::collections::{HashMap, HashSet};
+use std::path::Path;
+use std::time::{Duration, SystemTime};
+
+use http::HeaderMap;
+use rand::Rng;
+use rmp_serde::Serializer;
+use s3s::{S3Error, S3ErrorCode};
+use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc::Sender;
+use tokio::time::sleep;
+
+use crate::config::common::save_config;
+use crate::disk::error::DiskError;
+use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
+use crate::error::{Error, Result};
+use crate::new_object_layer_fn;
+use crate::set_disk::SetDisks;
+use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions, StorageAPI};
+
+use super::data_usage::DATA_USAGE_ROOT;
+
+// DATA_USAGE_BUCKET_LEN must be the length of ObjectsHistogramIntervals
+pub const DATA_USAGE_BUCKET_LEN: usize = 11;
+pub const DATA_USAGE_VERSION_LEN: usize = 7;
+
+type DataUsageHashMap = HashSet<String>;
+// SizeHistogram is a size histogram.
+type SizeHistogram = Vec<u64>;
+// VersionsHistogram is a histogram of the number of versions in an object.
+type VersionsHistogram = Vec<u64>;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReplicationStats {
+    pub pending_size: u64,
+    pub replicated_size: u64,
+    pub failed_size: u64,
+    pub failed_count: u64,
+    pub pending_count: u64,
+    pub missed_threshold_size: u64,
+    pub after_threshold_size: u64,
+    pub missed_threshold_count: u64,
+    pub after_threshold_count: u64,
+    pub replicated_count: u64,
+}
+
+impl ReplicationStats {
+    pub fn empty(&self) -> bool {
+        self.replicated_size == 0 && self.failed_size == 0 && self.failed_count == 0
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReplicationAllStats {
+    pub targets: HashMap<String, ReplicationStats>,
+    pub replica_size: u64,
+    pub replica_count: u64,
+}
+
+impl ReplicationAllStats {
+    pub fn empty(&self) -> bool {
+        // Non-empty if either replica counter is set (the original `&&`
+        // wrongly required both to be non-zero).
+        if self.replica_size != 0 || self.replica_count != 0 {
+            return false;
+        }
+        for v in self.targets.values() {
+            if !v.empty() {
+                return false;
+            }
+        }
+
+        true
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct DataUsageEntry {
+    pub children: DataUsageHashMap,
+    // These fields do not include any children.
+    pub size: i64,
+    pub objects: u64,
+    pub versions: u64,
+    pub delete_markers: u64,
+    pub obj_sizes: SizeHistogram,
+    pub obj_versions: VersionsHistogram,
+    pub replication_stats: ReplicationAllStats,
+    // TODO: tier
+    // pub all_tier_stats: ,
+    pub compacted: bool,
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct DataUsageCacheInfo {
+    pub name: String,
+    pub next_cycle: usize,
+    pub last_update: SystemTime,
+    pub skip_healing: bool,
+    // TODO: life_cycle
+    // pub life_cycle:
+    #[serde(skip)]
+    pub updates: Option<Sender<DataUsageEntry>>,
+    // TODO: replication
+    // #[serde(skip_serializing)]
+    // replication:
+}
+
+impl Default for DataUsageCacheInfo {
+    fn default() -> Self {
+        Self {
+            name: Default::default(),
+            next_cycle: Default::default(),
+            last_update: SystemTime::now(),
+            skip_healing: Default::default(),
+            updates: Default::default(),
+        }
+    }
+}
+
+#[derive(Clone, Default, Serialize, Deserialize)]
+pub struct DataUsageCache {
+    pub info: DataUsageCacheInfo,
+    pub cache: HashMap<String, DataUsageEntry>,
+}
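+
+// Layout note: `cache` maps hashed path -> entry, and each entry lists its
+// children by the same hash, so subtree walks are plain map lookups; an entry
+// whose `compacted` flag is set has folded its whole subtree into itself.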
+
+impl DataUsageCache {
+    pub async fn load(store: &SetDisks, name: &str) -> Result<Self> {
+        let mut d = DataUsageCache::default();
+        let mut retries = 0;
+        while retries < 5 {
+            let path = Path::new(BUCKET_META_PREFIX).join(name);
+            match store
+                .get_object_reader(
+                    RUSTFS_META_BUCKET,
+                    path.to_str().unwrap(),
+                    HTTPRangeSpec::nil(),
+                    HeaderMap::new(),
+                    &ObjectOptions {
+                        no_lock: true,
+                        ..Default::default()
+                    },
+                )
+                .await
+            {
+                Ok(mut reader) => {
+                    if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
+                        d = info
+                    }
+                    break;
+                }
+                Err(err) => match err.downcast_ref::<DiskError>() {
+                    Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
+                        // Fall back to the legacy (un-prefixed) location.
+                        match store
+                            .get_object_reader(
+                                RUSTFS_META_BUCKET,
+                                name,
+                                HTTPRangeSpec::nil(),
+                                HeaderMap::new(),
+                                &ObjectOptions {
+                                    no_lock: true,
+                                    ..Default::default()
+                                },
+                            )
+                            .await
+                        {
+                            Ok(mut reader) => {
+                                if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
+                                    d = info
+                                }
+                                break;
+                            }
+                            // Inspect the fallback's own error; the original
+                            // matched on the outer `err` again, so every
+                            // fallback failure ended the retry loop.
+                            Err(err2) => match err2.downcast_ref::<DiskError>() {
+                                Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
+                                    break;
+                                }
+                                _ => {}
+                            },
+                        }
+                    }
+                    _ => {}
+                },
+            }
+            retries += 1;
+            let mut rng = rand::thread_rng();
+            sleep(Duration::from_millis(rng.gen_range(0..1_000))).await;
+        }
+        Ok(d)
+    }
+
+    pub async fn save(&self, name: &str) -> Result<()> {
+        let buf = self.marshal_msg()?;
+        let buf_clone = buf.clone();
+        let layer = new_object_layer_fn();
+        let lock = layer.read().await;
+        let store = match lock.as_ref() {
+            Some(s) => s,
+            None => return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()))),
+        };
+        let store_clone = store.clone();
+        let name_clone = name.to_string();
+        tokio::spawn(async move {
+            let _ = save_config(&store_clone, &format!("{}.bkp", name_clone), &buf_clone).await;
+        });
+        save_config(store, name, &buf).await
+    }
+
+    pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
+        // TODO: unfinished port -- the hash is computed but the entry is not
+        // yet merged into `cache` or linked under `parent`.
+        let _hash = hash_path(path);
+        let _ = (parent, e);
+    }
+
+    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
+        let mut buf = Vec::new();
+
+        self.serialize(&mut Serializer::new(&mut buf))?;
+
+        Ok(buf)
+    }
+
+    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
+        let t: Self = rmp_serde::from_slice(buf)?;
+        Ok(t)
+    }
+}
+
+struct DataUsageHash(String);
+
+impl DataUsageHash {}
+
+pub fn hash_path(data: &str) -> DataUsageHash {
+    let mut data = data;
+    if data != DATA_USAGE_ROOT {
+        data = data.trim_matches('/');
+    }
+    // TODO: hash the cleaned path (the original left a stray `Path::new(&data);`
+    // and bails out here).
+    todo!()
+}
diff --git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs
index 39da448e..5c208d30 100644
--- a/ecstore/src/heal/heal_commands.rs
+++ b/ecstore/src/heal/heal_commands.rs
@@ -7,12 +7,7 @@ use serde::{Deserialize, Serialize};
 use tokio::sync::RwLock;
 
 use crate::{
-    disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
-    error::{Error, Result},
-    heal::heal_ops::HEALING_TRACKER_FILENAME,
-    new_object_layer_fn,
-    store_api::{BucketInfo, StorageAPI},
-    utils::fs::read_file,
+    disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
+    error::{Error, Result},
+    global::GLOBAL_BackgroundHealState,
+    heal::heal_ops::HEALING_TRACKER_FILENAME,
+    new_object_layer_fn,
+    store_api::{BucketInfo, StorageAPI},
+    utils::fs::read_file,
 };
 
 pub type HealScanMode = usize;
@@ -50,7 +45,7 @@ pub struct HealOpts {
     pub set: Option<usize>,
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Default)]
 pub struct HealDriveInfo {
     pub uuid: String,
     pub endpoint: String,
@@ -259,7 +254,7 @@ impl HealingTracker {
 
         let htracker_bytes = self.marshal_msg()?;
 
-        // TODO: globalBackgroundHealState
+        GLOBAL_BackgroundHealState.write().await.update_heal_status(self).await;
 
         if let Some(disk) = &self.disk {
             let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
@@ -430,10 +425,10 @@ async fn load_healing_tracker(disk: &Option<DiskStore>) -> Result<HealingTracker> {
-pub async fn init_healing_tracker(disk: DiskStore, heal_id: String) -> Result<HealingTracker> {
+pub async fn init_healing_tracker(disk: DiskStore, heal_id: &str) -> Result<HealingTracker> {
     let mut healing_tracker = HealingTracker::default();
     healing_tracker.id = disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string());
-    healing_tracker.heal_id = heal_id;
+    healing_tracker.heal_id = heal_id.to_string();
     healing_tracker.path = disk.to_string();
     healing_tracker.endpoint = disk.endpoint().to_string();
     healing_tracker.started = SystemTime::now()
diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs
index 9ef9ee37..0c9415d2 100644
--- a/ecstore/src/heal/heal_ops.rs
+++ b/ecstore/src/heal/heal_ops.rs
@@ -141,6 +141,7 @@ impl HealSequence {
             ..Default::default()
         }
     }
+
 }
 
 impl HealSequence {
@@ -160,7 +161,7 @@ impl HealSequence {
         self.heal_failed_items_map.clone()
     }
 
-    fn count_failed(&mut self, heal_type: HealItemType) {
+    pub fn count_failed(&mut self, heal_type: HealItemType) {
         *self.heal_failed_items_map.entry(heal_type).or_insert(0) += 1;
         self.last_heal_activity = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap()
             .as_secs();
     }
 
-    fn count_scanned(&mut self, heal_type: HealItemType) {
+    pub fn count_scanned(&mut self, heal_type: HealItemType) {
         *self.scanned_items_map.entry(heal_type).or_insert(0) += 1;
         self.last_heal_activity = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap()
             .as_secs();
     }
 
-    fn count_healed(&mut self, heal_type: HealItemType) {
+    pub fn count_healed(&mut self, heal_type: HealItemType) {
         *self.healed_items_map.entry(heal_type).or_insert(0) += 1;
         self.last_heal_activity = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap()
             .as_secs();
     }
 
@@ -353,10 +354,25 @@ pub struct AllHealState {
 }
 
 impl AllHealState {
-    pub fn new(cleanup: bool) -> Self {
-        let hstate = AllHealState::default();
+    pub fn new(cleanup: bool) -> Arc<RwLock<Self>> {
+        let hstate = Arc::new(RwLock::new(AllHealState::default()));
         if cleanup {
-            // spawn(f);
+            let hstate_clone = hstate.clone();
+            let (shutdown_tx, mut rx) = broadcast::channel(1);
+            tokio::spawn(async move {
+                // Hold the sender so the channel stays open; the original
+                // bound it to `_`, which closed the channel immediately and
+                // turned the select below into a busy loop of recv errors.
+                // TODO: expose it for real shutdown signalling.
+                let _shutdown_tx = shutdown_tx;
+                loop {
+                    select! {
+                        result = rx.recv() => {
+                            if let Ok(true) = result {
+                                return;
+                            }
+                        }
+                        _ = sleep(Duration::from_secs(5 * 60)) => {
+                            hstate_clone.write().await.periodic_heal_seqs_clean().await;
+                        }
+                    }
+                }
+            });
         }
 
         hstate
@@ -382,7 +398,7 @@ impl AllHealState {
         });
     }
 
-    async fn update_heal_status(&mut self, tracker: &HealingTracker) {
+    pub async fn update_heal_status(&mut self, tracker: &HealingTracker) {
         let _ = self.mu.write().await;
         let _ = tracker.mu.read().await;
 
@@ -427,30 +443,19 @@ impl AllHealState {
         });
     }
 
-    async fn periodic_heal_seqs_clean(&mut self, mut rx: Receiver<bool>) {
-        loop {
-            select! {
-                result = rx.recv() => {
-                    if let Ok(true) = result {
-                        return;
-                    }
-                }
-                _ = sleep(Duration::from_secs(5 * 60)) => {
-                    let _ = self.mu.write().await;
-                    let now = SystemTime::now();
-
-                    let mut keys_to_reomve = Vec::new();
-                    for (k, v) in self.heal_seq_map.iter() {
-                        if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
-                            keys_to_reomve.push(k.clone())
-                        }
-                    }
-                    for key in keys_to_reomve.iter() {
-                        self.heal_seq_map.remove(key);
-                    }
-                }
-            }
-        }
-    }
+    async fn periodic_heal_seqs_clean(&mut self) {
+        // Hold the guard for the whole sweep; `let _ =` would drop the lock
+        // immediately.
+        let _guard = self.mu.write().await;
+        let now = SystemTime::now();
+
+        let mut keys_to_remove = Vec::new();
+        for (k, v) in self.heal_seq_map.iter() {
+            if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
+                keys_to_remove.push(k.clone())
+            }
+        }
+        for key in keys_to_remove.iter() {
+            self.heal_seq_map.remove(key);
+        }
+    }
 
     async fn get_heal_sequence_by_token(&self, token: &str) -> (Option, bool) {
diff --git a/ecstore/src/heal/mod.rs b/ecstore/src/heal/mod.rs
index 38db00f2..f477be10 100644
--- a/ecstore/src/heal/mod.rs
+++ b/ecstore/src/heal/mod.rs
@@ -1,3 +1,7 @@
 pub mod background_heal_ops;
+pub mod data_scanner;
+pub mod data_scanner_metric;
+pub mod data_usage;
+pub mod data_usage_cache;
 pub mod heal_commands;
 pub mod heal_ops;
diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs
index 07551a34..7f0a32d9 100644
--- a/ecstore/src/lib.rs
+++ b/ecstore/src/lib.rs
@@ -9,7 +9,7 @@ pub mod erasure;
 pub mod error;
 mod file_meta;
 mod global;
-mod heal;
+pub mod heal;
 pub mod peer;
 mod quorum;
 pub mod set_disk;
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index c5dc2c07..30d8c5a0 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -10,14 +10,18 @@ use uuid::Uuid;
 
 use crate::{
     disk::{
+        error::DiskError,
         format::{DistributionAlgoVersion, FormatV3},
-        DiskAPI, DiskStore,
+        new_disk, DiskInfo, DiskOption, DiskStore,
     },
-    endpoints::PoolEndpoints,
+    endpoints::{Endpoints, PoolEndpoints},
     error::{Error, Result},
     global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
     heal::{
-        heal_commands::{HealOpts, HealResultItem},
+        heal_commands::{
+            HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
+            DRIVE_STATE_OK, HEAL_ITEM_METADATA,
+        },
         heal_ops::HealObjectFn,
     },
     set_disk::SetDisks,
@@ -26,7 +30,9 @@ use crate::{
         ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions,
         ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
     },
+    store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
     utils::hash,
 };
 
 use tokio::time::Duration;
@@ -489,13 +495,99 @@ impl StorageAPI for Sets {
     async fn delete_bucket(&self, _bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
         unimplemented!()
     }
-    async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem> {
-        unimplemented!()
+    async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
+        let (disks, _) = init_storage_disks_with_errors(
+            &self.endpoints.endpoints,
+            &DiskOption {
+                cleanup: false,
+                health_check: false,
+            },
+        )
+        .await;
+        let (formats, errs) = load_format_erasure_all(&disks, true).await;
+        if let Err(err) = check_format_erasure_values(&formats, self.set_drive_count) {
+            return Ok((HealResultItem::default(), Some(err)));
+        }
+        let ref_format = match get_format_erasure_in_quorum(&formats) {
+            Ok(format) => format,
+            Err(err) => return Ok((HealResultItem::default(), Some(err))),
+        };
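+        // The quorum format is the reference layout: freshly formatted disks
+        // are stamped with it below, so healing refuses to proceed when no
+        // quorum exists or when our own format disagrees with it.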
+        let mut res = HealResultItem {
+            heal_item_type: HEAL_ITEM_METADATA.to_string(),
+            detail: "disk-format".to_string(),
+            disk_count: self.set_count * self.set_drive_count,
+            set_count: self.set_count,
+            ..Default::default()
+        };
+        let before_drives = formats_to_drives_info(&self.endpoints.endpoints, &formats, &errs);
+        // Record the per-drive state before and after healing (the original
+        // pre-filled both vectors with defaults and then pushed on top,
+        // doubling their length).
+        res.before = before_drives.clone();
+        res.after = before_drives;
+
+        if DiskError::UnformattedDisk.count_errs(&errs) == 0 {
+            return Ok((res, Some(Error::new(DiskError::NoHealRequired))));
+        }
+
+        if self.format != ref_format {
+            return Ok((res, Some(Error::new(DiskError::CorruptedFormat))));
+        }
+
+        let format_op_id = Uuid::new_v4().to_string();
+        let (new_format_sets, _current_disks_info) =
+            new_heal_format_sets(&ref_format, self.set_count, self.set_drive_count, &formats, &errs);
+        if !dry_run {
+            let mut tmp_new_formats = vec![None; self.set_count * self.set_drive_count];
+            for (i, set) in new_format_sets.iter().enumerate() {
+                for (j, fm) in set.iter().enumerate() {
+                    if let Some(fm) = fm {
+                        res.after[i * self.set_drive_count + j].uuid = fm.erasure.this.to_string();
+                        res.after[i * self.set_drive_count + j].state = DRIVE_STATE_OK.to_string();
+                        tmp_new_formats[i * self.set_drive_count + j] = Some(fm.clone());
+                    }
+                }
+            }
+            // Save the new `format.json` on the unformatted disks.
+            for (fm, disk) in tmp_new_formats.iter_mut().zip(disks.iter()) {
+                if fm.is_some() && disk.is_some() {
+                    if save_format_file(disk, fm, &format_op_id).await.is_err() {
+                        let _ = disk.as_ref().unwrap().close().await;
+                        *fm = None;
+                    }
+                }
+            }
+
+            for (index, fm) in tmp_new_formats.iter().enumerate() {
+                if let Some(fm) = fm {
+                    let (m, n) = match ref_format.find_disk_index_by_disk_id(fm.erasure.this) {
+                        Ok((m, n)) => (m, n),
+                        Err(_) => continue,
+                    };
+                    if let Some(set) = self.disk_set.get(m) {
+                        if let Some(Some(disk)) = set.disks.read().await.get(n) {
+                            let _ = disk.close().await;
+                        }
+                    }
+
+                    if let Some(Some(disk)) = disks.get(index) {
+                        self.disk_set[m].renew_disk(&disk.endpoint()).await;
+                    }
+                }
+            }
+        }
+        Ok((res, None))
     }
     async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
         unimplemented!()
     }
-    async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem> {
+    async fn heal_object(
+        &self,
+        bucket: &str,
+        object: &str,
+        version_id: &str,
+        opts: &HealOpts,
+    ) -> Result<(HealResultItem, Option<Error>)> {
         self.get_disks_by_key(object)
             .heal_object(bucket, object, version_id, opts)
             .await
@@ -510,3 +602,119 @@ impl StorageAPI for Sets {
         unimplemented!()
     }
 }
+
+async fn close_storage_disks(disks: &[Option<DiskStore>]) {
+    let mut futures = Vec::with_capacity(disks.len());
+    for disk in disks.iter() {
+        if let Some(disk) = disk {
+            let disk = disk.clone();
+            futures.push(tokio::spawn(async move {
+                let _ = disk.close().await;
+            }));
+        }
+    }
+    let _ = join_all(futures).await;
+}
+
+async fn init_storage_disks_with_errors(
+    endpoints: &Endpoints,
+    opts: &DiskOption,
+) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
+    // Bootstrap disks.
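+    // disks[i] / errs[i] stay aligned with endpoints[i], so callers can zip
+    // them against the endpoint list positionally.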
+    let disks = Arc::new(RwLock::new(vec![None; endpoints.as_ref().len()]));
+    let errs = Arc::new(RwLock::new(vec![None; endpoints.as_ref().len()]));
+    let mut futures = Vec::with_capacity(endpoints.as_ref().len());
+    for (index, endpoint) in endpoints.as_ref().iter().enumerate() {
+        let ep = endpoint.clone();
+        let opt = opts.clone();
+        let disks_clone = disks.clone();
+        let errs_clone = errs.clone();
+        futures.push(tokio::spawn(async move {
+            match new_disk(&ep, &opt).await {
+                Ok(disk) => {
+                    disks_clone.write().await[index] = Some(disk);
+                    errs_clone.write().await[index] = None;
+                }
+                Err(err) => {
+                    disks_clone.write().await[index] = None;
+                    errs_clone.write().await[index] = Some(err);
+                }
+            }
+        }));
+    }
+    let _ = join_all(futures).await;
+    let disks = disks.read().await.clone();
+    let errs = errs.read().await.clone();
+    (disks, errs)
+}
+
+fn formats_to_drives_info(endpoints: &Endpoints, formats: &[Option<FormatV3>], errs: &[Option<Error>]) -> Vec<HealDriveInfo> {
+    let mut before_drives = Vec::with_capacity(endpoints.as_ref().len());
+    for (index, format) in formats.iter().enumerate() {
+        let drive = endpoints.get_string(index);
+        // Return the match value directly; the original computed the match
+        // and discarded it, so every errored drive came back as "corrupt".
+        let state = if format.is_some() {
+            DRIVE_STATE_OK
+        } else if let Some(Some(err)) = errs.get(index) {
+            match err.downcast_ref::<DiskError>() {
+                Some(DiskError::UnformattedDisk) => DRIVE_STATE_MISSING,
+                Some(DiskError::DiskNotFound) => DRIVE_STATE_OFFLINE,
+                _ => DRIVE_STATE_CORRUPT,
+            }
+        } else {
+            DRIVE_STATE_CORRUPT
+        };
+
+        let uuid = if let Some(format) = format {
+            format.erasure.this.to_string()
+        } else {
+            "".to_string()
+        };
+        before_drives.push(HealDriveInfo {
+            uuid,
+            endpoint: drive,
+            state: state.to_string(),
+        });
+    }
+    before_drives
+}
+
+fn new_heal_format_sets(
+    ref_format: &FormatV3,
+    set_count: usize,
+    set_drive_count: usize,
+    formats: &[Option<FormatV3>],
+    errs: &[Option<Error>],
+) -> (Vec<Vec<Option<FormatV3>>>, Vec<Vec<DiskInfo>>) {
+    let mut new_formats = vec![vec![None; set_drive_count]; set_count];
+    let mut current_disks_info = vec![vec![DiskInfo::default(); set_drive_count]; set_count];
+    for (i, set) in ref_format.erasure.sets.iter().enumerate() {
+        for (j, _value) in set.iter().enumerate() {
+            if let Some(Some(err)) = errs.get(i * set_drive_count + j) {
+                if let Some(DiskError::UnformattedDisk) = err.downcast_ref::<DiskError>() {
+                    let mut fm = FormatV3::new(set_count, set_drive_count);
+                    fm.id = ref_format.id;
+                    fm.format = ref_format.format.clone();
+                    fm.version = ref_format.version.clone();
+                    fm.erasure.this = ref_format.erasure.sets[i][j];
+                    fm.erasure.sets = ref_format.erasure.sets.clone();
+                    fm.erasure.version = ref_format.erasure.version.clone();
+                    fm.erasure.distribution_algo = ref_format.erasure.distribution_algo.clone();
+                    new_formats[i][j] = Some(fm);
+                }
+            }
+            if let (Some(format), None) = (&formats[i * set_drive_count + j], &errs[i * set_drive_count + j]) {
+                if let Some(info) = &format.disk_info {
+                    if !info.endpoint.is_empty() {
+                        current_disks_info[i][j] = info.clone();
+                    }
+                }
+            }
+        }
+    }
+
+    (new_formats, current_disks_info)
+}
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 9f361d48..b9d1c492 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -10,7 +10,8 @@ use crate::global::{
     is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION,
     DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
 };
-use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode};
+use crate::heal::data_usage::DataUsageInfo;
+use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
 use crate::heal::heal_ops::HealObjectFn;
 use crate::new_object_layer_fn;
 use crate::store_api::{ListMultipartsInfo, ObjectIO};
@@ -43,6 +44,7 @@ use http::HeaderMap;
 use lazy_static::lazy_static;
 use rand::Rng;
 use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
+use tokio::sync::mpsc::Sender;
 use std::cmp::Ordering;
 use std::slice::Iter;
 use std::{
@@ -52,10 +54,11 @@ use std::{
 };
 use time::OffsetDateTime;
 use tokio::fs;
-use tokio::sync::Semaphore;
+use tokio::sync::{mpsc, RwLock, Semaphore};
 use tracing::{debug, info, warn};
 use uuid::Uuid;
 
+use crate::heal::data_usage_cache::DataUsageCache;
 
 const MAX_UPLOADS_LIST: usize = 10000;
@@ -488,6 +491,47 @@ impl ECStore {
         internal_get_pool_info_existing_with_opts(&self.pools, bucket, object, opts).await
     }
 
+    // WIP: collects per-set usage caches and forwards a merged DataUsageInfo
+    // to `updates`. The per-set receiver tasks are assembled below but not
+    // yet driven against the actual disks.
+    pub async fn ns_scanner(&self, updates: Sender<DataUsageInfo>, want_cycle: usize, heal_scan_mode: HealScanMode) -> Result<()> {
+        let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
+        if all_buckets.is_empty() {
+            let _ = updates.send(DataUsageInfo::default()).await;
+            return Ok(());
+        }
+
+        let mut total_results = 0;
+        let mut result_index = 0;
+        self.pools.iter().for_each(|pool| {
+            total_results += pool.disk_set.len();
+        });
+        let results = Arc::new(RwLock::new(vec![DataUsageCache::default(); total_results]));
+        let mut futures = Vec::new();
+        for pool in self.pools.iter() {
+            for _set in pool.disk_set.iter() {
+                let index = result_index;
+                let results_clone = results.clone();
+                futures.push(async move {
+                    let (tx, mut rx) = mpsc::channel(100);
+                    let task = tokio::spawn(async move {
+                        while let Some(info) = rx.recv().await {
+                            results_clone.write().await[index] = info;
+                        }
+                    });
+                    // TODO: hand `tx` to this set's scanner; it is unused and
+                    // dropped for now, so the task above ends immediately.
+                    let _ = tx;
+                    let _ = task.await;
+                });
+                result_index += 1;
+            }
+        }
+        // TODO: run `futures` and fold `results` into a DataUsageInfo for
+        // `updates`.
+        Ok(())
+    }
+
     async fn get_latest_object_info_with_idx(
         &self,
         bucket: &str,
@@ -1369,51 +1413,93 @@ impl StorageAPI for ECStore {
         }
         counts
     }
-    async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem> {
-        unimplemented!()
+    async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
+        let mut r = HealResultItem {
+            heal_item_type: HEAL_ITEM_METADATA.to_string(),
+            detail: "disk-format".to_string(),
+            ..Default::default()
+        };
+
+        let mut count_no_heal = 0;
+        for pool in self.pools.iter() {
+            let (mut result, err) = pool.heal_format(dry_run).await?;
+            if let Some(err) = err {
+                match err.downcast_ref::<DiskError>() {
+                    Some(DiskError::NoHealRequired) => {
+                        // Already healed; still fold this pool's drive states
+                        // into the summary below.
+                        count_no_heal += 1;
+                    }
+                    _ => continue,
+                }
+            }
+            r.disk_count += result.disk_count;
+            r.set_count += result.set_count;
+            r.before.append(&mut result.before);
+            r.after.append(&mut result.after);
+        }
+        if count_no_heal == self.pools.len() {
+            return Ok((r, Some(Error::new(DiskError::NoHealRequired))));
+        }
+        Ok((r, None))
     }
     async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
         unimplemented!()
     }
-    async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem> {
+    async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)> {
         let object = utils::path::encode_dir_object(object);
 
-        let mut errs = HashMap::new();
-        let mut results = HashMap::new();
+        let errs = Arc::new(RwLock::new(vec![None; self.pools.len()]));
+        let results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.pools.len()]));
+        let mut futures = Vec::with_capacity(self.pools.len());
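+        // Fan out one heal attempt per pool; slot `idx` of errs/results holds
+        // pool idx's outcome so the scans below can prefer the first pool
+        // that healed cleanly.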
         for (idx, pool) in self.pools.iter().enumerate() {
             //TODO: IsSuspended
-            match pool.heal_object(bucket, &object, version_id, opts).await {
-                Ok(mut result) => {
-                    result.object = utils::path::decode_dir_object(&result.object);
-                    results.insert(idx, result);
+            let object = object.clone();
+            let results = results.clone();
+            let errs = errs.clone();
+            futures.push(async move {
+                match pool.heal_object(bucket, &object, version_id, opts).await {
+                    Ok((mut result, err)) => {
+                        result.object = utils::path::decode_dir_object(&result.object);
+                        // Assign into the pool's slot; Vec::insert would shift
+                        // the pre-filled defaults instead of replacing them.
+                        results.write().await[idx] = result;
+                        errs.write().await[idx] = err;
+                    }
+                    Err(err) => {
+                        errs.write().await[idx] = Some(err);
+                    }
                 }
-                Err(err) => {
-                    errs.insert(idx, err);
-                }
-            }
+            });
         }
+        let _ = join_all(futures).await;
 
         // Return the first nil error
-        for i in 0..self.pools.len() {
-            if !errs.contains_key(&i) {
-                return Ok(results.remove(&i).unwrap());
+        for (index, err) in errs.read().await.iter().enumerate() {
+            if err.is_none() {
+                return Ok((results.write().await.remove(index), None));
             }
         }
 
         // No pool returned a nil error, return the first non 'not found' error
-        for (k, err) in errs.iter() {
-            match err.downcast_ref::<DiskError>() {
-                Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
-                _ => return Ok(results.remove(k).unwrap()),
+        for (index, err) in errs.read().await.iter().enumerate() {
+            match err {
+                Some(err) => match err.downcast_ref::<DiskError>() {
+                    Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
+                    _ => return Ok((results.write().await.remove(index), Some(err.clone()))),
+                },
+                None => {
+                    return Ok((results.write().await.remove(index), None));
+                }
             }
         }
 
         // At this stage, all errors are 'not found'
         if !version_id.is_empty() {
-            return Err(Error::new(DiskError::FileVersionNotFound));
+            return Ok((HealResultItem::default(), Some(Error::new(DiskError::FileVersionNotFound))));
         }
-        Err(Error::new(DiskError::FileNotFound))
+        Ok((HealResultItem::default(), Some(Error::new(DiskError::FileNotFound))))
     }
     async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()> {
         let mut first_err = None;
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 8cc0342f..c64180d2 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -22,6 +22,8 @@ pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
 pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
 pub const RESERVED_METADATA_PREFIX: &str = "X-Rustfs-Internal-";
 pub const RESERVED_METADATA_PREFIX_LOWER: &str = "X-Rustfs-Internal-";
+pub const RUSTFS_HEALING: &str = "X-Rustfs-Internal-healing";
+pub const RUSTFS_DATA_MOVE: &str = "X-Rustfs-Internal-data-mov";
 
 // #[derive(Debug, Clone)]
 #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
@@ -100,6 +102,12 @@ impl FileInfo {
         false
     }
 
+    pub fn inline_data(&self) -> bool {
+        self.metadata.as_ref().map_or(false, |metadata| {
+            metadata.contains_key(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER)) && !self.is_remote()
+        })
+    }
+
     pub fn get_etag(&self) -> Option<String> {
         if let Some(meta) = &self.metadata {
             meta.get("etag").cloned()
@@ -237,6 +245,16 @@ impl FileInfo {
         Err(Error::msg("part not found"))
     }
 
+    pub fn set_healing(&mut self) {
+        if self.metadata.is_none() {
+            self.metadata = Some(HashMap::new());
+        }
+
+        if let Some(metadata) = self.metadata.as_mut() {
+            metadata.insert(RUSTFS_HEALING.to_string(), "true".to_string());
+        }
+    }
+
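+    // Note (assumed semantics): callers stamp a FileInfo with set_healing()
+    // before rewriting shards and strip the marker once healing completes, so
+    // an interrupted heal can be detected and resumed on a later scan.
+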
     pub fn set_inline_data(&mut self) {
         if let Some(meta) = self.metadata.as_mut() {
             meta.insert("x-rustfs-inline-data".to_owned(), "true".to_owned());
@@ -899,9 +917,9 @@ pub trait StorageAPI: ObjectIO {
     async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
     async fn delete_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
 
-    async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem>;
+    async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
     async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
-    async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem>;
+    async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts)
+        -> Result<(HealResultItem, Option<Error>)>;
     async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()>;
     async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
     async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs
index 6d8f7dd5..e531d647 100644
--- a/ecstore/src/store_init.rs
+++ b/ecstore/src/store_init.rs
@@ -2,12 +2,10 @@ use crate::config::{storageclass, KVS};
 use crate::disk::DiskAPI;
 use crate::{
     disk::{
-        error::DiskError,
-        format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
-        new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
+        error::DiskError,
+        format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
+        new_disk, DiskInfoOptions, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
     },
     endpoints::Endpoints,
-    error::{Error, Result},
+    error::{Error, Result},
+    heal::heal_commands::init_healing_tracker,
 };
 use futures::future::join_all;
 use std::{
@@ -114,7 +112,7 @@ fn init_format_erasure(
     fms
 }
 
-fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
+pub fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
     let mut countmap = HashMap::new();
 
     for f in formats.iter() {
@@ -148,7 +146,7 @@ fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
-fn check_format_erasure_values(
+pub fn check_format_erasure_values(
     formats: &[Option<FormatV3>],
     // disks: &Vec<Option<DiskStore>>,
     set_drive_count: usize,
@@ -184,7 +182,7 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
 }
 
 // load_format_erasure_all reads format.json from all disks
-async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
+pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
     let mut futures = Vec::with_capacity(disks.len());
     let mut datas = Vec::with_capacity(disks.len());
     let mut errors = Vec::with_capacity(disks.len());
@@ -220,7 +218,7 @@ async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) {
     (datas, errors)
 }
 
-pub async fn load_format_erasure(disk: &DiskStore, _heal: bool) -> Result<FormatV3> {
+pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> Result<FormatV3> {
     let data = disk
         .read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
         .await
@@ -231,9 +229,15 @@ pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> Result<FormatV3> {
             e => e,
         })?;
 
-    let fm = FormatV3::try_from(data.as_slice())?;
+    let mut fm = FormatV3::try_from(data.as_slice())?;
 
-    // TODO: heal
+    if heal {
+        let info = disk
+            .disk_info(&DiskInfoOptions {
+                noop: heal,
+                ..Default::default()
+            })
+            .await?;
+        fm.disk_info = Some(info);
+    }
 
     Ok(fm)
 }
@@ -242,7 +246,7 @@ async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<FormatV3>]) {
 
-async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> Result<()> {
+pub async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>, heal_id: &str) -> Result<()> {
     if disk.is_none() {
         return Err(Error::new(DiskError::DiskNotFound));
     }
@@ -281,6 +285,10 @@ async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>, heal_id: &str) -> Result<()> {
         .await?;
 
     disk.set_disk_id(Some(format.erasure.this)).await?;
+    if !heal_id.is_empty() {
+        let mut ht = init_healing_tracker(disk.clone(), heal_id).await?;
+        return ht.save().await;
+    }
 
     Ok(())
 }
diff --git a/ecstore/src/utils/bool_flag.rs b/ecstore/src/utils/bool_flag.rs
new file mode 100644
index 00000000..895fd976
--- /dev/null
+++ b/ecstore/src/utils/bool_flag.rs
@@ -0,0 +1,15 @@
+use crate::error::{Error, Result};
+
+// Accepts the same token set as Go's strconv.ParseBool, plus on/off and
+// enabled/disabled.
+pub fn parse_bool(s: &str) -> Result<bool> {
+    match s {
+        "1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true),
+        "0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Ok(false),
+        _ => Err(Error::from_string(format!("ParseBool: parsing {}", s))),
+    }
+}
\ No newline at end of file
diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs
index daff92a6..c1f6ce7d 100644
--- a/ecstore/src/utils/mod.rs
+++ b/ecstore/src/utils/mod.rs
@@ -1,5 +1,6 @@
+pub mod bool_flag;
 pub mod crypto;
 pub mod ellipses;
 pub mod fs;
 pub mod hash;
 pub mod net;
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index 46daded3..975d4c9b 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -6,9 +6,12 @@ mod storage;
 use clap::Parser;
 use common::error::{Error, Result};
 use ecstore::{
+    bucket::metadata_sys::init_bucket_metadata_sys,
     endpoints::EndpointServerPools,
+    heal::data_scanner::init_data_scanner,
     set_global_endpoints,
     store::{init_local_disks, ECStore},
+    store_api::{BucketOptions, StorageAPI},
     update_erasure_type,
 };
 use grpc::make_server;
@@ -184,6 +187,8 @@ async fn run(opt: config::Opt) -> Result<()> {
         .map_err(|err| Error::from_string(err.to_string()))?;
 
     store.init().await.map_err(|err| Error::from_string(err.to_string()))?;
+    // init scanner
+    init_data_scanner().await;
 
     tokio::select! {
         _ = tokio::signal::ctrl_c() => {