mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
@@ -1,5 +1,9 @@
+use crate::{
+    disk::{DiskAPI, DiskStore, MetaCacheEntries, MetaCacheEntry, WalkDirOptions},
+    error::{Error, Result},
+};
+use futures::future::join_all;
use std::{future::Future, pin::Pin, sync::Arc};

use tokio::{
-    spawn,
    sync::{
@@ -8,11 +12,7 @@ use tokio::{
        RwLock,
    },
};

-use crate::{
-    disk::{DiskAPI, DiskStore, MetaCacheEntries, MetaCacheEntry, WalkDirOptions},
-    error::{Error, Result},
-};
use tracing::info;

type AgreedFn = Box<dyn Fn(MetaCacheEntry) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
type PartialFn = Box<dyn Fn(MetaCacheEntries, &[Option<Error>]) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
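
Note: these aliases are the usual shape for storing async callbacks: a boxed closure that returns a boxed, pinned future. A minimal sketch of constructing an AgreedFn (the closure body is illustrative, not from this commit):

    // Hypothetical construction; MetaCacheEntry comes from the crate above.
    let agreed: AgreedFn = Box::new(|entry: MetaCacheEntry| {
        Box::pin(async move {
            // consume the agreed-upon entry, e.g. forward it to a channel
            println!("agreed on {}", entry.name);
        })
    });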
@@ -58,18 +58,20 @@ impl Clone for ListPathRawOptions {

pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -> Result<()> {
    if opts.disks.is_empty() {
+        info!("list_path_raw 0 drives provided");
        return Err(Error::from_string("list_path_raw: 0 drives provided"));
    }

    let mut readers = Vec::with_capacity(opts.disks.len());
    let fds = Arc::new(RwLock::new(opts.fallback_disks.clone()));
+    let mut futures = Vec::with_capacity(opts.disks.len());
    for disk in opts.disks.iter() {
        let disk = disk.clone();
        let opts_clone = opts.clone();
        let fds_clone = fds.clone();
        let (m_tx, m_rx) = mpsc::channel::<MetaCacheEntry>(100);
        readers.push(m_rx);
-        spawn(async move {
+        futures.push(async move {
            let mut need_fallback = false;
            if disk.is_none() {
                need_fallback = true;
@@ -136,21 +138,25 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
                Err(_) => break,
            }
        }
        drop(m_tx);
    });
}

+    let _ = join_all(futures).await;
+
    let errs: Vec<Option<Error>> = vec![None; readers.len()];
    loop {
        let mut current = MetaCacheEntry::default();
        let (mut at_eof, mut has_err, mut agree) = (0, 0, 0);
        if rx.try_recv().is_ok() {
+            info!("list_path_raw canceled");
            return Err(Error::from_string("canceled"));
        }
-        let mut top_entries: Vec<MetaCacheEntry> = Vec::with_capacity(readers.len());
+        let mut top_entries: Vec<MetaCacheEntry> = vec![MetaCacheEntry::default(); readers.len()];
+        // top_entries.clear();

        for (i, r) in readers.iter_mut().enumerate() {
-            if errs[i].is_none() {
+            if errs[i].is_some() {
                has_err += 1;
                continue;
            }
@@ -163,20 +169,20 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
            };
            // If no current, add it.
            if current.name.is_empty() {
-                top_entries.insert(i, entry.clone());
+                top_entries[i] = entry.clone();
                current = entry;
                agree += 1;
                continue;
            }
            // If exact match, we agree.
            if let Ok((_, true)) = current.matches(&entry, true) {
-                top_entries.insert(i, entry);
+                top_entries[i] = entry;
                agree += 1;
                continue;
            }
            // If only the name matches we didn't agree, but add it for resolution.
            if entry.name == current.name {
-                top_entries.insert(i, entry);
+                top_entries[i] = entry;
                continue;
            }
            // We got different entries
@@ -185,9 +191,11 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
            }
            // We got a new, better current.
            // Clear existing entries.
-            top_entries.clear();
+            for i in 0..top_entries.len() {
+                top_entries[i] = MetaCacheEntry::default();
+            }
            agree += 1;
-            top_entries.insert(i, entry.clone());
+            top_entries[i] = entry.clone();
            current = entry;
        }

@@ -205,14 +213,16 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
                }
                _ => {}
            });

+            info!("list_path_raw failed, err: {:?}", combined_err);
            return Err(Error::from_string(combined_err.join(", ")));
        }

        // Break if all at EOF or error.
-        if at_eof + has_err == readers.len() && has_err > 0 {
+        if at_eof + has_err == readers.len() {
            if let Some(finished_fn) = opts.finished.as_ref() {
-                finished_fn(&errs).await;
+                if has_err > 0 {
+                    finished_fn(&errs).await;
+                }
            }
            break;
        }

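Note: these hunks replace detached spawn tasks with a collected set of futures driven by join_all, after which the consensus loop drains each reader's channel, keeping one candidate entry per disk and counting agreement. A reduced sketch of that fan-out/fan-in shape (payload simplified to String; disk count and capacity are illustrative):

    use futures::future::join_all;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let mut readers = Vec::new();
        let mut futures = Vec::new();
        for disk in 0..3 {
            let (tx, rx) = mpsc::channel::<String>(100);
            readers.push(rx);
            // one producer per disk, all driven to completion together
            futures.push(async move {
                let _ = tx.send(format!("entry-from-disk-{disk}")).await;
            });
        }
        let _ = join_all(futures).await;
        // senders are dropped, so recv() yields the buffered entries then None
        for r in readers.iter_mut() {
            while let Some(entry) = r.recv().await {
                println!("{entry}");
            }
        }
    }

One consequence of awaiting the producers before draining: a lister that emits more entries than the channel capacity (100 in the patch) will block inside join_all until a consumer runs.
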
@@ -762,7 +762,7 @@ impl DiskAPI for LocalDisk {
        self.endpoint.host_port()
    }
    async fn is_online(&self) -> bool {
-        true
+        self.check_format_json().await.is_ok()
    }

    fn endpoint(&self) -> Endpoint {

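Note: is_online previously hard-coded true for a local disk; it now reports liveness by probing the disk's format metadata through the crate's check_format_json. A stand-in sketch of the same idea (the metadata path below is hypothetical, not RustFS's actual layout):

    use std::path::Path;

    // Hypothetical probe: a disk counts as online iff its format marker is readable.
    async fn is_online(disk_root: &Path) -> bool {
        tokio::fs::metadata(disk_root.join(".rustfs.sys/format.json"))
            .await
            .is_ok()
    }
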
@@ -212,9 +212,9 @@ impl Erasure {
        offset: usize,
        length: usize,
        total_length: usize,
-    ) -> Result<usize> {
+    ) -> (usize, Option<Error>) {
        if length == 0 {
-            return Ok(0);
+            return (0, None);
        }

        let mut reader = ShardReader::new(readers, self, offset, total_length);
@@ -247,15 +247,24 @@ impl Erasure {

            // debug!("decode {} block_offset {},block_length {} ", block_idx, block_offset, block_length);

-            let mut bufs = reader.read().await?;
+            let mut bufs = match reader.read().await {
+                Ok(bufs) => bufs,
+                Err(err) => return (bytes_writed, Some(err)),
+            };

            if self.parity_shards > 0 {
-                self.decode_data(&mut bufs)?;
+                if let Err(err) = self.decode_data(&mut bufs) {
+                    return (bytes_writed, Some(err));
+                }
            }

-            let writed_n = self
+            let writed_n = match self
                .write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length)
-                .await?;
+                .await
+            {
+                Ok(n) => n,
+                Err(err) => return (bytes_writed, Some(err)),
+            };

            bytes_writed += writed_n;

@@ -264,10 +273,10 @@ impl Erasure {

        if bytes_writed != length {
            // debug!("bytes_writed != length: {} != {} ", bytes_writed, length);
-            return Err(Error::msg("erasure decode less data"));
+            return (bytes_writed, Some(Error::msg("erasure decode less data")));
        }

-        Ok(bytes_writed)
+        (bytes_writed, None)
    }

    async fn write_data_blocks(

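Note: switching from Result<usize> to (usize, Option<Error>) keeps the count of bytes already written when an error occurs; the MRF hook added later in this commit depends on exactly that (a full-length write plus a shard error means "serve the read, heal in the background"). A toy illustration of the information a plain Result would discard:

    // A partial-progress return keeps the byte count on failure.
    fn copy_upto(src: &[u8], dst: &mut Vec<u8>, want: usize) -> (usize, Option<String>) {
        let n = src.len().min(want);
        dst.extend_from_slice(&src[..n]);
        if n < want {
            return (n, Some(format!("short read: {n} of {want} bytes")));
        }
        (n, None)
    }

    fn main() {
        let mut out = Vec::new();
        let (n, err) = copy_upto(b"abc", &mut out, 8);
        assert_eq!(n, 3);
        assert!(err.is_some()); // the caller still knows 3 bytes landed
    }
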
@@ -6,6 +6,7 @@ use std::{
use tokio::sync::RwLock;
use uuid::Uuid;

+use crate::heal::mrf::MRFState;
use crate::{
    disk::DiskStore,
    endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
@@ -34,6 +35,7 @@ lazy_static! {
    pub static ref GLOBAL_BackgroundHealRoutine: Arc<HealRoutine> = HealRoutine::new();
    pub static ref GLOBAL_BackgroundHealState: Arc<AllHealState> = AllHealState::new(false);
    pub static ref GLOBAL_ALlHealState: Arc<AllHealState> = AllHealState::new(false);
+    pub static ref GLOBAL_MRFState: Arc<MRFState> = Arc::new(MRFState::new());
    static ref globalDeploymentIDPtr: RwLock<Uuid> = RwLock::new(Uuid::nil());
}

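Note: GLOBAL_MRFState follows the same lazy_static pattern as the other globals in this file: a process-wide handle any module can reach without threading it through call signatures. MRF here appears to mirror MinIO's "most recently failed" queue; the type itself is defined in the new mrf.rs further down. The pattern in miniature:

    use lazy_static::lazy_static;
    use std::sync::Arc;

    struct MrfQueue; // stand-in for the real MRFState

    lazy_static! {
        static ref GLOBAL_MRF: Arc<MrfQueue> = Arc::new(MrfQueue);
    }
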
@@ -1,6 +1,7 @@
use madmin::heal_commands::HealResultItem;
use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
+    spawn,
    sync::{
        mpsc::{self, Receiver, Sender},
        RwLock,
@@ -14,7 +15,10 @@ use super::{
    heal_commands::HealOpts,
    heal_ops::{new_bg_heal_sequence, HealSequence},
};
+use crate::global::GLOBAL_MRFState;
use crate::heal::error::ERR_RETRY_HEALING;
+use crate::heal::heal_commands::{HealScanMode, HEAL_ITEM_BUCKET};
+use crate::heal::heal_ops::{HealSource, BG_HEALING_UUID};
use crate::{
    config::RUSTFS_CONFIG_PREFIX,
    disk::{endpoint::Endpoint, error::DiskError, DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
@@ -36,24 +40,26 @@ pub static DEFAULT_MONITOR_NEW_DISK_INTERVAL: Duration = Duration::from_secs(10)

pub async fn init_auto_heal() {
    init_background_healing().await;
-    if let Ok(v) = env::var("_RUSTFS_AUTO_DRIVE_HEALING") {
-        if v == "on" {
-            info!("start monitor local disks and heal");
-            GLOBAL_BackgroundHealState
-                .push_heal_local_disks(&get_local_disks_to_heal().await)
-                .await;
-            tokio::spawn(async {
-                monitor_local_disks_and_heal().await;
-            });
-        }
-    }
+    let v = env::var("_RUSTFS_AUTO_DRIVE_HEALING").unwrap_or("on".to_string());
+    if v == "on" {
+        info!("start monitor local disks and heal");
+        GLOBAL_BackgroundHealState
+            .push_heal_local_disks(&get_local_disks_to_heal().await)
+            .await;
+        spawn(async {
+            monitor_local_disks_and_heal().await;
+        });
+    }
+    spawn(async {
+        GLOBAL_MRFState.heal_routine().await;
+    });
}

async fn init_background_healing() {
    let bg_seq = Arc::new(new_bg_heal_sequence());
    for _ in 0..GLOBAL_BackgroundHealRoutine.workers {
        let bg_seq_clone = bg_seq.clone();
-        tokio::spawn(async {
+        spawn(async {
            GLOBAL_BackgroundHealRoutine.add_worker(bg_seq_clone).await;
        });
    }
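
Note: the init_auto_heal rewrite flips the default. Previously automatic drive healing ran only when _RUSTFS_AUTO_DRIVE_HEALING was present and equal to "on"; now it runs unless the variable is explicitly set to something else, and the MRF heal routine is started unconditionally. The two readings side by side:

    use std::env;

    fn healing_enabled_old() -> bool {
        // old: disabled unless the variable exists and equals "on"
        matches!(env::var("_RUSTFS_AUTO_DRIVE_HEALING"), Ok(v) if v == "on")
    }

    fn healing_enabled_new() -> bool {
        // new: enabled by default; any other explicit value disables it
        env::var("_RUSTFS_AUTO_DRIVE_HEALING").unwrap_or("on".to_string()) == "on"
    }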
@@ -66,12 +72,14 @@ pub async fn get_local_disks_to_heal() -> Vec<Endpoint> {
        if let Some(disk) = disk {
            if let Err(err) = disk.disk_info(&DiskInfoOptions::default()).await {
                if let Some(DiskError::UnformattedDisk) = err.downcast_ref() {
+                    info!("get_local_disks_to_heal, disk is unformatted: {}", err);
                    disks_to_heal.push(disk.endpoint());
                }
            }
            let h = disk.healing().await;
            if let Some(h) = h {
                if !h.finished {
+                    info!("get_local_disks_to_heal, disk healing not finished");
                    disks_to_heal.push(disk.endpoint());
                }
            }
@@ -92,13 +100,17 @@ async fn monitor_local_disks_and_heal() {
        interval.tick().await;
        let heal_disks = GLOBAL_BackgroundHealState.get_heal_local_disk_endpoints().await;
        if heal_disks.is_empty() {
+            info!("heal local disks is empty");
            interval.reset();
            continue;
        }

+        info!("heal local disks: {:?}", heal_disks);

        let store = new_object_layer_fn().expect("errServerNotInitialized");
-        if let (_, Some(err)) = store.heal_format(false).await.expect("heal format failed") {
-            if let Some(DiskError::NoHealRequired) = err.downcast_ref() {
+        if let (result, Some(err)) = store.heal_format(false).await.expect("heal format failed") {
+            error!("heal local disk format error: {}", err);
+            if let Some(DiskError::NoHealRequired) = err.downcast_ref::<DiskError>() {
            } else {
                info!("heal format err: {}", err.to_string());
                interval.reset();
@@ -108,7 +120,7 @@ async fn monitor_local_disks_and_heal() {

        for disk in heal_disks.into_ref().iter() {
            let disk_clone = disk.clone();
-            tokio::spawn(async move {
+            spawn(async move {
                GLOBAL_BackgroundHealState
                    .set_disk_healing_status(disk_clone.clone(), true)
                    .await;
@@ -420,3 +432,30 @@ async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Error
    }
    Ok((res, err))
}
+
+pub(crate) async fn heal_bucket(bucket: &str) -> Result<()> {
+    let (bg_seq, ok) = GLOBAL_BackgroundHealState.get_heal_sequence_by_token(BG_HEALING_UUID).await;
+    if ok {
+        // bg_seq must be Some when ok is true
+        return bg_seq
+            .unwrap()
+            .queue_heal_task(
+                HealSource {
+                    bucket: bucket.to_string(),
+                    ..Default::default()
+                },
+                HEAL_ITEM_BUCKET.to_string(),
+            )
+            .await;
+    }
+    Ok(())
+}
+
+pub(crate) async fn heal_object(bucket: &str, object: &str, version_id: &str, scan_mode: HealScanMode) -> Result<()> {
+    let (bg_seq, ok) = GLOBAL_BackgroundHealState.get_heal_sequence_by_token(BG_HEALING_UUID).await;
+    if ok {
+        // bg_seq must be Some when ok is true
+        return HealSequence::heal_object(bg_seq.unwrap(), bucket, object, version_id, scan_mode).await;
+    }
+    Ok(())
+}

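Note: both new helpers are quiet no-ops unless a background heal sequence is registered under BG_HEALING_UUID, so callers may requeue work unconditionally. A hypothetical call site (bucket and object names invented; the empty version_id matches how the MRF routine passes unversioned operations):

    // Everything except the literals comes from the surrounding crate.
    async fn requeue_example() -> Result<()> {
        heal_bucket("photos").await?;
        heal_object("photos", "2024/cat.jpg", "", HEAL_NORMAL_SCAN).await?;
        Ok(())
    }
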
@@ -388,6 +388,7 @@ pub async fn load_healing_tracker(disk: &Option<DiskStore>) -> Result<HealingTra
            )));
        }
        healing_tracker.id = disk_id;
+        healing_tracker.disk = Some(disk.clone());
        Ok(healing_tracker)
    } else {
        Err(Error::from_string("loadHealingTracker: disk not have id"))
@@ -400,7 +401,10 @@ pub async fn load_healing_tracker(disk: &Option<DiskStore>) -> Result<HealingTra
pub async fn init_healing_tracker(disk: DiskStore, heal_id: &str) -> Result<HealingTracker> {
    let disk_location = disk.get_disk_location();
    Ok(HealingTracker {
-        id: disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string()),
+        id: disk
+            .get_disk_id()
+            .await
+            .map_or("".to_string(), |id| id.map_or("".to_string(), |id| id.to_string())),
        heal_id: heal_id.to_string(),
        path: disk.to_string(),
        endpoint: disk.endpoint().to_string(),

@@ -6,3 +6,4 @@ pub mod data_usage_cache;
pub mod error;
pub mod heal_commands;
pub mod heal_ops;
+pub mod mrf;

ecstore/src/heal/mrf.rs (new file, 124 lines)
@@ -0,0 +1,124 @@
use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::heal::background_heal_ops::{heal_bucket, heal_object};
use crate::heal::heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN};
use crate::utils::path::SLASH_SEPARATOR;
use chrono::{DateTime, TimeDelta, Utc};
use lazy_static::lazy_static;
use regex::Regex;
use std::ops::Sub;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::error;
use uuid::Uuid;

pub const MRF_OPS_QUEUE_SIZE: u64 = 100000;
pub const HEAL_DIR: &'static str = ".heal";
pub const HEAL_MRFMETA_FORMAT: u64 = 1;
pub const HEAL_MRFMETA_VERSION_V1: u64 = 1;

lazy_static! {
    pub static ref HEAL_MRF_DIR: String =
        format!("{}{}{}{}{}", BUCKET_META_PREFIX, SLASH_SEPARATOR, HEAL_DIR, SLASH_SEPARATOR, "mrf");
    static ref PATTERNS: Vec<Regex> = vec![
        Regex::new(r"^buckets/.*/.metacache/.*").unwrap(),
        Regex::new(r"^tmp/.*").unwrap(),
        Regex::new(r"^multipart/.*").unwrap(),
        Regex::new(r"^tmp-old/.*").unwrap(),
    ];
}

#[derive(Default)]
pub struct PartialOperation {
    pub bucket: String,
    pub object: String,
    pub version_id: Option<String>,
    pub versions: Vec<u8>,
    pub set_index: usize,
    pub pool_index: usize,
    pub queued: DateTime<Utc>,
    pub bitrot_scan: bool,
}

pub struct MRFState {
    tx: Sender<PartialOperation>,
    rx: RwLock<Receiver<PartialOperation>>,
    closed: AtomicBool,
    closing: AtomicBool,
}

impl MRFState {
    pub fn new() -> MRFState {
        let (tx, rx) = tokio::sync::mpsc::channel(MRF_OPS_QUEUE_SIZE as usize);
        MRFState {
            tx,
            rx: RwLock::new(rx),
            closed: Default::default(),
            closing: Default::default(),
        }
    }

    pub async fn add_partial(&self, op: PartialOperation) {
        if self.closed.load(Ordering::SeqCst) || self.closing.load(Ordering::SeqCst) {
            return;
        }
        let _ = self.tx.send(op).await;
    }

    pub async fn heal_routine(&self) {
        loop {
            // rx used only there,
            if let Some(op) = self.rx.write().await.recv().await {
                if op.bucket == RUSTFS_META_BUCKET {
                    for pattern in &*PATTERNS {
                        if pattern.is_match(&op.object) {
                            return;
                        }
                    }
                }

                let now = Utc::now();
                if now.sub(op.queued).num_seconds() < 1 {
                    sleep(Duration::from_secs(1)).await;
                }

                let scan_mode = if op.bitrot_scan { HEAL_DEEP_SCAN } else { HEAL_NORMAL_SCAN };
                if op.object.is_empty() {
                    if let Err(err) = heal_bucket(&op.bucket).await {
                        error!("heal bucket failed, bucket: {}, err: {:?}", op.bucket, err);
                    }
                } else {
                    if op.versions.is_empty() {
                        if let Err(err) =
                            heal_object(&op.bucket, &op.object, &op.version_id.clone().unwrap_or_default(), scan_mode).await
                        {
                            error!("heal object failed, bucket: {}, object: {}, err: {:?}", op.bucket, op.object, err);
                        }
                    } else {
                        let vers = op.versions.len() / 16;
                        if vers > 0 {
                            for i in 0..vers {
                                let start = i * 16;
                                let end = start + 16;
                                if let Err(err) = heal_object(
                                    &op.bucket,
                                    &op.object,
                                    &Uuid::from_slice(&op.versions[start..end]).expect("").to_string(),
                                    scan_mode,
                                )
                                .await
                                {
                                    error!("heal object failed, bucket: {}, object: {}, err: {:?}", op.bucket, op.object, err);
                                }
                            }
                        }
                    }
                }
            } else {
                return;
            }
        }
    }
}
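
Note: PartialOperation.versions is a flat Vec<u8> of concatenated 16-byte UUIDs; heal_routine walks it in strides of 16 and heals each version separately. A round-trip sketch of that encoding (uuid crate with the v4 feature assumed):

    use uuid::Uuid;

    fn main() {
        // pack: concatenate raw 16-byte UUIDs into one buffer
        let ids = [Uuid::new_v4(), Uuid::new_v4()];
        let mut versions = Vec::with_capacity(ids.len() * 16);
        for id in &ids {
            versions.extend_from_slice(id.as_bytes());
        }

        // unpack: the same 16-byte stride heal_routine uses
        for chunk in versions.chunks_exact(16) {
            let id = Uuid::from_slice(chunk).expect("16 bytes is a valid UUID");
            println!("{id}");
        }
    }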
@@ -6,7 +6,10 @@ use std::{
    time::Duration,
};

+use crate::config::error::is_not_found;
+use crate::global::GLOBAL_MRFState;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
+use crate::heal::mrf::PartialOperation;
use crate::{
    bitrot::{bitrot_verify, close_bitrot_writers, new_bitrot_filereader, new_bitrot_filewriter, BitrotFileWriter},
    cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
@@ -56,6 +59,7 @@ use crate::{
    heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
    store_api::ListObjectVersionsInfo,
};
+use chrono::Utc;
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
@@ -1234,6 +1238,7 @@ impl SetDisks {
        for (i, opdisk) in disks.iter().enumerate() {
            if let Some(disk) = opdisk {
                if disk.is_online().await && disk.get_disk_location().set_idx.is_some() {
+                    info!("Disk {:?} is online", disk);
                    continue;
                }

@@ -1241,6 +1246,7 @@ impl SetDisks {
            }

            if let Some(endpoint) = self.set_endpoints.get(i) {
+                info!("will renew disk, opdisk: {:?}", opdisk);
                self.renew_disk(endpoint).await;
            }
        }
@@ -1254,12 +1260,20 @@ impl SetDisks {
            Err(e) => {
                warn!("connect_endpoint err {:?}", &e);
                if ep.is_local && DiskError::UnformattedDisk.is(&e) {
-                    // TODO: pushHealLocalDisks
+                    GLOBAL_BackgroundHealState.push_heal_local_disks(&[ep.clone()]).await;
                }
                return;
            }
        };

+        if new_disk.is_local() {
+            if let Some(h) = new_disk.healing().await {
+                if !h.finished {
+                    GLOBAL_BackgroundHealState.push_heal_local_disks(&[new_disk.endpoint()]).await;
+                }
+            }
+        }
+
        let (set_idx, disk_idx) = match self.find_disk_index(&fm) {
            Ok(res) => res,
            Err(e) => {
@@ -1621,6 +1635,19 @@ impl SetDisks {
        let (op_online_disks, mot_time, etag) = Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize);

        let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?;
+        if errs.iter().any(|err| err.is_some()) {
+            GLOBAL_MRFState
+                .add_partial(PartialOperation {
+                    bucket: fi.volume.to_string(),
+                    object: fi.name.to_string(),
+                    queued: Utc::now(),
+                    version_id: fi.version_id.map(|v| v.to_string()),
+                    set_index: self.set_index,
+                    pool_index: self.pool_index,
+                    ..Default::default()
+                })
+                .await;
+        }
        // debug!("get_object_fileinfo pick fi {:?}", &fi);

        // let online_disks: Vec<Option<DiskStore>> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect();
@@ -1639,6 +1666,8 @@ impl SetDisks {
        fi: FileInfo,
        files: Vec<FileInfo>,
        disks: &[Option<DiskStore>],
+        set_index: usize,
+        pool_index: usize,
    ) -> Result<()> {
        let (disks, files) = Self::shuffle_disks_and_parts_metadata_by_index(disks, &files, &fi);

@@ -1724,10 +1753,34 @@ impl SetDisks {
            //     "read part {} part_offset {},part_length {},part_size {} ",
            //     part_number, part_offset, part_length, part_size
            // );
-            let _n = erasure
+            let (written, mut err) = erasure
                .decode(writer, readers, part_offset as usize, part_length, part_size)
-                .await?;
-
+                .await;
+            if let Some(e) = err.as_ref() {
+                if written == part_length {
+                    match e.downcast_ref::<DiskError>() {
+                        Some(DiskError::FileNotFound) | Some(DiskError::FileCorrupt) => {
+                            GLOBAL_MRFState
+                                .add_partial(PartialOperation {
+                                    bucket: bucket.to_string(),
+                                    object: object.to_string(),
+                                    queued: Utc::now(),
+                                    version_id: fi.version_id.map(|v| v.to_string()),
+                                    set_index,
+                                    pool_index,
+                                    bitrot_scan: !is_not_found(e),
+                                    ..Default::default()
+                                })
+                                .await;
+                            err = None;
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            if let Some(err) = err {
+                return Err(err);
+            }
            // debug!("ec decode {} writed size {}", part_number, n);

            total_readed += part_length as i64;
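
Note: the read path now inspects the partial result from Erasure::decode. If every byte of the part was still written (written == part_length) and the error was only a missing or corrupt shard, the read is served as-is, the object is queued to the MRF state for background repair (bitrot_scan requests a deep scan when the shard existed but was corrupt), and the error is swallowed; any other error still fails the request. The decision in miniature:

    // A full-length write plus a recoverable shard error downgrades the
    // failure to a background heal task instead of failing the read.
    fn should_requeue(written: usize, wanted: usize, recoverable: bool) -> bool {
        written == wanted && recoverable
    }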
@@ -2850,7 +2903,6 @@ impl SetDisks {
        info!("ns_scanner start");
        let _ = join_all(futures).await;
        drop(buckets_results_tx);
-        info!("1");
        let _ = task.await;
        info!("ns_scanner completed");
        Ok(())
@@ -2979,6 +3031,7 @@ impl SetDisks {
        let mut ret_err = None;
        for bucket in buckets.iter() {
            if tracker.read().await.is_healed(bucket).await {
+                info!("bucket {} was healed", bucket);
                continue;
            }

@@ -3099,6 +3152,7 @@ impl SetDisks {
            let bg_seq_clone = bg_seq.clone();
            let send_clone = send.clone();
            let heal_entry = Arc::new(move |bucket: String, entry: MetaCacheEntry| {
+                info!("heal entry, bucket: {}, entry: {:?}", bucket, entry);
                let jt_clone = jt_clone.clone();
                let self_clone = self_clone.clone();
                let started = started_clone;
@@ -3378,8 +3432,14 @@ impl ObjectIO for SetDisks {
        // let disks = disks.clone();
        let bucket = String::from(bucket);
        let object = String::from(object);
+        let set_index = self.set_index.clone();
+        let pool_index = self.pool_index.clone();
        tokio::spawn(async move {
-            if let Err(e) = Self::get_object_with_fileinfo(&bucket, &object, offset, length, &mut wd, fi, files, &disks).await {
+            if let Err(e) = Self::get_object_with_fileinfo(
+                &bucket, &object, offset, length, &mut wd, fi, files, &disks, set_index, pool_index,
+            )
+            .await
+            {
                error!("get_object_with_fileinfo err {:?}", e);
            };
        });
@@ -4438,7 +4498,7 @@ impl StorageAPI for SetDisks {
            }
        }

-        let (online_disks, _, op_old_dir) = Self::rename_data(
+        let (online_disks, versions, op_old_dir) = Self::rename_data(
            &shuffle_disks,
            RUSTFS_META_MULTIPART_BUCKET,
            &upload_id_path,
@@ -4467,6 +4527,19 @@ impl StorageAPI for SetDisks {
        self.commit_rename_data_dir(&shuffle_disks, bucket, object, &old_dir.to_string(), write_quorum)
            .await?;
    }
+    if let Some(versions) = versions {
+        GLOBAL_MRFState
+            .add_partial(PartialOperation {
+                bucket: bucket.to_string(),
+                object: object.to_string(),
+                queued: Utc::now(),
+                versions,
+                set_index: self.set_index,
+                pool_index: self.pool_index,
+                ..Default::default()
+            })
+            .await;
+    }

    let _ = self.delete_all(RUSTFS_META_MULTIPART_BUCKET, &upload_id_path).await;

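Note: rename_data's second return value, previously discarded as _, carries the packed version IDs that failed to reach every disk during complete-multipart; when present they are handed to the MRF queue rather than dropped. The hand-off shape, reduced to primitives:

    // Stand-in types: in the patch, `queue.push` is GLOBAL_MRFState.add_partial(...).
    fn after_rename(versions: Option<Vec<u8>>, queue: &mut Vec<Vec<u8>>) {
        if let Some(v) = versions {
            queue.push(v);
        }
    }
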
@@ -589,6 +589,7 @@ impl StorageAPI for Sets {
            .await;
        let (formats, errs) = load_format_erasure_all(&disks, true).await;
        if let Err(err) = check_format_erasure_values(&formats, self.set_drive_count) {
+            info!("failed to check formats erasure values: {}", err);
            return Ok((HealResultItem::default(), Some(err)));
        }
        let ref_format = match get_format_erasure_in_quorum(&formats) {
@@ -611,12 +612,14 @@ impl StorageAPI for Sets {
            res.after.drives.push(v.clone());
        }
        if DiskError::UnformattedDisk.count_errs(&errs) == 0 {
+            info!("disk formats success, NoHealRequired, errs: {:?}", errs);
            return Ok((res, Some(Error::new(DiskError::NoHealRequired))));
        }

-        if !self.format.eq(&ref_format) {
-            return Ok((res, Some(Error::new(DiskError::CorruptedFormat))));
-        }
+        // if !self.format.eq(&ref_format) {
+        //     info!("format ({:?}) not eq ref_format ({:?})", self.format, ref_format);
+        //     return Ok((res, Some(Error::new(DiskError::CorruptedFormat))));
+        // }

        let format_op_id = Uuid::new_v4().to_string();
        let (new_format_sets, _) = new_heal_format_sets(&ref_format, self.set_count, self.set_drive_count, &formats, &errs);

@@ -671,8 +671,10 @@ impl ECStore {
        want_cycle: usize,
        heal_scan_mode: HealScanMode,
    ) -> Result<()> {
+        info!("ns_scanner updates - {}", want_cycle);
        let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
        if all_buckets.is_empty() {
+            info!("No buckets found");
            let _ = updates.send(DataUsageInfo::default()).await;
            return Ok(());
        }
@@ -1907,6 +1909,7 @@ impl StorageAPI for ECStore {
        counts
    }
    async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
+        info!("heal_format");
        let mut r = HealResultItem {
            heal_item_type: HEAL_ITEM_METADATA.to_string(),
            detail: "disk-format".to_string(),
@@ -1932,8 +1935,10 @@ impl StorageAPI for ECStore {
            r.after.drives.append(&mut result.after.drives);
        }
        if count_no_heal == self.pools.len() {
+            info!("heal format success, NoHealRequired");
            return Ok((r, Some(Error::new(DiskError::NoHealRequired))));
        }
+        info!("heal format success result: {:?}", r);
        Ok((r, None))
    }

@@ -16,8 +16,7 @@ use std::{
    fmt::Debug,
};

-use crate::config::error::ConfigError;
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
use uuid::Uuid;

pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {

@@ -56,7 +56,7 @@ impl Action {
}

impl Action {
-    const S3_PREFIX: &str = "s3:";
+    const S3_PREFIX: &'static str = "s3:";
    const ADMIN_PREFIX: &str = "admin:";
    const STS_PREFIX: &str = "sts:";
    const KMS_PREFIX: &str = "kms:";
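
Note: in an associated const the elided lifetime in &str already means 'static; spelling it out satisfies the warning recent toolchains emit about elided lifetimes in associated constants. The sibling prefixes presumably get the same treatment in a later change. For example:

    struct Action;

    impl Action {
        // explicit 'static: the elided form is deprecated in associated consts
        const S3_PREFIX: &'static str = "s3:";
    }

    fn main() {
        assert!("s3:GetObject".starts_with(Action::S3_PREFIX));
    }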