feat: 添加定时检测重连disk

This commit is contained in:
weisd
2024-09-20 16:45:17 +08:00
parent 2acc3e19ef
commit c0ff822fc6
4 changed files with 63 additions and 11 deletions

View File

@@ -182,6 +182,55 @@ impl FormatV3 {
Err(Error::msg(format!("disk id not found {}", disk_id)))
}
pub fn check_other(&self, other: &FormatV3) -> Result<()> {
let mut tmp = other.clone();
let this = tmp.erasure.this;
tmp.erasure.this = Uuid::nil();
if self.erasure.sets.len() != other.erasure.sets.len() {
return Err(Error::from_string(format!(
"Expected number of sets {}, got {}",
self.erasure.sets.len(),
other.erasure.sets.len()
)));
}
for i in 0..self.erasure.sets.len() {
if self.erasure.sets[i].len() != other.erasure.sets[i].len() {
return Err(Error::from_string(format!(
"Each set should be of same size, expected {}, got {}",
self.erasure.sets[i].len(),
other.erasure.sets[i].len()
)));
}
for j in 0..self.erasure.sets[i].len() {
if self.erasure.sets[i][j] != self.erasure.sets[i][j] {
return Err(Error::from_string(format!(
"UUID on positions {}:{} do not match with, expected {:?} got {:?}: (%w)",
i,
j,
self.erasure.sets[i][j].to_string(),
other.erasure.sets[i][j].to_string(),
)));
}
}
}
for i in 0..tmp.erasure.sets.len() {
for j in 0..tmp.erasure.sets[i].len() {
if this == tmp.erasure.sets[i][j] {
return Ok(());
}
}
}
Err(Error::msg(format!(
"DriveID {:?} not found in any drive sets {:?}",
this, other.erasure.sets
)))
}
}
#[cfg(test)]

View File

@@ -222,6 +222,7 @@ impl MetaCacheEntry {
}
}
#[derive(Debug, Default)]
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,

View File

@@ -102,6 +102,7 @@ impl Sets {
set_index: i,
pool_index: pool_idx,
set_endpoints,
format: fm.clone(),
};
disk_set.push(Arc::new(set_disks));
@@ -159,6 +160,7 @@ impl Sets {
for set in self.disk_set.iter() {
set.connect_disks().await;
}
debug!("done connect_disks ...");
}
pub fn get_disks(&self, set_idx: usize) -> Arc<SetDisks> {

View File

@@ -202,14 +202,19 @@ pub fn default_partiy_count(drive: usize) -> usize {
// load_format_erasure_all 读取所有foramt.json
async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for disk in disks.iter() {
futures.push(read_format_file(disk, heal));
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if disk.is_none() {
datas.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
}
let disk = disk.as_ref().unwrap();
futures.push(load_format_erasure(disk, heal));
}
let results = join_all(futures).await;
let mut i = 0;
for result in results {
@@ -234,12 +239,7 @@ async fn load_format_erasure_all(disks: &[Option<DiskStore>], heal: bool) -> (Ve
(datas, errors)
}
async fn read_format_file(disk: &Option<DiskStore>, _heal: bool) -> Result<FormatV3, Error> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let disk = disk.as_ref().unwrap();
pub async fn load_format_erasure(disk: &DiskStore, _heal: bool) -> Result<FormatV3, Error> {
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await