Add background heal support for degraded (down) objects

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-12-17 15:51:46 +08:00
parent f902d4b1f6
commit fac0feeeaa
10 changed files with 3406 additions and 904 deletions

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -212,9 +212,9 @@ impl Erasure {
offset: usize,
length: usize,
total_length: usize,
) -> Result<usize> {
) -> (usize, Option<Error>) {
if length == 0 {
return Ok(0);
return (0, None);
}
let mut reader = ShardReader::new(readers, self, offset, total_length);
@@ -247,15 +247,24 @@ impl Erasure {
// debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length);
let mut bufs = reader.read().await?;
let mut bufs = match reader.read().await {
Ok(bufs) => bufs,
Err(err) => return (bytes_writed, Some(err)),
};
if self.parity_shards > 0 {
self.decode_data(&mut bufs)?;
if let Err(err) = self.decode_data(&mut bufs) {
return (bytes_writed, Some(err));
}
}
let writed_n = self
let writed_n = match self
.write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length)
.await?;
.await
{
Ok(n) => n,
Err(err) => return (bytes_writed, Some(err)),
};
bytes_writed += writed_n;
@@ -264,10 +273,10 @@ impl Erasure {
if bytes_writed != length {
// debug!("bytes_writed != length: {} != {} ", bytes_writed, length);
return Err(Error::msg("erasure decode less data"));
return (bytes_writed, Some(Error::msg("erasure decode less data")));
}
Ok(bytes_writed)
(bytes_writed, None)
}
async fn write_data_blocks(

View File

@@ -6,6 +6,7 @@ use std::{
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::heal::mrf::MRFState;
use crate::{
disk::DiskStore,
endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
@@ -34,6 +35,7 @@ lazy_static! {
pub static ref GLOBAL_BackgroundHealRoutine: Arc<HealRoutine> = HealRoutine::new();
pub static ref GLOBAL_BackgroundHealState: Arc<AllHealState> = AllHealState::new(false);
pub static ref GLOBAL_ALlHealState: Arc<AllHealState> = AllHealState::new(false);
pub static ref GLOBAL_MRFState: Arc<MRFState> = Arc::new(MRFState::new());
static ref globalDeploymentIDPtr: RwLock<Uuid> = RwLock::new(Uuid::nil());
}

View File

@@ -1,6 +1,7 @@
use madmin::heal_commands::HealResultItem;
use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
spawn,
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
@@ -14,7 +15,10 @@ use super::{
heal_commands::HealOpts,
heal_ops::{new_bg_heal_sequence, HealSequence},
};
use crate::global::GLOBAL_MRFState;
use crate::heal::error::ERR_RETRY_HEALING;
use crate::heal::heal_commands::{HealScanMode, HEAL_ITEM_BUCKET};
use crate::heal::heal_ops::{HealSource, BG_HEALING_UUID};
use crate::{
config::RUSTFS_CONFIG_PREFIX,
disk::{endpoint::Endpoint, error::DiskError, DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
@@ -42,18 +46,21 @@ pub async fn init_auto_heal() {
GLOBAL_BackgroundHealState
.push_heal_local_disks(&get_local_disks_to_heal().await)
.await;
tokio::spawn(async {
spawn(async {
monitor_local_disks_and_heal().await;
});
}
}
spawn(async {
GLOBAL_MRFState.heal_routine().await;
});
}
async fn init_background_healing() {
let bg_seq = Arc::new(new_bg_heal_sequence());
for _ in 0..GLOBAL_BackgroundHealRoutine.workers {
let bg_seq_clone = bg_seq.clone();
tokio::spawn(async {
spawn(async {
GLOBAL_BackgroundHealRoutine.add_worker(bg_seq_clone).await;
});
}
@@ -92,13 +99,14 @@ async fn monitor_local_disks_and_heal() {
interval.tick().await;
let heal_disks = GLOBAL_BackgroundHealState.get_heal_local_disk_endpoints().await;
if heal_disks.is_empty() {
info!("heal local disks is empty");
interval.reset();
continue;
}
let store = new_object_layer_fn().expect("errServerNotInitialized");
if let (_, Some(err)) = store.heal_format(false).await.expect("heal format failed") {
if let Some(DiskError::NoHealRequired) = err.downcast_ref() {
if let Some(DiskError::NoHealRequired) = err.downcast_ref::<DiskError>() {
} else {
info!("heal format err: {}", err.to_string());
interval.reset();
@@ -108,7 +116,7 @@ async fn monitor_local_disks_and_heal() {
for disk in heal_disks.into_ref().iter() {
let disk_clone = disk.clone();
tokio::spawn(async move {
spawn(async move {
GLOBAL_BackgroundHealState
.set_disk_healing_status(disk_clone.clone(), true)
.await;
@@ -420,3 +428,30 @@ async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Erro
}
Ok((res, err))
}
/// Queue a heal task for `bucket` on the background heal sequence.
///
/// No-op (returns `Ok(())`) when no background heal sequence is registered
/// under `BG_HEALING_UUID`.
pub(crate) async fn heal_bucket(bucket: &str) -> Result<()> {
    let (bg_seq, ok) = GLOBAL_BackgroundHealState.get_heal_sequence_by_token(BG_HEALING_UUID).await;
    if ok {
        // `ok == true` implies `bg_seq` is Some; use `if let` instead of
        // `unwrap` so a broken invariant degrades to a no-op, not a panic.
        if let Some(bg_seq) = bg_seq {
            return bg_seq
                .queue_heal_task(
                    HealSource {
                        bucket: bucket.to_string(),
                        ..Default::default()
                    },
                    HEAL_ITEM_BUCKET.to_string(),
                )
                .await;
        }
    }
    Ok(())
}
/// Heal a single object version via the background heal sequence.
///
/// No-op (returns `Ok(())`) when no background heal sequence is registered
/// under `BG_HEALING_UUID`.
pub(crate) async fn heal_object(bucket: &str, object: &str, version_id: &str, scan_mode: HealScanMode) -> Result<()> {
    let (bg_seq, ok) = GLOBAL_BackgroundHealState.get_heal_sequence_by_token(BG_HEALING_UUID).await;
    if ok {
        // `ok == true` implies `bg_seq` is Some; avoid `unwrap` so a broken
        // invariant degrades to a no-op rather than a panic.
        if let Some(bg_seq) = bg_seq {
            return HealSequence::heal_object(bg_seq, bucket, object, version_id, scan_mode).await;
        }
    }
    Ok(())
}

View File

@@ -6,3 +6,4 @@ pub mod data_usage_cache;
pub mod error;
pub mod heal_commands;
pub mod heal_ops;
pub mod mrf;

124
ecstore/src/heal/mrf.rs Normal file
View File

@@ -0,0 +1,124 @@
use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::heal::background_heal_ops::{heal_bucket, heal_object};
use crate::heal::heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN};
use crate::utils::path::SLASH_SEPARATOR;
use chrono::{DateTime, TimeDelta, Utc};
use lazy_static::lazy_static;
use regex::Regex;
use std::ops::Sub;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::error;
use uuid::Uuid;
/// Capacity (number of queued operations) of the MRF partial-operation channel.
pub const MRF_OPS_QUEUE_SIZE: u64 = 100000;
/// Directory name, under the bucket-meta prefix, holding heal data.
/// (`&'static` is implied on a `const` and was redundant — clippy
/// `redundant_static_lifetimes`.)
pub const HEAL_DIR: &str = ".heal";
/// On-disk MRF metadata format identifier.
pub const HEAL_MRFMETA_FORMAT: u64 = 1;
/// On-disk MRF metadata version (v1).
pub const HEAL_MRFMETA_VERSION_V1: u64 = 1;
lazy_static! {
// Path, relative to the meta bucket, where MRF heal data lives:
// "<BUCKET_META_PREFIX>/<.heal>/mrf".
pub static ref HEAL_MRF_DIR: String =
format!("{}{}{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, HEAL_DIR, SLASH_SEPARATOR, "mrf");
// Object-name patterns inside the meta bucket that are transient
// (metacache, tmp, multipart scratch) and therefore never need MRF healing.
static ref PATTERNS: Vec<Regex> = vec![
Regex::new(r"^buckets/.*/.metacache/.*").unwrap(),
Regex::new(r"^tmp/.*").unwrap(),
Regex::new(r"^multipart/.*").unwrap(),
Regex::new(r"^tmp-old/.*").unwrap(),
];
}
// An operation that only partially succeeded (e.g. some shards failed) and
// whose bucket/object should be queued for background healing.
#[derive(Default)]
pub struct PartialOperation {
pub bucket: String,
// Empty string means the whole bucket (not a single object) needs healing.
pub object: String,
// Specific object version to heal, when known.
pub version_id: Option<String>,
// Packed version ids: concatenation of 16-byte UUIDs
// (consumed in 16-byte chunks by `MRFState::heal_routine`).
pub versions: Vec<u8>,
pub set_index: usize,
pub pool_index: usize,
// When the operation was queued; healing is delayed until it is >= 1s old.
pub queued: DateTime<Utc>,
// Request a deep (bitrot) scan instead of a normal scan.
pub bitrot_scan: bool,
}
// Collects partially-failed operations and feeds them to the background
// heal routine through a bounded mpsc channel.
pub struct MRFState {
tx: Sender<PartialOperation>,
// Receiver half; wrapped in RwLock so `heal_routine(&self)` can obtain
// mutable access. Only `heal_routine` ever reads from it.
rx: RwLock<Receiver<PartialOperation>>,
// Set once the state is fully shut down; new ops are rejected.
closed: AtomicBool,
// Set while shutting down; new ops are rejected.
closing: AtomicBool,
}
impl MRFState {
pub fn new() -> MRFState {
let (tx, rx) = tokio::sync::mpsc::channel(MRF_OPS_QUEUE_SIZE as usize);
MRFState {
tx,
rx: RwLock::new(rx),
closed: Default::default(),
closing: Default::default(),
}
}
pub async fn add_partial(&self, op: PartialOperation) {
if self.closed.load(Ordering::SeqCst) || self.closing.load(Ordering::SeqCst) {
return;
}
let _ = self.tx.send(op).await;
}
pub async fn heal_routine(&self) {
loop {
// rx used only there,
if let Some(op) = self.rx.write().await.recv().await {
if op.bucket == RUSTFS_META_BUCKET {
for pattern in &*PATTERNS {
if pattern.is_match(&op.object) {
return;
}
}
}
let now = Utc::now();
if now.sub(op.queued).num_seconds() < 1 {
sleep(Duration::from_secs(1)).await;
}
let scan_mode = if op.bitrot_scan { HEAL_DEEP_SCAN } else { HEAL_NORMAL_SCAN };
if op.object.is_empty() {
if let Err(err) = heal_bucket(&op.bucket).await {
error!("heal bucket failed, bucket: {}, err: {:?}", op.bucket, err);
}
} else {
if op.versions.is_empty() {
if let Err(err) =
heal_object(&op.bucket, &op.object, &op.version_id.clone().unwrap_or_default(), scan_mode).await
{
error!("heal object failed, bucket: {}, object: {}, err: {:?}", op.bucket, op.object, err);
}
} else {
let vers = op.versions.len() / 16;
if vers > 0 {
for i in 0..vers {
let start = i * 16;
let end = start + 16;
if let Err(err) = heal_object(
&op.bucket,
&op.object,
&Uuid::from_slice(&op.versions[start..end]).expect("").to_string(),
scan_mode,
)
.await
{
error!("heal object failed, bucket: {}, object: {}, err: {:?}", op.bucket, op.object, err);
}
}
}
}
}
} else {
return;
}
}
}
}

View File

@@ -6,7 +6,10 @@ use std::{
time::Duration,
};
use crate::config::error::is_not_found;
use crate::global::GLOBAL_MRFState;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::heal::mrf::PartialOperation;
use crate::{
bitrot::{bitrot_verify, close_bitrot_writers, new_bitrot_filereader, new_bitrot_filewriter, BitrotFileWriter},
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
@@ -56,6 +59,7 @@ use crate::{
heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
store_api::ListObjectVersionsInfo,
};
use chrono::Utc;
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
@@ -1621,6 +1625,19 @@ impl SetDisks {
let (op_online_disks, mot_time, etag) = Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize);
let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?;
if errs.iter().any(|err| err.is_some()) {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: fi.volume.to_string(),
object: fi.name.to_string(),
queued: Utc::now(),
version_id: fi.version_id.map(|v| v.to_string()),
set_index: self.set_index,
pool_index: self.pool_index,
..Default::default()
})
.await;
}
// debug!("get_object_fileinfo pick fi {:?}", &fi);
// let online_disks: Vec<Option<DiskStore>> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect();
@@ -1639,6 +1656,8 @@ impl SetDisks {
fi: FileInfo,
files: Vec<FileInfo>,
disks: &[Option<DiskStore>],
set_index: usize,
pool_index: usize,
) -> Result<()> {
let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi);
@@ -1724,10 +1743,34 @@ impl SetDisks {
// "read part {} part_offset {},part_length {},part_size {} ",
// part_number, part_offset, part_length, part_size
// );
let _n = erasure
let (written, mut err) = erasure
.decode(writer, readers, part_offset as usize, part_length, part_size)
.await?;
.await;
if let Some(e) = err.as_ref() {
if written == part_length {
match e.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileCorrupt) => {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: bucket.to_string(),
object: object.to_string(),
queued: Utc::now(),
version_id: fi.version_id.map(|v| v.to_string()),
set_index,
pool_index,
bitrot_scan: !is_not_found(e),
..Default::default()
})
.await;
err = None;
}
_ => {}
}
}
}
if let Some(err) = err {
return Err(err);
}
// debug!("ec decode {} writed size {}", part_number, n);
total_readed += part_length as i64;
@@ -3378,8 +3421,14 @@ impl ObjectIO for SetDisks {
// let disks = disks.clone();
let bucket = String::from(bucket);
let object = String::from(object);
let set_index = self.set_index.clone();
let pool_index = self.pool_index.clone();
tokio::spawn(async move {
if let Err(e) = Self::get_object_with_fileinfo(&bucket, &object, offset, length, &mut wd, fi, files, &disks).await {
if let Err(e) = Self::get_object_with_fileinfo(
&bucket, &object, offset, length, &mut wd, fi, files, &disks, set_index, pool_index,
)
.await
{
error!("get_object_with_fileinfo err {:?}", e);
};
});
@@ -4438,7 +4487,7 @@ impl StorageAPI for SetDisks {
}
}
let (online_disks, _, op_old_dir) = Self::rename_data(
let (online_disks, versions, op_old_dir) = Self::rename_data(
&shuffle_disks,
RUSTFS_META_MULTIPART_BUCKET,
&upload_id_path,
@@ -4467,6 +4516,19 @@ impl StorageAPI for SetDisks {
self.commit_rename_data_dir(&shuffle_disks, bucket, object, &old_dir.to_string(), write_quorum)
.await?;
}
if let Some(versions) = versions {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: bucket.to_string(),
object: object.to_string(),
queued: Utc::now(),
versions,
set_index: self.set_index,
pool_index: self.pool_index,
..Default::default()
})
.await;
}
let _ = self.delete_all(RUSTFS_META_MULTIPART_BUCKET, &upload_id_path).await;

View File

@@ -671,8 +671,10 @@ impl ECStore {
want_cycle: usize,
heal_scan_mode: HealScanMode,
) -> Result<()> {
info!("ns_scanner updates - {}", want_cycle);
let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
if all_buckets.is_empty() {
info!("No buckets found");
let _ = updates.send(DataUsageInfo::default()).await;
return Ok(());
}

View File

@@ -56,7 +56,7 @@ impl Action {
}
impl Action {
const S3_PREFIX: &str = "s3:";
const S3_PREFIX: &'static str = "s3:";
const ADMIN_PREFIX: &str = "admin:";
const STS_PREFIX: &str = "sts:";
const KMS_PREFIX: &str = "kms:";