heal admin api(3)

Signed-off-by: mujunxiang <1948535941@qq.com>
This commit is contained in:
mujunxiang
2024-11-30 15:27:21 +08:00
parent a127fafc26
commit 1f2cb78b9d
12 changed files with 571 additions and 1678 deletions

View File

@@ -1,10 +1,9 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::mem;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -12,112 +11,114 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::mem;
use core::cmp::Ordering;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@ use std::{
collections::HashMap,
io::{Cursor, Read},
};
use tracing::error;
use tracing::{error, info};
use tokio::{
spawn,
@@ -219,7 +219,9 @@ pub fn bitrot_verify(
if algo != BitrotAlgorithm::HighwayHash256S {
let mut h = algo.new_hasher();
h.update(r.get_ref());
if h.finalize() != want {
let hash = h.finalize();
if hash != want {
info!("bitrot_verify except: {:?}, got: {:?}", want, hash);
return Err(Error::new(DiskError::FileCorrupt));
}
@@ -229,7 +231,11 @@ pub fn bitrot_verify(
let mut hash_buf = vec![0; h.size()];
let mut left = want_size;
if left != bitrot_shard_file_size(part_size, shard_size, algo) {
if left != bitrot_shard_file_size(part_size, shard_size, algo.clone()) {
info!(
"bitrot_shard_file_size failed, left: {}, part_size: {}, shard_size: {}, algo: {:?}",
left, part_size, shard_size, algo
);
return Err(Error::new(DiskError::FileCorrupt));
}
@@ -246,7 +252,9 @@ pub fn bitrot_verify(
let read = r.read(&mut buf)?;
h.update(buf);
left -= read;
let hash = h.clone().finalize();
if h.clone().finalize() != hash_buf[0..n] {
info!("bitrot_verify except: {:?}, got: {:?}", hash_buf[0..n].to_vec(), hash);
return Err(Error::new(DiskError::FileCorrupt));
}
}

View File

@@ -1375,6 +1375,7 @@ impl DiskAPI for LocalDisk {
let src_volume_dir = self.get_bucket_path(src_volume)?;
if !skip_access_checks(src_volume) {
if let Err(e) = utils::fs::access(&src_volume_dir).await {
info!("access checks failed, src_volume_dir: {:?}, err: {}", src_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
}
@@ -1382,6 +1383,7 @@ impl DiskAPI for LocalDisk {
let dst_volume_dir = self.get_bucket_path(dst_volume)?;
if !skip_access_checks(dst_volume) {
if let Err(e) = utils::fs::access(&dst_volume_dir).await {
info!("access checks failed, dst_volume_dir: {:?}, err: {}", dst_volume_dir, e.to_string());
return Err(convert_access_error(e, DiskError::VolumeAccessDenied));
}
}
@@ -1430,6 +1432,7 @@ impl DiskAPI for LocalDisk {
return Err(os_err_to_file_err(e));
}
info!("read xl.meta failed, dst_file_path: {:?}, err: {:?}", dst_file_path, e);
None
}
};
@@ -1491,13 +1494,15 @@ impl DiskAPI for LocalDisk {
err
}
})?;
if let Some((src_data_path, dst_data_path)) = has_data_dir_path.as_ref() {
let no_inline = fi.data.is_none() && fi.size > 0;
if no_inline {
if let Err(err) = os::rename_all(&src_data_path, &dst_data_path, &skip_parent).await {
let _ = self.delete_file(&dst_volume_dir, dst_data_path, false, false).await;
info!(
"rename all failed src_data_path: {:?}, dst_data_path: {:?}, err: {:?}",
src_data_path, dst_data_path, err
);
return Err({
if let Some(e) = err.to_io_err() {
os_err_to_file_err(e)
@@ -1522,6 +1527,7 @@ impl DiskAPI for LocalDisk {
)
.await
{
info!("write_all_private failed err: {:?}", err);
return Err({
if let Some(e) = err.to_io_err() {
os_err_to_file_err(e)
@@ -1537,6 +1543,7 @@ impl DiskAPI for LocalDisk {
if let Some((_, dst_data_path)) = has_data_dir_path.as_ref() {
let _ = self.delete_file(&dst_volume_dir, dst_data_path, false, false).await;
}
info!("rename all failed err: {:?}", err);
return Err({
if let Some(e) = err.to_io_err() {
os_err_to_file_err(e)

View File

@@ -4,6 +4,7 @@ use std::{
};
use tokio::fs;
use tracing::info;
use crate::{
disk::error::{is_sys_err_not_dir, is_sys_err_path_not_found, os_is_not_exist},
@@ -138,7 +139,16 @@ pub async fn reliable_rename(
if let Some(parent) = dst_file_path.as_ref().parent() {
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
// need remove dst path
if let Err(err) = utils::fs::remove_all(dst_file_path.as_ref()).await {
info!(
"reliable_rename rm dst failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
err
);
}
let mut i = 0;
loop {
if let Err(e) = utils::fs::rename(src_file_path.as_ref(), dst_file_path.as_ref()).await {
@@ -146,7 +156,13 @@ pub async fn reliable_rename(
i += 1;
continue;
}
info!(
"reliable_rename failed. src_file_path: {:?}, dst_file_path: {:?}, base_dir: {:?}, err: {:?}",
src_file_path.as_ref(),
dst_file_path.as_ref(),
base_dir.as_ref(),
e
);
return Err(e);
}

View File

@@ -11,6 +11,7 @@ use std::io::ErrorKind;
use tokio::io::DuplexStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::warn;
use tracing::{debug, info};
// use tracing::debug;
use uuid::Uuid;
@@ -425,6 +426,12 @@ impl Erasure {
total_length: usize,
_prefer: &[bool],
) -> Result<()> {
info!(
"Erasure heal, writers len: {}, readers len: {}, total_length: {}",
writers.len(),
readers.len(),
total_length
);
if writers.len() != self.parity_shards + self.data_shards {
return Err(Error::from_string("invalid argument"));
}
@@ -437,14 +444,14 @@ impl Erasure {
}
let mut errs = Vec::new();
for _ in start_block..=end_block {
for _ in start_block..end_block {
let mut bufs = reader.read().await?;
if self.parity_shards > 0 {
self.encoder.as_ref().unwrap().reconstruct(&mut bufs)?;
}
let shards = bufs.into_iter().flatten().collect::<Vec<_>>();
let shards: Vec<Vec<u8>> = bufs.into_iter().flatten().collect::<Vec<_>>();
if shards.len() != self.parity_shards + self.data_shards {
return Err(Error::from_string("can not reconstruct data"));
}
@@ -455,7 +462,10 @@ impl Erasure {
}
match w.as_mut().unwrap().write(shards[i].as_ref()).await {
Ok(_) => {}
Err(e) => errs.push(e),
Err(e) => {
info!("write failed, err: {:?}", e);
errs.push(e);
}
}
}
}

View File

@@ -412,7 +412,7 @@ impl HealRoutine {
None => {
info!("add_worker, tasks_rx was closed, return");
return;
},
}
}
}
}

View File

@@ -40,9 +40,9 @@ use std::{
use tokio::{
select, spawn,
sync::{
watch::{self, Receiver as W_Receiver, Sender as W_Sender},
broadcast::{self, Receiver, Sender},
broadcast,
mpsc::{self, Receiver as M_Receiver, Sender as M_Sender},
watch::{self, Receiver as W_Receiver, Sender as W_Sender},
RwLock,
},
time::{interval, sleep},
@@ -401,7 +401,7 @@ impl HealSequence {
let bucket = h.bucket.clone();
let task1 = Self::heal_disk_meta(h.clone());
let task2 = Self::heal_bucket(h.clone(), &bucket, buckets_only);
let results = join!(task1, task2);
let results = join!(task1, task2);
results.0?;
results.1?;
@@ -756,7 +756,7 @@ impl AllHealState {
let _ = self.mu.write().await;
for (k, v) in self.heal_seq_map.iter() {
if !v.has_ended().await && (has_profix(k, path_s) || has_profix(path_s, k)) {
if (has_profix(k, path_s) || has_profix(path_s, k)) && !v.has_ended().await {
return Err(Error::from_string(format!(
"The provided heal sequence path overlaps with an existing heal path: {}",
k

View File

@@ -102,8 +102,6 @@ impl S3PeerSys {
pool_errs.push(reduce_write_quorum_errs(&per_pool_errs, &bucket_op_ignored_errs(), qu));
}
error!("found pool errs: {:?}", pool_errs);
if !opts.recreate {
opts.remove = is_all_buckets_not_found(&pool_errs);
opts.recreate = !opts.remove;

View File

@@ -1968,10 +1968,10 @@ impl StorageAPI for ECStore {
Ok((mut result, err)) => {
result.object = utils::path::decode_dir_object(&result.object);
results.write().await.insert(idx, result);
errs.write().await.insert(idx, err);
errs.write().await[idx] = err;
}
Err(err) => {
errs.write().await.insert(idx, Some(err));
errs.write().await[idx] = Some(err);
}
}
});

View File

@@ -1022,14 +1022,8 @@ pub trait StorageAPI: ObjectIO {
version_id: &str,
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_objects(
&self,
bucket: &str,
prefix: &str,
opts: &HealOpts,
hs: Arc<HealSequence>,
is_meta: bool,
) -> Result<()>;
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, hs: Arc<HealSequence>, is_meta: bool)
-> Result<()>;
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
}

View File

@@ -123,6 +123,15 @@ pub async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
}
}
pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
let meta = fs::metadata(path.as_ref()).await?;
if meta.is_dir() {
fs::remove_dir_all(path.as_ref()).await
} else {
fs::remove_file(path.as_ref()).await
}
}
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir(path.as_ref()).await
}