mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -336,12 +336,6 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
|
||||
|
||||
[[package]]
|
||||
name = "bytesize"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
|
||||
|
||||
[[package]]
|
||||
name = "bytestring"
|
||||
version = "1.3.1"
|
||||
|
||||
@@ -23,6 +23,7 @@ version = "0.0.1"
|
||||
async-trait = "0.1.83"
|
||||
backon = "1.2.0"
|
||||
bytes = "1.8.0"
|
||||
bytesize = "1.3.0"
|
||||
clap = { version = "4.5.20", features = ["derive"] }
|
||||
ecstore = { path = "./ecstore" }
|
||||
flatbuffers = "24.3.25"
|
||||
@@ -84,4 +85,3 @@ uuid = { version = "1.11.0", features = [
|
||||
log = "0.4.22"
|
||||
axum = "0.7.7"
|
||||
md-5 = "0.10.6"
|
||||
bytesize = "1.3.0"
|
||||
|
||||
@@ -2,7 +2,8 @@ pub mod error;
|
||||
pub mod globals;
|
||||
|
||||
/// Defers evaluation of a block of code until the end of the scope.
|
||||
#[macro_export] macro_rules! defer {
|
||||
#[macro_export]
|
||||
macro_rules! defer {
|
||||
($($body:tt)*) => {
|
||||
let _guard = {
|
||||
pub struct Guard<F: FnOnce()>(Option<F>);
|
||||
|
||||
@@ -13,7 +13,6 @@ async-trait.workspace = true
|
||||
backon.workspace = true
|
||||
blake2 = "0.10.6"
|
||||
bytes.workspace = true
|
||||
bytesize = "1.3.0"
|
||||
common.workspace = true
|
||||
reader.workspace = true
|
||||
glob = "0.3.1"
|
||||
@@ -42,7 +41,6 @@ protos.workspace = true
|
||||
rmp-serde = "1.3.0"
|
||||
tokio-util = { version = "0.7.12", features = ["io", "compat"] }
|
||||
crc32fast = "1.4.2"
|
||||
rand = "0.8.5"
|
||||
siphasher = "1.0.1"
|
||||
base64-simd = "0.8.0"
|
||||
sha2 = { version = "0.11.0-pre.4" }
|
||||
|
||||
@@ -14,7 +14,7 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json";
|
||||
const STORAGE_FORMAT_FILE: &str = "xl.meta";
|
||||
|
||||
use crate::{
|
||||
erasure::{ReadAt, Writer},
|
||||
erasure::Writer,
|
||||
error::{Error, Result},
|
||||
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
|
||||
heal::{
|
||||
@@ -342,6 +342,18 @@ impl DiskAPI for Disk {
|
||||
Disk::Remote(remote_disk) => remote_disk.disk_info(opts).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn ns_scanner(
|
||||
&self,
|
||||
cache: &DataUsageCache,
|
||||
updates: Sender<DataUsageEntry>,
|
||||
scan_mode: HealScanMode,
|
||||
) -> Result<DataUsageCache> {
|
||||
match self {
|
||||
Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await,
|
||||
Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskStore> {
|
||||
|
||||
@@ -21,7 +21,13 @@ use super::{
|
||||
RemoteFileWriter, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
|
||||
};
|
||||
use crate::{
|
||||
disk::error::DiskError, error::{Error, Result}, heal::{data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::HealScanMode}, store_api::{FileInfo, RawFileInfo}
|
||||
disk::error::DiskError,
|
||||
error::{Error, Result},
|
||||
heal::{
|
||||
data_usage_cache::{DataUsageCache, DataUsageEntry},
|
||||
heal_commands::HealScanMode,
|
||||
},
|
||||
store_api::{FileInfo, RawFileInfo},
|
||||
};
|
||||
use protos::proto_gen::node_service::RenamePartRequst;
|
||||
|
||||
|
||||
@@ -419,7 +419,13 @@ impl Erasure {
|
||||
till_offset
|
||||
}
|
||||
|
||||
pub async fn heal(&self, writers: &mut [Option<BitrotWriter>], readers: Vec<Option<BitrotReader>>, total_length: usize, prefer: &[bool]) -> Result<()> {
|
||||
pub async fn heal(
|
||||
&self,
|
||||
writers: &mut [Option<BitrotWriter>],
|
||||
readers: Vec<Option<BitrotReader>>,
|
||||
total_length: usize,
|
||||
prefer: &[bool],
|
||||
) -> Result<()> {
|
||||
if writers.len() != self.parity_shards + self.data_shards {
|
||||
return Err(Error::from_string("invalid argument"));
|
||||
}
|
||||
@@ -451,7 +457,7 @@ impl Erasure {
|
||||
continue;
|
||||
}
|
||||
match w.as_mut().unwrap().write(shards[i].as_ref()).await {
|
||||
Ok(_) => {},
|
||||
Ok(_) => {}
|
||||
Err(e) => errs.push(e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@ use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
disk::DiskStore, endpoints::{EndpointServerPools, PoolEndpoints, SetupType}, heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState}, store::ECStore
|
||||
disk::DiskStore,
|
||||
endpoints::{EndpointServerPools, PoolEndpoints, SetupType},
|
||||
heal::{background_heal_ops::HealRoutine, heal_ops::AllHealState},
|
||||
store::ECStore,
|
||||
};
|
||||
|
||||
pub const DISK_ASSUME_UNKNOWN_SIZE: u64 = 1 << 30;
|
||||
|
||||
@@ -4,12 +4,18 @@ use tokio::{
|
||||
select,
|
||||
sync::{
|
||||
broadcast::Receiver as B_Receiver,
|
||||
mpsc::{self, Receiver, Sender}, RwLock,
|
||||
mpsc::{self, Receiver, Sender},
|
||||
RwLock,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
disk::error::DiskError, error::{Error, Result}, heal::heal_ops::NOP_HEAL, new_object_layer_fn, store_api::StorageAPI, utils::path::SLASH_SEPARATOR
|
||||
disk::error::DiskError,
|
||||
error::{Error, Result},
|
||||
heal::heal_ops::NOP_HEAL,
|
||||
new_object_layer_fn,
|
||||
store_api::StorageAPI,
|
||||
utils::path::SLASH_SEPARATOR,
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -148,7 +154,7 @@ async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Erro
|
||||
let store = lock.as_ref().expect("Not init");
|
||||
let (res, err) = store.heal_format(opts.dry_run).await?;
|
||||
// return any error, ignore error returned when disks have
|
||||
// already healed.
|
||||
// already healed.
|
||||
if err.is_some() {
|
||||
return Ok((HealResultItem::default(), err));
|
||||
}
|
||||
|
||||
@@ -858,7 +858,8 @@ impl FolderScanner {
|
||||
// Replace if existed before.
|
||||
if let Some(flat) = self.new_cache.size_recursive(&this_hash.key()) {
|
||||
self.update_cache.delete_recursive(&this_hash);
|
||||
self.update_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
|
||||
self.update_cache
|
||||
.replace_hashed(&this_hash, &Some(folder.parent.clone()), &flat);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{atomic::{AtomicU32, Ordering}, Arc},
|
||||
sync::{
|
||||
atomic::{AtomicU32, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
|
||||
@@ -1,19 +1,3 @@
|
||||
use bytesize::ByteSize;
|
||||
use http::HeaderMap;
|
||||
use path_clean::PathClean;
|
||||
use rand::Rng;
|
||||
use rmp_serde::Serializer;
|
||||
use s3s::dto::ReplicationConfiguration;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::u64;
|
||||
use s3s::{S3Error, S3ErrorCode};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::time::sleep;
|
||||
use crate::config::common::save_config;
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
|
||||
@@ -21,6 +5,22 @@ use crate::error::{Error, Result};
|
||||
use crate::new_object_layer_fn;
|
||||
use crate::set_disk::SetDisks;
|
||||
use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectOptions};
|
||||
use bytesize::ByteSize;
|
||||
use http::HeaderMap;
|
||||
use path_clean::PathClean;
|
||||
use rand::Rng;
|
||||
use rmp_serde::Serializer;
|
||||
use s3s::dto::ReplicationConfiguration;
|
||||
use s3s::{S3Error, S3ErrorCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::u64;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use super::data_scanner::SizeSummary;
|
||||
use super::data_usage::DATA_USAGE_ROOT;
|
||||
@@ -146,7 +146,7 @@ impl Default for SizeHistogram {
|
||||
impl SizeHistogram {
|
||||
fn add(&mut self, size: u64) {
|
||||
for (idx, interval) in OBJECTS_HISTOGRAM_INTERVALS.iter().enumerate() {
|
||||
if size >= interval.start && size <= interval.end {
|
||||
if size >= interval.start && size <= interval.end {
|
||||
self.0[idx] += 1;
|
||||
break;
|
||||
}
|
||||
@@ -167,7 +167,7 @@ impl Default for VersionsHistogram {
|
||||
impl VersionsHistogram {
|
||||
fn add(&mut self, size: u64) {
|
||||
for (idx, interval) in OBJECTS_VERSION_COUNT_INTERVALS.iter().enumerate() {
|
||||
if size >= interval.start && size <= interval.end {
|
||||
if size >= interval.start && size <= interval.end {
|
||||
self.0[idx] += 1;
|
||||
break;
|
||||
}
|
||||
@@ -259,7 +259,10 @@ impl DataUsageEntry {
|
||||
replication_stats.replica_count += summary.replica_count as u64;
|
||||
|
||||
for (arn, st) in &summary.repl_target_stats {
|
||||
let tgt_stat = replication_stats.targets.entry(arn.to_string()).or_insert(ReplicationStats::default());
|
||||
let tgt_stat = replication_stats
|
||||
.targets
|
||||
.entry(arn.to_string())
|
||||
.or_insert(ReplicationStats::default());
|
||||
tgt_stat.pending_size += st.pending_size as u64;
|
||||
tgt_stat.failed_size += st.failed_size as u64;
|
||||
tgt_stat.replicated_size += st.replicated_size as u64;
|
||||
@@ -412,7 +415,7 @@ impl DataUsageCache {
|
||||
}
|
||||
Ok(d)
|
||||
}
|
||||
|
||||
|
||||
pub async fn save(&self, name: &str) -> Result<()> {
|
||||
let buf = self.marshal_msg()?;
|
||||
let buf_clone = buf.clone();
|
||||
@@ -429,7 +432,7 @@ impl DataUsageCache {
|
||||
});
|
||||
save_config(&store, name, &buf).await
|
||||
}
|
||||
|
||||
|
||||
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
|
||||
let hash = hash_path(path);
|
||||
self.cache.insert(hash.key(), e);
|
||||
@@ -447,7 +450,10 @@ impl DataUsageCache {
|
||||
pub fn replace_hashed(&mut self, hash: &DataUsageHash, parent: &Option<DataUsageHash>, e: &DataUsageEntry) {
|
||||
self.cache.insert(hash.key(), e.clone());
|
||||
if let Some(parent) = parent {
|
||||
self.cache.entry(parent.key()).or_insert(DataUsageEntry::default()).add_child(hash);
|
||||
self.cache
|
||||
.entry(parent.key())
|
||||
.or_insert(DataUsageEntry::default())
|
||||
.add_child(hash);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -456,7 +462,11 @@ impl DataUsageCache {
|
||||
}
|
||||
|
||||
pub fn find_children_copy(&mut self, h: DataUsageHash) -> DataUsageHashMap {
|
||||
self.cache.entry(h.string()).or_insert(DataUsageEntry::default()).children.clone()
|
||||
self.cache
|
||||
.entry(h.string())
|
||||
.or_insert(DataUsageEntry::default())
|
||||
.children
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn flatten(&self, root: &DataUsageEntry) -> DataUsageEntry {
|
||||
@@ -488,7 +498,7 @@ impl DataUsageCache {
|
||||
let p = self.cache.entry(parent.key()).or_insert(DataUsageEntry::default());
|
||||
p.add_child(hash);
|
||||
}
|
||||
},
|
||||
}
|
||||
None => return,
|
||||
}
|
||||
}
|
||||
@@ -517,8 +527,8 @@ impl DataUsageCache {
|
||||
flat.replication_stats = None;
|
||||
}
|
||||
return Some(flat);
|
||||
},
|
||||
None => None
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -544,10 +554,8 @@ impl DataUsageCache {
|
||||
|
||||
pub fn is_compacted(&self, hash: &DataUsageHash) -> bool {
|
||||
match self.cache.get(&hash.key()) {
|
||||
Some(due) => {
|
||||
due.compacted
|
||||
},
|
||||
None => false
|
||||
Some(due) => due.compacted,
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -576,9 +584,7 @@ impl DataUsageCache {
|
||||
let mut leaves = Vec::new();
|
||||
let mut remove = total - limit;
|
||||
add(self, path, &mut leaves);
|
||||
leaves.sort_by(|a, b| {
|
||||
a.objects.cmp(&b.objects)
|
||||
});
|
||||
leaves.sort_by(|a, b| a.objects.cmp(&b.objects));
|
||||
|
||||
while remove > 0 && leaves.len() > 0 {
|
||||
let e = leaves.first().unwrap();
|
||||
@@ -602,7 +608,6 @@ impl DataUsageCache {
|
||||
remove -= removing;
|
||||
leaves.remove(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub fn total_children_rec(&self, path: &str) -> usize {
|
||||
@@ -652,8 +657,13 @@ fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec
|
||||
return;
|
||||
}
|
||||
|
||||
let sz = data_usage_cache.size_recursive(&path.key()).unwrap_or(DataUsageEntry::default());
|
||||
leaves.push(Inner{objects: sz.objects, path: path.clone()});
|
||||
let sz = data_usage_cache
|
||||
.size_recursive(&path.key())
|
||||
.unwrap_or(DataUsageEntry::default());
|
||||
leaves.push(Inner {
|
||||
objects: sz.objects,
|
||||
path: path.clone(),
|
||||
});
|
||||
for ch in e.children.iter() {
|
||||
add(data_usage_cache, &DataUsageHash(ch.clone()), leaves);
|
||||
}
|
||||
|
||||
@@ -1 +1 @@
|
||||
pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";
|
||||
pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";
|
||||
|
||||
@@ -7,7 +7,13 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, error::{Error, Result}, global::GLOBAL_BackgroundHealState, heal::heal_ops::HEALING_TRACKER_FILENAME, new_object_layer_fn, store_api::{BucketInfo, StorageAPI}, utils::fs::read_file
|
||||
disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
|
||||
error::{Error, Result},
|
||||
global::GLOBAL_BackgroundHealState,
|
||||
heal::heal_ops::HEALING_TRACKER_FILENAME,
|
||||
new_object_layer_fn,
|
||||
store_api::{BucketInfo, StorageAPI},
|
||||
utils::fs::read_file,
|
||||
};
|
||||
|
||||
pub type HealScanMode = usize;
|
||||
|
||||
@@ -142,7 +142,6 @@ impl HealSequence {
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl HealSequence {
|
||||
@@ -447,11 +446,13 @@ impl AllHealState {
|
||||
async fn periodic_heal_seqs_clean(&mut self) {
|
||||
let _ = self.mu.write().await;
|
||||
let now = SystemTime::now();
|
||||
|
||||
|
||||
let mut keys_to_reomve = Vec::new();
|
||||
for (k, v) in self.heal_seq_map.iter() {
|
||||
let r = v.read().await;
|
||||
if r.has_ended().await && (UNIX_EPOCH + Duration::from_secs(*(r.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now {
|
||||
if r.has_ended().await
|
||||
&& (UNIX_EPOCH + Duration::from_secs(*(r.end_time.read().await)) + KEEP_HEAL_SEQ_STATE_DURATION) < now
|
||||
{
|
||||
keys_to_reomve.push(k.clone())
|
||||
}
|
||||
}
|
||||
@@ -546,7 +547,8 @@ impl AllHealState {
|
||||
}
|
||||
}
|
||||
|
||||
self.heal_seq_map.insert(path_s.to_string(), Arc::new(RwLock::new(heal_sequence.clone())));
|
||||
self.heal_seq_map
|
||||
.insert(path_s.to_string(), Arc::new(RwLock::new(heal_sequence.clone())));
|
||||
|
||||
let client_token = heal_sequence.client_token.clone();
|
||||
if *GLOBAL_IsDistErasure.read().await {
|
||||
|
||||
@@ -2,7 +2,7 @@ pub mod background_heal_ops;
|
||||
pub mod data_scanner;
|
||||
pub mod data_scanner_metric;
|
||||
pub mod data_usage;
|
||||
pub mod data_usage_cache;
|
||||
pub mod error;
|
||||
pub mod heal_commands;
|
||||
pub mod heal_ops;
|
||||
pub mod data_usage_cache;
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::{
|
||||
disk::{
|
||||
error::DiskError,
|
||||
format::{DistributionAlgoVersion, FormatV3},
|
||||
new_disk, DiskInfo, DiskOption, DiskStore,
|
||||
new_disk, DiskAPI, DiskInfo, DiskOption, DiskStore,
|
||||
},
|
||||
endpoints::{Endpoints, PoolEndpoints},
|
||||
error::{Error, Result},
|
||||
@@ -31,8 +31,7 @@ use crate::{
|
||||
ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
|
||||
},
|
||||
store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
|
||||
|
||||
utils::hash,,
|
||||
utils::hash,
|
||||
};
|
||||
|
||||
use tokio::time::Duration;
|
||||
@@ -522,7 +521,7 @@ impl StorageAPI for Sets {
|
||||
let before_derives = formats_to_drives_info(&self.endpoints.endpoints, &formats, &errs);
|
||||
res.before = vec![HealDriveInfo::default(); before_derives.len()];
|
||||
res.after = vec![HealDriveInfo::default(); before_derives.len()];
|
||||
|
||||
|
||||
for v in before_derives.iter() {
|
||||
res.before.push(v.clone());
|
||||
res.after.push(v.clone());
|
||||
@@ -536,15 +535,16 @@ impl StorageAPI for Sets {
|
||||
}
|
||||
|
||||
let format_op_id = Uuid::new_v4().to_string();
|
||||
let (new_format_sets, current_disks_info) = new_heal_format_sets(&ref_format, self.set_count, self.set_drive_count, &formats, &errs);
|
||||
let (new_format_sets, current_disks_info) =
|
||||
new_heal_format_sets(&ref_format, self.set_count, self.set_drive_count, &formats, &errs);
|
||||
if !dry_run {
|
||||
let mut tmp_new_formats = vec![None; self.set_count*self.set_drive_count];
|
||||
let mut tmp_new_formats = vec![None; self.set_count * self.set_drive_count];
|
||||
for (i, set) in new_format_sets.iter().enumerate() {
|
||||
for (j, fm) in set.iter().enumerate() {
|
||||
if let Some(fm) = fm {
|
||||
res.after[i*self.set_drive_count+j].uuid = fm.erasure.this.to_string();
|
||||
res.after[i*self.set_drive_count+j].state = DRIVE_STATE_OK.to_string();
|
||||
tmp_new_formats[i*self.set_drive_count+j] = Some(fm.clone());
|
||||
res.after[i * self.set_drive_count + j].uuid = fm.erasure.this.to_string();
|
||||
res.after[i * self.set_drive_count + j].state = DRIVE_STATE_OK.to_string();
|
||||
tmp_new_formats[i * self.set_drive_count + j] = Some(fm.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -690,7 +690,7 @@ fn new_heal_format_sets(
|
||||
let mut current_disks_info = vec![vec![DiskInfo::default(); set_drive_count]; set_count];
|
||||
for (i, set) in ref_format.erasure.sets.iter().enumerate() {
|
||||
for (j, value) in set.iter().enumerate() {
|
||||
if let Some(Some(err)) = errs.get(i*set_drive_count+j) {
|
||||
if let Some(Some(err)) = errs.get(i * set_drive_count + j) {
|
||||
match err.downcast_ref::<DiskError>() {
|
||||
Some(DiskError::UnformattedDisk) => {
|
||||
let mut fm = FormatV3::new(set_count, set_drive_count);
|
||||
@@ -702,11 +702,11 @@ fn new_heal_format_sets(
|
||||
fm.erasure.version = ref_format.erasure.version.clone();
|
||||
fm.erasure.distribution_algo = ref_format.erasure.distribution_algo.clone();
|
||||
new_formats[i][j] = Some(fm);
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if let (Some(format), None) = (&formats[i*set_drive_count+j], &errs[i*set_drive_count+j]) {
|
||||
if let (Some(format), None) = (&formats[i * set_drive_count + j], &errs[i * set_drive_count + j]) {
|
||||
if let Some(info) = &format.disk_info {
|
||||
if !info.endpoint.is_empty() {
|
||||
current_disks_info[i][j] = info.clone();
|
||||
|
||||
@@ -44,7 +44,6 @@ use http::HeaderMap;
|
||||
use lazy_static::lazy_static;
|
||||
use rand::Rng;
|
||||
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use std::cmp::Ordering;
|
||||
use std::slice::Iter;
|
||||
use std::{
|
||||
@@ -54,11 +53,12 @@ use std::{
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::fs;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{mpsc, RwLock, Semaphore};
|
||||
|
||||
use crate::heal::data_usage_cache::DataUsageCache;
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
use crate::heal::data_usage_cache::DataUsageCache;
|
||||
|
||||
const MAX_UPLOADS_LIST: usize = 10000;
|
||||
|
||||
@@ -491,7 +491,12 @@ impl ECStore {
|
||||
internal_get_pool_info_existing_with_opts(&self.pools, bucket, object, opts).await
|
||||
}
|
||||
|
||||
pub async fn ns_scanner(&self, updates: Sender<DataUsageInfo>, want_cycle: usize, heal_scan_mode: HealScanMode) -> Result<()> {
|
||||
pub async fn ns_scanner(
|
||||
&self,
|
||||
updates: Sender<DataUsageInfo>,
|
||||
want_cycle: usize,
|
||||
heal_scan_mode: HealScanMode,
|
||||
) -> Result<()> {
|
||||
let all_buckets = self.list_bucket(&BucketOptions::default()).await?;
|
||||
if all_buckets.is_empty() {
|
||||
let _ = updates.send(DataUsageInfo::default()).await;
|
||||
@@ -509,21 +514,21 @@ impl ECStore {
|
||||
for set in pool.disk_set.iter() {
|
||||
let index = result_index;
|
||||
let results_clone = results.clone();
|
||||
futures.push(async move {
|
||||
futures.push(async move {
|
||||
let (tx, mut rx) = mpsc::channel(100);
|
||||
let task = tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Some(info) => {
|
||||
results_clone.write().await[index] = info;
|
||||
},
|
||||
}
|
||||
None => {
|
||||
return ;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
let _ = task.await;
|
||||
});
|
||||
result_index += 1;
|
||||
@@ -1427,9 +1432,8 @@ impl StorageAPI for ECStore {
|
||||
match err.downcast_ref::<DiskError>() {
|
||||
Some(DiskError::NoHealRequired) => {
|
||||
count_no_heal += 1;
|
||||
},
|
||||
}
|
||||
_ => {
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1438,7 +1442,6 @@ impl StorageAPI for ECStore {
|
||||
r.set_count += result.set_count;
|
||||
r.before.append(&mut result.before);
|
||||
r.after.append(&mut result.after);
|
||||
|
||||
}
|
||||
if count_no_heal == self.pools.len() {
|
||||
return Ok((r, Some(Error::new(DiskError::NoHealRequired))));
|
||||
@@ -1449,7 +1452,13 @@ impl StorageAPI for ECStore {
|
||||
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)> {
|
||||
async fn heal_object(
|
||||
&self,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
version_id: &str,
|
||||
opts: &HealOpts,
|
||||
) -> Result<(HealResultItem, Option<Error>)> {
|
||||
let object = utils::path::encode_dir_object(object);
|
||||
let errs = Arc::new(RwLock::new(vec![None; self.pools.len()]));
|
||||
let results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.pools.len()]));
|
||||
|
||||
@@ -102,12 +102,6 @@ impl FileInfo {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn inline_data(&self) -> bool {
|
||||
self.metadata.as_ref().map_or(false, |metadata| {
|
||||
metadata.contains_key(&format!("{}inline-data", RESERVED_METADATA_PREFIX_LOWER)) && !self.is_remote()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_etag(&self) -> Option<String> {
|
||||
if let Some(meta) = &self.metadata {
|
||||
meta.get("etag").cloned()
|
||||
@@ -917,9 +911,15 @@ pub trait StorageAPI: ObjectIO {
|
||||
async fn put_object_tags(&self, bucket: &str, object: &str, tags: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
async fn delete_object_tags(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
|
||||
async fn heal_format(&self, dry_run: bool) ->Result<(HealResultItem, Option<Error>)>;
|
||||
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
|
||||
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
|
||||
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)>;
|
||||
async fn heal_object(
|
||||
&self,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
version_id: &str,
|
||||
opts: &HealOpts,
|
||||
) -> Result<(HealResultItem, Option<Error>)>;
|
||||
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()>;
|
||||
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
|
||||
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
|
||||
|
||||
@@ -2,10 +2,13 @@ use crate::config::{storageclass, KVS};
|
||||
use crate::disk::DiskAPI;
|
||||
use crate::{
|
||||
disk::{
|
||||
error::DiskError, format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, new_disk, DiskInfoOptions, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET
|
||||
error::DiskError,
|
||||
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
|
||||
new_disk, DiskInfoOptions, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
|
||||
},
|
||||
endpoints::Endpoints,
|
||||
error::{Error, Result}, heal::heal_commands::init_healing_tracker,
|
||||
error::{Error, Result},
|
||||
heal::heal_commands::init_healing_tracker,
|
||||
};
|
||||
use futures::future::join_all;
|
||||
use std::{
|
||||
@@ -232,10 +235,12 @@ pub async fn load_format_erasure(disk: &DiskStore, heal: bool) -> Result<FormatV
|
||||
let mut fm = FormatV3::try_from(data.as_slice())?;
|
||||
|
||||
if heal {
|
||||
let info = disk.disk_info(&DiskInfoOptions {
|
||||
noop: heal,
|
||||
..Default::default()
|
||||
}).await?;
|
||||
let info = disk
|
||||
.disk_info(&DiskInfoOptions {
|
||||
noop: heal,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
fm.disk_info = Some(info);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,14 @@ use crate::error::{Error, Result};
|
||||
|
||||
pub fn parse_bool(str: &str) -> Result<bool> {
|
||||
match str {
|
||||
"1"| "t"| "T"| "true"| "TRUE"| "True"| "on"| "ON"| "On"| "enabled" => {
|
||||
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => {
|
||||
return Ok(true);
|
||||
},
|
||||
"0"| "f"| "F"| "false"| "FALSE"| "False"| "off"| "OFF"| "Off"| "disabled" => {
|
||||
}
|
||||
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => {
|
||||
return Ok(false);
|
||||
}
|
||||
_ => {
|
||||
return Err(Error::from_string(format!("ParseBool: parsing {}", str)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
pub mod bool_flag;
|
||||
pub mod crypto;
|
||||
pub mod ellipses;
|
||||
pub mod bool_flag;
|
||||
pub mod fs;
|
||||
pub mod hash;
|
||||
pub mod net;
|
||||
|
||||
Reference in New Issue
Block a user