This commit is contained in:
junxiang Mu
2024-11-11 17:33:25 +08:00
parent 4ff41d9416
commit 870ac2f371
25 changed files with 1413 additions and 112 deletions

View File

@@ -41,6 +41,7 @@ protos.workspace = true
rmp-serde = "1.3.0"
tokio-util = { version = "0.7.12", features = ["io", "compat"] }
crc32fast = "1.4.2"
rand = "0.8.5"
siphasher = "1.0.1"
base64-simd = "0.8.0"
sha2 = { version = "0.11.0-pre.4" }

View File

@@ -0,0 +1,65 @@
use std::time::Duration;
use crate::{
error::{Error, Result},
utils::bool_flag::parse_bool,
};
#[derive(Debug, Default)]
pub struct Config {
pub bitrot: String,
pub sleep: Duration,
pub io_count: usize,
pub drive_workers: usize,
pub cache: Duration,
}
impl Config {
pub fn bitrot_scan_cycle(&self) -> Duration {
self.cache
}
pub fn get_workers(&self) -> usize {
self.drive_workers
}
pub fn update(&mut self, nopts: &Config) {
self.bitrot = nopts.bitrot.clone();
self.io_count = nopts.io_count;
self.sleep = nopts.sleep;
self.drive_workers = nopts.drive_workers;
}
}
const RUSTFS_BITROT_CYCLE_IN_MONTHS: u64 = 1;
fn parse_bitrot_config(s: &str) -> Result<Duration> {
match parse_bool(s) {
Ok(enabled) => {
if enabled {
// Zero is the sentinel for "always run the deep (bitrot) scan".
return Ok(Duration::ZERO);
} else {
// Duration cannot be negative, so Duration::MAX is the sentinel for "bitrot scan disabled".
return Ok(Duration::MAX);
}
}
Err(_) => {
if !s.ends_with("m") {
return Err(Error::from_string("unknown format"));
}
match s.trim_end_matches('m').parse::<u64>() {
Ok(months) => {
if months < RUSTFS_BITROT_CYCLE_IN_MONTHS {
return Err(Error::from_string(format!(
"minimum bitrot cycle is {} month(s)",
RUSTFS_BITROT_CYCLE_IN_MONTHS
)));
}
Ok(Duration::from_secs(months * 30 * 24 * 60 * 60))
}
Err(err) => Err(err.into()),
}
}
}
}
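A small test sketch (illustrative only, not part of this change) of the parsing rules above, assuming the Duration::ZERO / Duration::MAX sentinel encoding used by parse_bitrot_config:
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bitrot_config_parses_flags_and_month_suffix() {
        // Boolean flags map to the scan-mode sentinels.
        assert_eq!(parse_bitrot_config("on").unwrap(), Duration::ZERO);
        assert_eq!(parse_bitrot_config("off").unwrap(), Duration::MAX);
        // Month values need the `m` suffix and must be at least one month.
        assert_eq!(parse_bitrot_config("2m").unwrap(), Duration::from_secs(2 * 30 * 24 * 60 * 60));
        assert!(parse_bitrot_config("0m").is_err());
        assert!(parse_bitrot_config("30d").is_err());
    }
}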

View File

@@ -1,5 +1,6 @@
pub mod common;
pub mod error;
pub mod heal;
pub mod storageclass;
use crate::error::Result;

View File

@@ -106,6 +106,9 @@ pub enum DiskError {
#[error("part missing or corrupt")]
PartMissingOrCorrupt,
#[error("No healing is required")]
NoHealRequired,
}
impl DiskError {
@@ -210,6 +213,7 @@ pub fn clone_disk_err(e: &DiskError) -> Error {
DiskError::MoreData => Error::new(DiskError::MoreData),
DiskError::OutdatedXLMeta => Error::new(DiskError::OutdatedXLMeta),
DiskError::PartMissingOrCorrupt => Error::new(DiskError::PartMissingOrCorrupt),
DiskError::NoHealRequired => Error::new(DiskError::NoHealRequired),
}
}

View File

@@ -1,4 +1,4 @@
use super::error::DiskError;
use super::{error::DiskError, DiskInfo};
use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use serde_json::Error as JsonError;
@@ -31,7 +31,7 @@ pub enum FormatBackend {
///
/// The V3 format to support "large bucket" support where a bucket
/// can span multiple erasure sets.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FormatErasureV3 {
/// Version of 'xl' format.
pub version: FormatErasureVersion,
@@ -88,7 +88,7 @@ pub enum DistributionAlgoVersion {
///
/// Ideally we will never have a situation where we will have to change the
/// fields of this struct and deal with related migration.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct FormatV3 {
/// Version of the format config.
pub version: FormatMetaVersion,
@@ -103,8 +103,8 @@ pub struct FormatV3 {
pub erasure: FormatErasureV3,
// /// DiskInfo is an extended type which returns current
// /// disk usage per path.
// #[serde(skip)]
// pub disk_info: Option<data_types::DeskInfo>,
#[serde(skip)]
pub disk_info: Option<DiskInfo>,
}
impl TryFrom<&[u8]> for FormatV3 {
@@ -146,7 +146,7 @@ impl FormatV3 {
format,
id: Uuid::new_v4(),
erasure,
// disk_info: None,
disk_info: None,
}
}

View File

@@ -467,7 +467,7 @@ pub struct DiskInfoOptions {
pub noop: bool,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DiskInfo {
pub total: u64,
pub free: u64,
@@ -489,7 +489,7 @@ pub struct DiskInfo {
pub error: String,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DiskMetrics {
api_calls: HashMap<String, u64>,
total_waiting: u32,
@@ -835,7 +835,7 @@ impl MetaCacheEntries {
}
}
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,

View File

@@ -109,6 +109,16 @@ impl Endpoints {
pub fn into_inner(self) -> Vec<Endpoint> {
self.0
}
// get_string - returns the endpoint string of the i-th endpoint (0-based),
// and an empty string for invalid indexes.
pub fn get_string(&self, i: usize) -> String {
if i >= self.0.len() {
return "".to_string();
}
self.0[i].to_string()
}
}
#[derive(Debug)]

View File

@@ -19,6 +19,7 @@ use uuid::Uuid;
// use crate::chunk_stream::ChunkedStream;
use crate::disk::error::DiskError;
#[derive(Default)]
pub struct Erasure {
data_shards: usize,
parity_shards: usize,
@@ -417,6 +418,50 @@ impl Erasure {
till_offset
}
pub async fn heal(&self, writers: &mut [Option<BitrotWriter>], readers: Vec<Option<BitrotReader>>, total_length: usize, prefer: &[bool]) -> Result<()> {
if writers.len() != self.parity_shards + self.data_shards {
return Err(Error::from_string("invalid argument"));
}
let mut reader = ShardReader::new(readers, self, 0, total_length);
let start_block = 0;
let mut end_block = total_length / self.block_size;
if total_length % self.block_size != 0 {
end_block += 1;
}
let mut bytes_written = 0;
let mut errs = Vec::new();
for _ in start_block..end_block {
let mut bufs = reader.read().await?;
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().reconstruct(&mut bufs)?;
}
let shards = bufs.into_iter().flatten().collect::<Vec<_>>();
if shards.len() != self.parity_shards + self.data_shards {
return Err(Error::from_string("can not reconstruct data"));
}
for (i, w) in writers.iter_mut().enumerate() {
if w.is_none() {
continue;
}
match w.as_mut().unwrap().write(shards[i].as_ref()).await {
Ok(_) => {},
Err(e) => errs.push(e),
}
}
}
if !errs.is_empty() {
return Err(errs[0].clone());
}
Ok(())
}
}
#[async_trait::async_trait]

View File

@@ -4,9 +4,7 @@ use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
disk::DiskStore,
endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
store::ECStore,
disk::DiskStore, endpoints::{EndpointServerPools, PoolEndpoints, SetupType}, heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState}, store::ECStore
};
pub const DISK_ASSUME_UNKNOWN_SIZE: u64 = 1 << 30;
@@ -24,6 +22,8 @@ lazy_static! {
pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
pub static ref GLOBAL_Endpoints: RwLock<EndpointServerPools> = RwLock::new(EndpointServerPools(Vec::new()));
pub static ref GLOBAL_RootDiskThreshold: RwLock<u64> = RwLock::new(0);
pub static ref GLOBAL_BackgroundHealRoutine: Arc<RwLock<HealRoutine>> = HealRoutine::new();
pub static ref GLOBAL_BackgroundHealState: Arc<RwLock<AllHealState>> = AllHealState::new(false);
static ref globalDeploymentIDPtr: RwLock<Uuid> = RwLock::new(Uuid::nil());
}

View File

@@ -1,14 +1,16 @@
use std::sync::Arc;
use std::{env, sync::Arc};
use tokio::{
select,
sync::{
broadcast::Receiver as B_Receiver,
mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver, Sender}, RwLock,
},
};
use crate::{error::Error, heal::heal_ops::NOP_HEAL, utils::path::SLASH_SEPARATOR};
use crate::{
disk::error::DiskError, error::{Error, Result}, heal::heal_ops::NOP_HEAL, new_object_layer_fn, store_api::StorageAPI, utils::path::SLASH_SEPARATOR
};
use super::{
heal_commands::{HealOpts, HealResultItem},
@@ -21,8 +23,8 @@ pub struct HealTask {
pub object: String,
pub version_id: String,
pub opts: HealOpts,
pub resp_tx: Arc<Sender<HealResult>>,
pub resp_rx: Arc<Receiver<HealResult>>,
pub resp_tx: Option<Arc<Sender<HealResult>>>,
pub resp_rx: Option<Arc<Receiver<HealResult>>>,
}
impl HealTask {
@@ -33,15 +35,15 @@ impl HealTask {
object: object.to_string(),
version_id: version_id.to_string(),
opts: opts.clone(),
resp_tx: tx.into(),
resp_rx: rx.into(),
resp_tx: Some(tx.into()),
resp_rx: Some(rx.into()),
}
}
}
pub struct HealResult {
pub result: HealResultItem,
err: Error,
err: Option<Error>,
}
pub struct HealRoutine {
@@ -51,18 +53,78 @@ pub struct HealRoutine {
}
impl HealRoutine {
pub async fn add_worker(&mut self, mut ctx: B_Receiver<bool>, bgseq: &HealSequence) {
pub fn new() -> Arc<RwLock<Self>> {
let mut workers = num_cpus::get() / 2;
if let Ok(env_heal_workers) = env::var("_RUSTFS_HEAL_WORKERS") {
if let Ok(num_healers) = env_heal_workers.parse::<usize>() {
workers = num_healers;
}
}
if workers == 0 {
workers = 4;
}
let (tx, rx) = mpsc::channel(100);
Arc::new(RwLock::new(Self {
tasks_tx: tx,
tasks_rx: rx,
workers,
}))
}
pub async fn add_worker(&mut self, mut ctx: B_Receiver<bool>, bgseq: &mut HealSequence) {
loop {
select! {
task = self.tasks_rx.recv() => {
let mut res = HealResultItem::default();
let mut err: Error;
let mut d_res = HealResultItem::default();
let d_err: Option<Error>;
match task {
Some(task) => {
if task.bucket == NOP_HEAL {
err = Error::from_string("skip file");
d_err = Some(Error::from_string("skip file"));
} else if task.bucket == SLASH_SEPARATOR {
(res, err) = heal_disk_format(task.opts).await;
match heal_disk_format(task.opts).await {
Ok((res, err)) => {
d_res = res;
d_err = err;
},
Err(err) => {d_err = Some(err)},
}
} else {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.expect("Not init");
if task.object.is_empty() {
match store.heal_object(&task.bucket, &task.object, &task.version_id, &task.opts).await {
Ok((res, err)) => {
d_res = res;
d_err = err;
},
Err(err) => {d_err = Some(err)},
}
} else {
match store.heal_object(&task.bucket, &task.object, &task.version_id, &task.opts).await {
Ok((res, err)) => {
d_res = res;
d_err = err;
},
Err(err) => {d_err = Some(err)},
}
}
}
if let Some(resp_tx) = task.resp_tx {
let _ = resp_tx.send(HealResult{result: d_res, err: d_err}).await;
} else {
// when respCh is not set caller is not waiting but we
// update the relevant metrics for them
if d_err.is_none() {
bgseq.count_healed(d_res.heal_item_type);
} else {
bgseq.count_failed(d_res.heal_item_type);
}
}
},
None => return,
@@ -80,6 +142,15 @@ impl HealRoutine {
// }
async fn heal_disk_format(opts: HealOpts) -> (HealResultItem, Error) {
todo!()
async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Error>)> {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock.as_ref().expect("Not init");
let (res, err) = store.heal_format(opts.dry_run).await?;
// Return any error, but ignore the error returned when the disks
// have already been healed (NoHealRequired).
match err {
Some(e) if !matches!(e.downcast_ref::<DiskError>(), Some(DiskError::NoHealRequired)) => {
Ok((HealResultItem::default(), Some(e)))
}
_ => Ok((res, None)),
}
}

View File

@@ -0,0 +1,293 @@
use std::{
io::{Cursor, Read},
sync::{atomic::{AtomicU32, AtomicU64}, Arc},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use byteorder::{LittleEndian, ReadBytesExt};
use lazy_static::lazy_static;
use rand::Rng;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use tokio::{sync::{mpsc, RwLock}, time::sleep};
use tracing::{error, info};
use crate::{
config::{common::{read_config, save_config}, heal::Config},
error::{Error, Result},
global::GLOBAL_IsErasureSD,
heal::data_usage::BACKGROUND_HEAL_INFO_PATH,
new_object_layer_fn,
store::ECStore,
};
use super::{data_scanner_metric::globalScannerMetrics, data_usage::{store_data_usage_in_backend, DATA_USAGE_BLOOM_NAME_PATH}, heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}};
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch.
const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
const DATA_SCANNER_COMPACT_AT_FOLDERS: u64 = DATA_SCANNER_COMPACT_AT_CHILDREN / 4; // Compact when this many subfolders in a single folder.
const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: u64 = 250_000; // Compact when this many subfolders in a single folder (even top level).
const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to wait on startup and between cycles.
const HEAL_DELETE_DANGLING: bool = true;
const HEAL_OBJECT_SELECT_PROB: u64 = 1024; // Overall probability of a file being scanned; one in n.
// static SCANNER_SLEEPER: () = new_dynamic_sleeper(2, Duration::from_secs(1), true); // Keep defaults same as config defaults
static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(DATA_SCANNER_START_DELAY.as_secs());
static SCANNER_IDLE_MODE: AtomicU32 = AtomicU32::new(0); // default is throttled when idle
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
static SCANNER_EXCESS_FOLDERS: AtomicU64 = AtomicU64::new(50_000);
lazy_static! {
pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
}
pub async fn init_data_scanner() {
let mut r = rand::thread_rng();
let random = r.gen_range(0.0..1.0);
tokio::spawn(async move {
loop {
run_data_scanner().await;
let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst) as f64));
let sleep_duration = if duration < Duration::new(1, 0) {
Duration::new(1, 0)
} else {
duration
};
sleep(sleep_duration).await;
}
});
}
async fn run_data_scanner() {
let mut cycle_info = CurrentScannerCycle::default();
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => {
info!("errServerNotInitialized");
return;
}
};
let mut buf = read_config(store, &DATA_USAGE_BLOOM_NAME_PATH)
.await
.unwrap_or_default();
if buf.len() == 8 {
cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
} else if buf.len() > 8 {
cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
let _ = cycle_info.unmarshal_msg(&buf.split_off(8));
}
loop {
cycle_info.current = cycle_info.next;
cycle_info.started = SystemTime::now();
{
globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await;
}
let bg_heal_info = read_background_heal_info(store).await;
let scan_mode = get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
if bg_heal_info.current_scan_mode != scan_mode {
let mut new_heal_info = bg_heal_info;
new_heal_info.current_scan_mode = scan_mode;
if scan_mode == HEAL_DEEP_SCAN {
new_heal_info.bitrot_start_time = SystemTime::now();
new_heal_info.bitrot_start_cycle = cycle_info.current;
}
save_background_heal_info(store, &new_heal_info).await;
}
// Wait before starting next cycle and wait on startup.
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async {
store_data_usage_in_backend(rx).await;
});
sleep(Duration::from_secs(SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst))).await;
}
}
#[derive(Debug, Serialize, Deserialize)]
struct BackgroundHealInfo {
bitrot_start_time: SystemTime,
bitrot_start_cycle: u64,
current_scan_mode: HealScanMode,
}
impl Default for BackgroundHealInfo {
fn default() -> Self {
Self {
bitrot_start_time: SystemTime::now(),
bitrot_start_cycle: Default::default(),
current_scan_mode: Default::default(),
}
}
}
async fn read_background_heal_info(store: &ECStore) -> BackgroundHealInfo {
if *GLOBAL_IsErasureSD.read().await {
return BackgroundHealInfo::default();
}
let buf = read_config(store, &BACKGROUND_HEAL_INFO_PATH)
.await
.unwrap_or_default();
if buf.is_empty() {
return BackgroundHealInfo::default();
}
serde_json::from_slice::<BackgroundHealInfo>(&buf).unwrap_or_default()
}
async fn save_background_heal_info(store: &ECStore, info: &BackgroundHealInfo) {
if *GLOBAL_IsErasureSD.read().await {
return;
}
let b = match serde_json::to_vec(info) {
Ok(info) => info,
Err(_) => return,
};
let _ = save_config(store, &BACKGROUND_HEAL_INFO_PATH, &b).await;
}
async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot_start_time: SystemTime) -> HealScanMode {
let bitrot_cycle = globalHealConfig.read().await.bitrot_scan_cycle();
// Duration::MAX is the "bitrot scan disabled" sentinel and zero means "always deep scan"
// (see parse_bitrot_config in config::heal).
if bitrot_cycle == Duration::MAX {
return HEAL_NORMAL_SCAN;
} else if bitrot_cycle.is_zero() {
return HEAL_DEEP_SCAN;
}
if current_cycle - bitrot_start_cycle < HEAL_OBJECT_SELECT_PROB {
return HEAL_DEEP_SCAN;
}
if SystemTime::now().duration_since(bitrot_start_time).unwrap_or_default() > bitrot_cycle {
return HEAL_DEEP_SCAN;
}
HEAL_NORMAL_SCAN
}
#[derive(Clone, Debug)]
pub struct CurrentScannerCycle {
pub current: u64,
pub next: u64,
pub started: SystemTime,
pub cycle_completed: Vec<SystemTime>,
}
impl Default for CurrentScannerCycle {
fn default() -> Self {
Self {
current: Default::default(),
next: Default::default(),
started: SystemTime::now(),
cycle_completed: Default::default(),
}
}
}
impl CurrentScannerCycle {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let len: u32 = 4;
let mut wr = Vec::new();
// number of fields
rmp::encode::write_map_len(&mut wr, len)?;
// write "current"
rmp::encode::write_str(&mut wr, "current")?;
rmp::encode::write_uint(&mut wr, self.current)?;
// write "next"
rmp::encode::write_str(&mut wr, "next")?;
rmp::encode::write_uint(&mut wr, self.next)?;
// write "started"
rmp::encode::write_str(&mut wr, "started")?;
rmp::encode::write_uint(&mut wr, system_time_to_timestamp(&self.started))?;
// write "cycle_completed"
rmp::encode::write_str(&mut wr, "cycle_completed")?;
let mut buf = Vec::new();
self.cycle_completed
.serialize(&mut Serializer::new(&mut buf))
.expect("Serialization failed");
rmp::encode::write_bin(&mut wr, &buf)?;
Ok(wr)
}
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let mut cur = Cursor::new(buf);
let mut fields_len = rmp::decode::read_map_len(&mut cur)?;
while fields_len > 0 {
fields_len -= 1;
let str_len = rmp::decode::read_str_len(&mut cur)?;
// Note: Vec::with_capacity(str_len) did not work here; vec![0u8; ...] reads correctly.
let mut field_buff = vec![0u8; str_len as usize];
cur.read_exact(&mut field_buff)?;
let field = String::from_utf8(field_buff)?;
match field.as_str() {
"current" => {
let u: u64 = rmp::decode::read_int(&mut cur)?;
self.current = u;
}
"next" => {
let u: u64 = rmp::decode::read_int(&mut cur)?;
self.next = u;
}
"started" => {
let u: u64 = rmp::decode::read_int(&mut cur)?;
let started = timestamp_to_system_time(u);
self.started = started;
}
"cycleCompleted" => {
let mut buf = Vec::new();
let _ = cur.read_to_end(&mut buf)?;
let u: Vec<SystemTime> =
Deserialize::deserialize(&mut Deserializer::new(&buf[..])).expect("Deserialization failed");
self.cycle_completed = u;
}
name => return Err(Error::msg(format!("not suport field name {}", name))),
}
}
Ok(cur.position())
}
}
// Convert a SystemTime to a Unix timestamp (seconds).
fn system_time_to_timestamp(time: &SystemTime) -> u64 {
time.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs()
}
// Convert a Unix timestamp (seconds) back to a SystemTime.
fn timestamp_to_system_time(timestamp: u64) -> SystemTime {
UNIX_EPOCH + std::time::Duration::new(timestamp, 0)
}
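A short test sketch (illustrative only, not part of this change) for the second-precision conversion helpers above:
#[cfg(test)]
mod time_tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    #[test]
    fn timestamp_conversion_round_trips_at_second_precision() {
        assert_eq!(timestamp_to_system_time(0), UNIX_EPOCH);
        let ts = system_time_to_timestamp(&SystemTime::now());
        // Sub-second precision is intentionally dropped by the helpers.
        assert_eq!(system_time_to_timestamp(&timestamp_to_system_time(ts)), ts);
    }
}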

View File

@@ -0,0 +1,74 @@
use std::{
collections::HashMap,
sync::{atomic::{AtomicU32, Ordering}, Arc},
time::SystemTime,
};
use lazy_static::lazy_static;
use tokio::sync::RwLock;
use super::data_scanner::CurrentScannerCycle;
lazy_static! {
pub static ref globalScannerMetrics: Arc<RwLock<ScannerMetrics>> = Arc::new(RwLock::new(ScannerMetrics::new()));
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum ScannerMetric {
// START Realtime metrics, which only record
// last-minute latencies and total operation count.
ReadMetadata = 0,
CheckMissing,
SaveUsage,
ApplyAll,
ApplyVersion,
TierObjSweep,
HealCheck,
Ilm,
CheckReplication,
Yield,
CleanAbandoned,
ApplyNonCurrent,
HealAbandonedVersion,
// START Trace metrics:
StartTrace,
ScanObject, // Scan object. All operations included.
HealAbandonedObject,
// END realtime metrics:
LastRealtime,
// Trace only metrics:
ScanFolder, // Scan a folder on disk, recursively.
ScanCycle, // Full cycle, cluster global.
ScanBucketDrive, // Single bucket on one drive.
CompactFolder, // Folder compacted.
// Must be last:
Last,
}
pub struct ScannerMetrics {
operations: Vec<AtomicU32>,
cycle_info: RwLock<Option<CurrentScannerCycle>>,
}
impl ScannerMetrics {
pub fn new() -> Self {
Self {
operations: (0..ScannerMetric::Last as usize).map(|_| AtomicU32::new(0)).collect(),
cycle_info: RwLock::new(None),
}
}
pub fn log(&mut self, s: ScannerMetric, _paths: &[String], _custom: &HashMap<String, String>, _start_time: SystemTime) {
// let duration = start_time.duration_since(start_time);
self.operations[s.clone() as usize].fetch_add(1, Ordering::SeqCst);
// TODO: record per-operation latency as well.
}
pub async fn set_cycle(&mut self, c: Option<CurrentScannerCycle>) {
*self.cycle_info.write().await = c;
}
}
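A hypothetical call-site sketch (not part of this change) showing how a scan step would bump one of the counters above; the function name is illustrative:
#[allow(dead_code)]
async fn record_read_metadata_example() {
    let start = SystemTime::now();
    // ... perform the metadata read being measured ...
    globalScannerMetrics
        .write()
        .await
        .log(ScannerMetric::ReadMetadata, &[], &HashMap::new(), start);
}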

View File

@@ -0,0 +1,158 @@
use std::{collections::HashMap, time::SystemTime};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use tracing::info;
use crate::{
config::common::save_config,
disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
new_object_layer_fn,
utils::path::SLASH_SEPARATOR,
};
pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR;
const DATA_USAGE_OBJ_NAME: &str = ".usage.json";
const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin";
pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin";
lazy_static! {
pub static ref DATA_USAGE_BUCKET: String = format!("{}{}{}", RUSTFS_META_BUCKET, SLASH_SEPARATOR, BUCKET_META_PREFIX);
pub static ref DATA_USAGE_OBJ_NAME_PATH: String = format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, DATA_USAGE_OBJ_NAME);
pub static ref DATA_USAGE_BLOOM_NAME_PATH: String =
format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, DATA_USAGE_BLOOM_NAME);
pub static ref BACKGROUND_HEAL_INFO_PATH: String =
format!("{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, ".background-heal.json");
}
// BucketTargetUsageInfo - bucket target usage info provides
// - replicated size for all objects sent to this target
// - replica size for all objects received from this target
// - replication pending size for all objects pending replication to this target
// - replication failed size for all objects failed replication to this target
// - replica pending count
// - replica failed count
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketTargetUsageInfo {
replication_pending_size: u64,
replication_failed_size: u64,
replicated_size: u64,
replica_size: u64,
replication_pending_count: u64,
replication_failed_count: u64,
replicated_count: u64,
}
// BucketUsageInfo - bucket usage info provides
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketUsageInfo {
size: u64,
// Following five fields suffixed with V1 are here for backward compatibility
// Total Size for objects that have not yet been replicated
replication_pending_size_v1: u64,
// Total size for objects that have witnessed one or more failures and will be retried
replication_failed_size_v1: u64,
// Total size for objects that have been replicated to destination
replicated_size_v1: u64,
// Total number of objects pending replication
replication_pending_count_v1: u64,
// Total number of objects that failed replication
replication_failed_count_v1: u64,
objects_count: u64,
object_size_histogram: HashMap<String, u64>,
object_versions_histogram: HashMap<String, u64>,
versions_count: u64,
delete_markers_count: u64,
replica_size: u64,
replica_count: u64,
replication_info: HashMap<String, BucketTargetUsageInfo>,
}
// DataUsageInfo represents data usage stats of the underlying Object API
#[derive(Debug, Serialize, Deserialize)]
pub struct DataUsageInfo {
total_capacity: u64,
total_used_capacity: u64,
total_free_capacity: u64,
// LastUpdate is the timestamp of when the data usage info was last updated.
// This does not indicate a full scan.
last_update: SystemTime,
// Objects total count across all buckets
objects_total_count: u64,
// Versions total count across all buckets
versions_total_count: u64,
// Delete markers total count across all buckets
delete_markers_total_count: u64,
// Objects total size across all buckets
objects_total_size: u64,
replication_info: HashMap<String, BucketTargetUsageInfo>,
// Total number of buckets in this cluster
buckets_count: u64,
// Buckets usage info provides following information across all buckets
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
buckets_usage: HashMap<String, BucketUsageInfo>,
// Deprecated kept here for backward compatibility reasons.
bucket_sizes: HashMap<String, u64>,
// Todo: TierStats
// TierStats contains per-tier stats of all configured remote tiers
}
impl Default for DataUsageInfo {
fn default() -> Self {
Self {
total_capacity: Default::default(),
total_used_capacity: Default::default(),
total_free_capacity: Default::default(),
last_update: SystemTime::now(),
objects_total_count: Default::default(),
versions_total_count: Default::default(),
delete_markers_total_count: Default::default(),
objects_total_size: Default::default(),
replication_info: Default::default(),
buckets_count: Default::default(),
buckets_usage: Default::default(),
bucket_sizes: Default::default(),
}
}
}
pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => {
info!("errServerNotInitialized");
return;
}
};
let mut attempts = 1;
loop {
match rx.recv().await {
Some(data_usage_info) => {
if let Ok(data) = serde_json::to_vec(&data_usage_info) {
if attempts > 10 {
// Save a backup copy every 10th update, then restart the count.
let _ = save_config(store, &format!("{}{}", DATA_USAGE_OBJ_NAME_PATH.to_string(), ".bkp"), &data).await;
attempts = 1;
}
let _ = save_config(store, &DATA_USAGE_OBJ_NAME_PATH, &data).await;
attempts += 1;
} else {
continue;
}
}
None => {
return;
}
}
}
}
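A minimal wiring sketch (hypothetical, assuming an initialized object layer; the function name is illustrative) for the persistence loop above:
#[allow(dead_code)]
async fn spawn_usage_persistence_example() {
    let (tx, rx) = tokio::sync::mpsc::channel::<DataUsageInfo>(100);
    // The receiver side persists each snapshot it gets; dropping `tx` ends the task.
    tokio::spawn(store_data_usage_in_backend(rx));
    let _ = tx.send(DataUsageInfo::default()).await;
}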

View File

@@ -0,0 +1,234 @@
use http::HeaderMap;
use rand::Rng;
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::{Duration, SystemTime};
use s3s::{S3Error, S3ErrorCode};
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use crate::config::common::save_config;
use crate::disk::error::DiskError;
use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::error::{Error, Result};
use crate::new_object_layer_fn;
use crate::set_disk::SetDisks;
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions, StorageAPI};
use super::data_usage::DATA_USAGE_ROOT;
// DATA_USAGE_BUCKET_LEN must be length of ObjectsHistogramIntervals
pub const DATA_USAGE_BUCKET_LEN: usize = 11;
pub const DATA_USAGE_VERSION_LEN: usize = 7;
type DataUsageHashMap = HashSet<String>;
// sizeHistogram is a size histogram.
type SizeHistogram = Vec<u64>;
// versionsHistogram is a histogram of number of versions in an object.
type VersionsHistogram = Vec<u64>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
pub pending_size: u64,
pub replicated_size: u64,
pub failed_size: u64,
pub failed_count: u64,
pub pending_count: u64,
pub missed_threshold_size: u64,
pub after_threshold_size: u64,
pub missed_threshold_count: u64,
pub after_threshold_count: u64,
pub replicated_count: u64,
}
impl ReplicationStats {
pub fn empty(&self) -> bool {
self.replicated_size == 0 && self.failed_size == 0 && self.failed_count == 0
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationAllStats {
pub targets: HashMap<String, ReplicationStats>,
pub replica_size: u64,
pub replica_count: u64,
}
impl ReplicationAllStats {
pub fn empty(&self) -> bool {
if self.replica_size != 0 || self.replica_count != 0 {
return false;
}
for (_, v) in self.targets.iter() {
if !v.empty() {
return false;
}
}
true
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct DataUsageEntry {
pub children: DataUsageHashMap,
// These fields do not include any children.
pub size: i64,
pub objects: u64,
pub versions: u64,
pub delete_markers: u64,
pub obj_sizes: SizeHistogram,
pub obj_versions: VersionsHistogram,
pub replication_stats: ReplicationAllStats,
// Todo: tier
// pub all_tier_stats: ,
pub compacted: bool,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct DataUsageCacheInfo {
pub name: String,
pub next_cycle: usize,
pub last_update: SystemTime,
pub skip_healing: bool,
// todo: life_cycle
// pub life_cycle:
#[serde(skip)]
pub updates: Option<Sender<DataUsageEntry>>,
// Todo: replication
// #[serde(skip_serializing)]
// replication:
}
impl Default for DataUsageCacheInfo {
fn default() -> Self {
Self {
name: Default::default(),
next_cycle: Default::default(),
last_update: SystemTime::now(),
skip_healing: Default::default(),
updates: Default::default(),
}
}
}
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct DataUsageCache {
pub info: DataUsageCacheInfo,
pub cache: HashMap<String, DataUsageEntry>,
}
impl DataUsageCache {
pub async fn load(store: &SetDisks, name: &str) -> Result<Self> {
let mut d = DataUsageCache::default();
let mut retries = 0;
while retries < 5 {
let path = Path::new(BUCKET_META_PREFIX).join(name);
match store
.get_object_reader(
RUSTFS_META_BUCKET,
path.to_str().unwrap(),
HTTPRangeSpec::nil(),
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await
{
Ok(mut reader) => {
if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
d = info
}
break;
}
Err(err) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
match store
.get_object_reader(
RUSTFS_META_BUCKET,
name,
HTTPRangeSpec::nil(),
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await
{
Ok(mut reader) => {
if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
d = info
}
break;
}
Err(_) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
break;
}
_ => {}
},
}
}
_ => {}
},
}
retries += 1;
let mut rng = rand::thread_rng();
sleep(Duration::from_millis(rng.gen_range(0..1_000))).await;
}
Ok(d)
}
pub async fn save(&self, name: &str) -> Result<()> {
let buf = self.marshal_msg()?;
let buf_clone = buf.clone();
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()))),
};
let store_clone = store.clone();
let name_clone = name.to_string();
tokio::spawn(async move {
let _ = save_config(&store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await;
});
save_config(&store, name, &buf).await
}
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
let hash = hash_path(path);
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut Serializer::new(&mut buf))?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Self = rmp_serde::from_slice(buf)?;
Ok(t)
}
}
struct DataUsageHash(String);
impl DataUsageHash {
}
pub fn hash_path(data: &str) -> DataUsageHash {
let mut data = data;
if data != DATA_USAGE_ROOT {
data = data.trim_matches('/');
}
Path::new(&data);
todo!()
}

View File

@@ -7,12 +7,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
error::{Error, Result},
heal::heal_ops::HEALING_TRACKER_FILENAME,
new_object_layer_fn,
store_api::{BucketInfo, StorageAPI},
utils::fs::read_file,
disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, error::{Error, Result}, global::GLOBAL_BackgroundHealState, heal::heal_ops::HEALING_TRACKER_FILENAME, new_object_layer_fn, store_api::{BucketInfo, StorageAPI}, utils::fs::read_file
};
pub type HealScanMode = usize;
@@ -50,7 +45,7 @@ pub struct HealOpts {
pub set: Option<usize>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct HealDriveInfo {
pub uuid: String,
pub endpoint: String,
@@ -259,7 +254,7 @@ impl HealingTracker {
let htracker_bytes = self.marshal_msg()?;
// TODO: globalBackgroundHealState
GLOBAL_BackgroundHealState.write().await.update_heal_status(&self).await;
if let Some(disk) = &self.disk {
let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
@@ -430,10 +425,10 @@ async fn load_healing_tracker(disk: &Option<DiskStore>) -> Result<HealingTracker
}
}
async fn init_healing_tracker(disk: DiskStore, heal_id: String) -> Result<HealingTracker> {
pub async fn init_healing_tracker(disk: DiskStore, heal_id: &str) -> Result<HealingTracker> {
let mut healing_tracker = HealingTracker::default();
healing_tracker.id = disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string());
healing_tracker.heal_id = heal_id;
healing_tracker.heal_id = heal_id.to_string();
healing_tracker.path = disk.to_string();
healing_tracker.endpoint = disk.endpoint().to_string();
healing_tracker.started = SystemTime::now()

View File

@@ -141,6 +141,7 @@ impl HealSequence {
..Default::default()
}
}
}
impl HealSequence {
@@ -160,7 +161,7 @@ impl HealSequence {
self.heal_failed_items_map.clone()
}
fn count_failed(&mut self, heal_type: HealItemType) {
pub fn count_failed(&mut self, heal_type: HealItemType) {
*self.heal_failed_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -168,7 +169,7 @@ impl HealSequence {
.as_secs();
}
fn count_scanned(&mut self, heal_type: HealItemType) {
pub fn count_scanned(&mut self, heal_type: HealItemType) {
*self.scanned_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -176,7 +177,7 @@ impl HealSequence {
.as_secs();
}
fn count_healed(&mut self, heal_type: HealItemType) {
pub fn count_healed(&mut self, heal_type: HealItemType) {
*self.healed_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -353,10 +354,25 @@ pub struct AllHealState {
}
impl AllHealState {
pub fn new(cleanup: bool) -> Self {
let hstate = AllHealState::default();
pub fn new(cleanup: bool) -> Arc<RwLock<Self>> {
let hstate = Arc::new(RwLock::new(AllHealState::default()));
if cleanup {
let (tx, mut rx) = broadcast::channel(1);
let hstate_clone = hstate.clone();
tokio::spawn(async move {
// Hold the sender inside the task so rx.recv() stays pending instead of
// returning Closed immediately (which would busy-loop this select).
let _shutdown_tx = tx;
loop {
select! {
result = rx.recv() =>{
if let Ok(true) = result {
return;
}
}
_ = sleep(Duration::from_secs(5 * 60)) => {
hstate_clone.write().await.periodic_heal_seqs_clean().await;
}
}
}
});
}
hstate
@@ -382,7 +398,7 @@ impl AllHealState {
});
}
async fn update_heal_status(&mut self, tracker: &HealingTracker) {
pub async fn update_heal_status(&mut self, tracker: &HealingTracker) {
let _ = self.mu.write().await;
let _ = tracker.mu.read().await;
@@ -427,30 +443,19 @@ impl AllHealState {
});
}
async fn periodic_heal_seqs_clean(&mut self, mut rx: Receiver<bool>) {
loop {
select! {
result = rx.recv() =>{
if let Ok(true) = result {
return;
}
}
_ = sleep(Duration::from_secs(5 * 60)) => {
let _ = self.mu.write().await;
let now = SystemTime::now();
let mut keys_to_reomve = Vec::new();
for (k, v) in self.heal_seq_map.iter() {
if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
keys_to_reomve.push(k.clone())
}
}
for key in keys_to_reomve.iter() {
self.heal_seq_map.remove(key);
}
}
async fn periodic_heal_seqs_clean(&mut self) {
let _ = self.mu.write().await;
let now = SystemTime::now();
let mut keys_to_remove = Vec::new();
for (k, v) in self.heal_seq_map.iter() {
if v.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(v.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
keys_to_remove.push(k.clone())
}
}
for key in keys_to_remove.iter() {
self.heal_seq_map.remove(key);
}
}
async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<HealSequence>, bool) {

View File

@@ -1,3 +1,7 @@
pub mod background_heal_ops;
pub mod data_scanner;
pub mod data_scanner_metric;
pub mod data_usage;
pub mod heal_commands;
pub mod heal_ops;
pub mod data_usage_cache;

View File

@@ -9,7 +9,7 @@ pub mod erasure;
pub mod error;
mod file_meta;
mod global;
mod heal;
pub mod heal;
pub mod peer;
mod quorum;
pub mod set_disk;

View File

@@ -10,14 +10,18 @@ use uuid::Uuid;
use crate::{
disk::{
error::DiskError,
format::{DistributionAlgoVersion, FormatV3},
DiskAPI, DiskStore,
new_disk, DiskInfo, DiskOption, DiskStore,
},
endpoints::PoolEndpoints,
endpoints::{Endpoints, PoolEndpoints},
error::{Error, Result},
global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
heal::{
heal_commands::{HealOpts, HealResultItem},
heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
DRIVE_STATE_OK, HEAL_ITEM_METADATA,
},
heal_ops::HealObjectFn,
},
set_disk::SetDisks,
@@ -26,7 +30,9 @@ use crate::{
ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions,
ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
},
utils::hash,
store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
utils::hash,
};
use tokio::time::Duration;
@@ -489,13 +495,99 @@ impl StorageAPI for Sets {
async fn delete_bucket(&self, _bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
unimplemented!()
}
async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem> {
unimplemented!()
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
let (disks, _) = init_storage_disks_with_errors(
&self.endpoints.endpoints,
&DiskOption {
cleanup: false,
health_check: false,
},
)
.await;
let (formats, errs) = load_format_erasure_all(&disks, true).await;
if let Err(err) = check_format_erasure_values(&formats, self.set_drive_count) {
return Ok((HealResultItem::default(), Some(err)));
}
let ref_format = match get_format_erasure_in_quorum(&formats) {
Ok(format) => format,
Err(err) => return Ok((HealResultItem::default(), Some(err))),
};
let mut res = HealResultItem {
heal_item_type: HEAL_ITEM_METADATA.to_string(),
detail: "disk-format".to_string(),
disk_count: self.set_count * self.set_drive_count,
set_count: self.set_count,
..Default::default()
};
let before_drives = formats_to_drives_info(&self.endpoints.endpoints, &formats, &errs);
res.before = Vec::with_capacity(before_drives.len());
res.after = Vec::with_capacity(before_drives.len());
for v in before_drives.iter() {
res.before.push(v.clone());
res.after.push(v.clone());
}
if DiskError::UnformattedDisk.count_errs(&errs) == 0 {
return Ok((res, Some(Error::new(DiskError::NoHealRequired))));
}
if !self.format.eq(&ref_format) {
return Ok((res, Some(Error::new(DiskError::CorruptedFormat))));
}
let format_op_id = Uuid::new_v4().to_string();
let (new_format_sets, _current_disks_info) = new_heal_format_sets(&ref_format, self.set_count, self.set_drive_count, &formats, &errs);
if !dry_run {
let mut tmp_new_formats = vec![None; self.set_count*self.set_drive_count];
for (i, set) in new_format_sets.iter().enumerate() {
for (j, fm) in set.iter().enumerate() {
if let Some(fm) = fm {
res.after[i*self.set_drive_count+j].uuid = fm.erasure.this.to_string();
res.after[i*self.set_drive_count+j].state = DRIVE_STATE_OK.to_string();
tmp_new_formats[i*self.set_drive_count+j] = Some(fm.clone());
}
}
}
// Save new formats `format.json` on unformatted disks.
for (fm, disk) in tmp_new_formats.iter_mut().zip(disks.iter()) {
if fm.is_some() && disk.is_some() {
if save_format_file(disk, fm, &format_op_id).await.is_err() {
let _ = disk.as_ref().unwrap().close().await;
*fm = None;
}
}
}
for (index, fm) in tmp_new_formats.iter().enumerate() {
if let Some(fm) = fm {
let (m, n) = match ref_format.find_disk_index_by_disk_id(fm.erasure.this) {
Ok((m, n)) => (m, n),
Err(_) => continue,
};
if let Some(set) = self.disk_set.get(m) {
if let Some(Some(disk)) = set.disks.read().await.get(n) {
let _ = disk.close().await;
}
}
if let Some(Some(disk)) = disks.get(index) {
self.disk_set[m].renew_disk(&disk.endpoint()).await;
}
}
}
}
Ok((res, None))
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
unimplemented!()
}
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem> {
async fn heal_object(
&self,
bucket: &str,
object: &str,
version_id: &str,
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)> {
self.get_disks_by_key(object)
.heal_object(bucket, object, version_id, opts)
.await
@@ -510,3 +602,119 @@ impl StorageAPI for Sets {
unimplemented!()
}
}
async fn close_storage_disks(disks: &[Option<DiskStore>]) {
let mut futures = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if let Some(disk) = disk {
let disk = disk.clone();
futures.push(tokio::spawn(async move {
let _ = disk.close().await;
}));
}
}
let _ = join_all(futures).await;
}
async fn init_storage_disks_with_errors(
endpoints: &Endpoints,
opts: &DiskOption,
) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
// Bootstrap disks.
let disks = Arc::new(RwLock::new(vec![None; endpoints.as_ref().len()]));
let errs = Arc::new(RwLock::new(vec![None; endpoints.as_ref().len()]));
let mut futures = Vec::with_capacity(endpoints.as_ref().len());
for (index, endpoint) in endpoints.as_ref().iter().enumerate() {
let ep = endpoint.clone();
let opt = opts.clone();
let disks_clone = disks.clone();
let errs_clone = errs.clone();
futures.push(tokio::spawn(async move {
match new_disk(&ep, &opt).await {
Ok(disk) => {
disks_clone.write().await[index] = Some(disk);
errs_clone.write().await[index] = None;
}
Err(err) => {
disks_clone.write().await[index] = None;
errs_clone.write().await[index] = Some(err);
}
}
}));
}
let _ = join_all(futures).await;
let disks = disks.read().await.clone();
let errs = errs.read().await.clone();
(disks, errs)
}
fn formats_to_drives_info(endpoints: &Endpoints, formats: &[Option<FormatV3>], errs: &[Option<Error>]) -> Vec<HealDriveInfo> {
let mut before_drives = Vec::with_capacity(endpoints.as_ref().len());
for (index, format) in formats.iter().enumerate() {
let drive = endpoints.get_string(index);
let state = if format.is_some() {
DRIVE_STATE_OK
} else if let Some(Some(err)) = errs.get(index) {
match err.downcast_ref::<DiskError>() {
Some(DiskError::UnformattedDisk) => DRIVE_STATE_MISSING,
Some(DiskError::DiskNotFound) => DRIVE_STATE_OFFLINE,
_ => DRIVE_STATE_CORRUPT,
}
} else {
DRIVE_STATE_CORRUPT
};
let uuid = if let Some(format) = format {
format.erasure.this.to_string()
} else {
"".to_string()
};
before_drives.push(HealDriveInfo {
uuid,
endpoint: drive,
state: state.to_string(),
});
}
before_drives
}
fn new_heal_format_sets(
ref_format: &FormatV3,
set_count: usize,
set_drive_count: usize,
formats: &[Option<FormatV3>],
errs: &[Option<Error>],
) -> (Vec<Vec<Option<FormatV3>>>, Vec<Vec<DiskInfo>>) {
let mut new_formats = vec![vec![None; set_drive_count]; set_count];
let mut current_disks_info = vec![vec![DiskInfo::default(); set_drive_count]; set_count];
for (i, set) in ref_format.erasure.sets.iter().enumerate() {
for (j, value) in set.iter().enumerate() {
if let Some(Some(err)) = errs.get(i*set_drive_count+j) {
match err.downcast_ref::<DiskError>() {
Some(DiskError::UnformattedDisk) => {
let mut fm = FormatV3::new(set_count, set_drive_count);
fm.id = ref_format.id;
fm.format = ref_format.format.clone();
fm.version = ref_format.version.clone();
fm.erasure.this = ref_format.erasure.sets[i][j];
fm.erasure.sets = ref_format.erasure.sets.clone();
fm.erasure.version = ref_format.erasure.version.clone();
fm.erasure.distribution_algo = ref_format.erasure.distribution_algo.clone();
new_formats[i][j] = Some(fm);
},
_ => {},
}
}
if let (Some(format), None) = (&formats[i*set_drive_count+j], &errs[i*set_drive_count+j]) {
if let Some(info) = &format.disk_info {
if !info.endpoint.is_empty() {
current_disks_info[i][j] = info.clone();
}
}
}
}
}
(new_formats, current_disks_info)
}

View File

@@ -10,7 +10,8 @@ use crate::global::{
is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION,
DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode};
use crate::heal::data_usage::DataUsageInfo;
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_ops::HealObjectFn;
use crate::new_object_layer_fn;
use crate::store_api::{ListMultipartsInfo, ObjectIO};
@@ -43,6 +44,7 @@ use http::HeaderMap;
use lazy_static::lazy_static;
use rand::Rng;
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
use tokio::sync::mpsc::Sender;
use std::cmp::Ordering;
use std::slice::Iter;
use std::{
@@ -52,10 +54,11 @@ use std::{
};
use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, RwLock, Semaphore};
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::heal::data_usage_cache::DataUsageCache;
const MAX_UPLOADS_LIST: usize = 10000;
@@ -488,6 +491,47 @@ impl ECStore {
internal_get_pool_info_existing_with_opts(&self.pools, bucket, object, opts).await
}
pub async fn ns_scanner(&self, updates: Sender<DataUsageInfo>, want_cycle: usize, heal_scan_mode: HealScanMode) -> Result<()> {
let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
if all_buckets.is_empty() {
let _ = updates.send(DataUsageInfo::default()).await;
return Ok(());
}
let mut total_results = 0;
let mut result_index = 0;
self.pools.iter().for_each(|pool| {
total_results += pool.disk_set.len();
});
let results = Arc::new(RwLock::new(vec![DataUsageCache::default(); total_results]));
let mut futures = Vec::new();
for pool in self.pools.iter() {
for set in pool.disk_set.iter() {
let index = result_index;
let results_clone = results.clone();
futures.push(async move {
let (tx, mut rx) = mpsc::channel(100);
let task = tokio::spawn(async move {
loop {
match rx.recv().await {
Some(info) => {
results_clone.write().await[index] = info;
},
None => {
return ;
}
}
}
});
let _ = task.await;
});
result_index += 1;
}
}
Ok(())
}
async fn get_latest_object_info_with_idx(
&self,
bucket: &str,
@@ -1369,51 +1413,93 @@ impl StorageAPI for ECStore {
}
counts
}
async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem> {
unimplemented!()
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
let mut r = HealResultItem {
heal_item_type: HEAL_ITEM_METADATA.to_string(),
detail: "disk-format".to_string(),
..Default::default()
};
let mut count_no_heal = 0;
for pool in self.pools.iter() {
let (mut result, err) = pool.heal_format(dry_run).await?;
if let Some(err) = err {
match err.downcast_ref::<DiskError>() {
Some(DiskError::NoHealRequired) => {
count_no_heal += 1;
},
_ => {
continue;
}
}
}
r.disk_count += result.disk_count;
r.set_count += result.set_count;
r.before.append(&mut result.before);
r.after.append(&mut result.after);
}
if count_no_heal == self.pools.len() {
return Ok((r, Some(Error::new(DiskError::NoHealRequired))));
}
Ok((r, None))
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
unimplemented!()
}
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem> {
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)> {
let object = utils::path::encode_dir_object(object);
let mut errs = HashMap::new();
let mut results = HashMap::new();
let errs = Arc::new(RwLock::new(vec![None; self.pools.len()]));
let results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.pools.len()]));
let mut futures = Vec::with_capacity(self.pools.len());
for (idx, pool) in self.pools.iter().enumerate() {
//TODO: IsSuspended
match pool.heal_object(bucket, &object, version_id, opts).await {
Ok(mut result) => {
result.object = utils::path::decode_dir_object(&result.object);
results.insert(idx, result);
let object = object.clone();
let results = results.clone();
let errs = errs.clone();
futures.push(async move {
match pool.heal_object(bucket, &object, version_id, opts).await {
Ok((mut result, err)) => {
result.object = utils::path::decode_dir_object(&result.object);
results.write().await[idx] = result;
errs.write().await[idx] = err;
}
Err(err) => {
errs.write().await[idx] = Some(err);
}
}
Err(err) => {
errs.insert(idx, err);
}
}
});
}
let _ = join_all(futures).await;
// Return the first nil error
for i in 0..self.pools.len() {
if !errs.contains_key(&i) {
return Ok(results.remove(&i).unwrap());
for (index, err) in errs.read().await.iter().enumerate() {
if err.is_none() {
return Ok((results.write().await.remove(index), None));
}
}
// No pool returned a nil error, return the first non 'not found' error
for (k, err) in errs.iter() {
match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => return Ok(results.remove(k).unwrap()),
for (index, err) in errs.read().await.iter().enumerate() {
match err {
Some(err) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => return Ok((results.write().await.remove(index), Some(err.clone()))),
},
None => {
return Ok((results.write().await.remove(index), None));
}
}
}
// At this stage, all errors are 'not found'
if !version_id.is_empty() {
return Err(Error::new(DiskError::FileVersionNotFound));
return Ok((HealResultItem::default(), Some(Error::new(DiskError::FileVersionNotFound))));
}
Err(Error::new(DiskError::FileNotFound))
Ok((HealResultItem::default(), Some(Error::new(DiskError::FileNotFound))))
}
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()> {
let mut first_err = None;

View File

@@ -22,6 +22,8 @@ pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
pub const RESERVED_METADATA_PREFIX: &str = "X-Rustfs-Internal-";
pub const RESERVED_METADATA_PREFIX_LOWER: &str = "X-Rustfs-Internal-";
pub const RUSTFS_HEALING: &str = "X-Rustfs-Internal-healing";
pub const RUSTFS_DATA_MOVE: &str = "X-Rustfs-Internal-data-mov";
// #[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
@@ -100,6 +102,12 @@ impl FileInfo {
false
}
pub fn inline_data(&self) -> bool {
self.metadata.as_ref().map_or(false, |metadata| {
metadata.contains_key(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER)) && !self.is_remote()
})
}
pub fn get_etag(&self) -> Option<String> {
if let Some(meta) = &self.metadata {
meta.get("etag").cloned()
@@ -237,6 +245,16 @@ impl FileInfo {
Err(Error::msg("part not found"))
}
pub fn set_healing(&mut self) {
if self.metadata.is_none() {
self.metadata = Some(HashMap::new());
}
if let Some(metadata) = self.metadata.as_mut() {
metadata.insert(RUSTFS_HEALING.to_string(), "true".to_string());
}
}
pub fn set_inline_data(&mut self) {
if let Some(meta) = self.metadata.as_mut() {
meta.insert("x-rustfs-inline-data".to_owned(), "true".to_owned());
@@ -899,9 +917,9 @@ pub trait StorageAPI: ObjectIO {
async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn delete_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem>;
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem>;
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()>;
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;

View File

@@ -2,12 +2,10 @@ use crate::config::{storageclass, KVS};
use crate::disk::DiskAPI;
use crate::{
disk::{
error::DiskError,
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
error::DiskError, format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, new_disk, DiskInfoOptions, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET
},
endpoints::Endpoints,
error::{Error, Result},
error::{Error, Result}, heal::heal_commands::init_healing_tracker,
};
use futures::future::join_all;
use std::{
@@ -114,7 +112,7 @@ fn init_format_erasure(
fms
}
fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
pub fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
@@ -148,7 +146,7 @@ fn get_format_erasure_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3
Ok(format)
}
fn check_format_erasure_values(
pub fn check_format_erasure_values(
formats: &[Option<FormatV3>],
// disks: &Vec<Option<DiskStore>>,
set_drive_count: usize,
@@ -184,7 +182,7 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
}
// load_format_erasure_all reads format.json from all the given disks.
async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
pub async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
@@ -220,7 +218,7 @@ async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Ve
(datas, errors)
}
pub async fn load_format_erasure(disk: &DiskStore, _heal: bool) -> Result<FormatV3, Error> {
pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> Result<FormatV3, Error> {
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await
@@ -231,9 +229,15 @@ pub async fn load_format_erasure(disk: &DiskStore, _heal: bool) -> Result<Format
None => e,
})?;
let fm = FormatV3::try_from(data.as_slice())?;
let mut fm = FormatV3::try_from(data.as_slice())?;
// TODO: heal
if heal {
let info = disk.disk_info(&DiskInfoOptions {
noop: heal,
..Default::default()
}).await?;
fm.disk_info = Some(info);
}
Ok(fm)
}
@@ -242,7 +246,7 @@ async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<For
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
futures.push(save_format_file(disk, &formats[i]));
futures.push(save_format_file(disk, &formats[i], ""));
}
let mut errors = Vec::with_capacity(disks.len());
@@ -262,7 +266,7 @@ async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<For
errors
}
async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> Result<()> {
pub async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>, heal_id: &str) -> Result<()> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
@@ -281,6 +285,10 @@ async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -
.await?;
disk.set_disk_id(Some(format.erasure.this)).await?;
if !heal_id.is_empty() {
let mut ht = init_healing_tracker(disk.clone(), heal_id).await?;
return ht.save().await;
}
Ok(())
}

View File

@@ -0,0 +1,15 @@
use crate::error::{Error, Result};
pub fn parse_bool(s: &str) -> Result<bool> {
match s {
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true),
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Ok(false),
_ => Err(Error::from_string(format!("ParseBool: parsing {}", s))),
}
}
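A short test sketch (illustrative only, not part of this change) for the Go-style flag parsing above:
#[cfg(test)]
mod tests {
    use super::parse_bool;

    #[test]
    fn parse_bool_matches_go_strconv_style_values() {
        assert!(parse_bool("TRUE").unwrap());
        assert!(parse_bool("enabled").unwrap());
        assert!(!parse_bool("Off").unwrap());
        assert!(parse_bool("yes").is_err());
    }
}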

View File

@@ -1,5 +1,6 @@
pub mod crypto;
pub mod ellipses;
pub mod bool_flag;
pub mod fs;
pub mod hash;
pub mod net;

View File

@@ -6,9 +6,12 @@ mod storage;
use clap::Parser;
use common::error::{Error, Result};
use ecstore::{
bucket::metadata_sys::init_bucket_metadata_sys,
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
set_global_endpoints,
store::{init_local_disks, ECStore},
store_api::{BucketOptions, StorageAPI},
update_erasure_type,
};
use grpc::make_server;
@@ -184,6 +187,8 @@ async fn run(opt: config::Opt) -> Result<()> {
.map_err(|err| Error::from_string(err.to_string()))?;
store.init().await.map_err(|err| Error::from_string(err.to_string()))?;
// init scanner
init_data_scanner().await;
tokio::select! {
_ = tokio::signal::ctrl_c() => {