auto heal(3)

Signed-off-by: mujunxiang <1948535941@qq.com>
mujunxiang
2024-11-19 17:15:37 +08:00
parent 6291087ecb
commit e94d462652
50 changed files with 1822 additions and 2175 deletions

Cargo.lock (generated)

@@ -427,7 +427,9 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
name = "common"
version = "0.0.1"
dependencies = [
"async-trait",
"lazy_static",
"scopeguard",
"tokio",
"tonic",
"tracing-error",
@@ -655,6 +657,7 @@ dependencies = [
"url",
"uuid",
"winapi",
"workers",
"xxhash-rust",
]
@@ -3206,6 +3209,14 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "workers"
version = "0.0.1"
dependencies = [
"common",
"tokio",
]
[[package]]
name = "write16"
version = "1.0.0"


@@ -9,7 +9,7 @@ members = [
"common/protos",
"api/admin",
"reader",
"router",
"router", "common/workers",
]
[workspace.package]
@@ -85,3 +85,4 @@ uuid = { version = "1.11.0", features = [
log = "0.4.22"
axum = "0.7.7"
md-5 = "0.10.6"
workers = { path = "./common/workers" }


@@ -4,7 +4,9 @@ version.workspace = true
edition.workspace = true
[dependencies]
async-trait.workspace = true
lazy_static.workspace = true
scopeguard = "1.2.0"
tokio.workspace = true
tonic.workspace = true
tracing-error.workspace = true
tracing-error.workspace = true


@@ -135,14 +135,14 @@ mod test {
let id = "foo";
let source = "dandan";
let timeout = Duration::from_secs(5);
assert_eq!(true, l_rw_lock.get_lock(id, source, &timeout).await);
assert!(l_rw_lock.get_lock(id, source, &timeout).await);
l_rw_lock.un_lock().await;
l_rw_lock.lock().await;
assert_eq!(false, l_rw_lock.get_r_lock(id, source, &timeout).await);
assert!(!(l_rw_lock.get_r_lock(id, source, &timeout).await));
l_rw_lock.un_lock().await;
assert_eq!(true, l_rw_lock.get_r_lock(id, source, &timeout).await);
assert!(l_rw_lock.get_r_lock(id, source, &timeout).await);
Ok(())
}
@@ -156,7 +156,7 @@ mod test {
let one_fn = async {
let one = Arc::clone(&l_rw_lock);
let timeout = Duration::from_secs(1);
assert_eq!(true, one.get_lock(id, source, &timeout).await);
assert!(one.get_lock(id, source, &timeout).await);
sleep(Duration::from_secs(5)).await;
l_rw_lock.un_lock().await;
};
@@ -164,9 +164,9 @@ mod test {
let two_fn = async {
let two = Arc::clone(&l_rw_lock);
let timeout = Duration::from_secs(2);
assert_eq!(false, two.get_r_lock(id, source, &timeout).await);
assert!(!(two.get_r_lock(id, source, &timeout).await));
sleep(Duration::from_secs(5)).await;
assert_eq!(true, two.get_r_lock(id, source, &timeout).await);
assert!(two.get_r_lock(id, source, &timeout).await);
two.un_r_lock().await;
};


@@ -287,7 +287,7 @@ mod test {
})
.await?;
assert_eq!(result, true);
assert!(result);
Ok(())
}
}


@@ -1,10 +1,9 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::mem;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -12,112 +11,114 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::mem;
use core::cmp::Ordering;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large.


@@ -12,6 +12,16 @@ message PingResponse {
bytes body = 2;
}
message HealBucketRequest {
string bucket = 1;
string options = 2;
}
message HealBucketResponse {
bool success = 1;
optional string error_info = 2;
}
message ListBucketRequest {
string options = 1;
}
@@ -410,6 +420,7 @@ message GenerallyLockResponse {
service NodeService {
/* -------------------------------meta service-------------------------- */
rpc Ping(PingRequest) returns (PingResponse) {};
rpc HealBucket(HealBucketRequest) returns (HealBucketResponse) {};
rpc ListBucket(ListBucketRequest) returns (ListBucketResponse) {};
rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {};
rpc GetBucketInfo(GetBucketInfoRequest) returns (GetBucketInfoResponse) {};

common/workers/Cargo.toml (new file)

@@ -0,0 +1,11 @@
[package]
name = "workers"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
common.workspace = true
tokio.workspace = true


@@ -0,0 +1 @@
pub mod workers;


@@ -0,0 +1,87 @@
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
pub struct Workers {
    available: Mutex<usize>, // currently free worker slots
    notify: Notify,          // wakes tasks waiting for a slot
    limit: usize,            // maximum number of concurrent jobs
}
impl Workers {
    // Create a Workers handle that allows at most n jobs to run concurrently.
    pub fn new(n: usize) -> Result<Arc<Workers>, &'static str> {
        if n == 0 {
            return Err("n must be > 0");
        }
        Ok(Arc::new(Workers {
            available: Mutex::new(n),
            notify: Notify::new(),
            limit: n,
        }))
    }
    // Let one job acquire a slot to run.
    pub async fn take(&self) {
        loop {
            {
                let mut available = self.available.lock().await;
                if *available > 0 {
                    *available -= 1; // consume a slot
                    return;
                }
                // drop the guard before waiting; holding it across the await
                // would block give() and deadlock
            }
            // wait until a slot is freed
            self.notify.notified().await;
        }
    }
    // Let one job release its slot.
    pub async fn give(&self) {
        let mut available = self.available.lock().await;
        *available += 1; // free the slot
        self.notify.notify_one(); // wake one waiting task
    }
    // Wait for all concurrent jobs to finish.
    pub async fn wait(&self) {
        loop {
            {
                let available = self.available.lock().await;
                if *available == self.limit {
                    break;
                }
            }
            // wait until more slots are released
            self.notify.notified().await;
        }
    }
pub async fn available(&self) -> usize {
*self.available.lock().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
    async fn test_workers() {
        // new() already returns Arc<Workers>; no extra Arc is needed
        let workers = Workers::new(5).unwrap();
        for _ in 0..4 {
            let workers = workers.clone();
            tokio::spawn(async move {
                workers.take().await;
                sleep(Duration::from_secs(3)).await;
                workers.give().await;
            });
        }
        // give the spawned tasks a moment to start before waiting
        sleep(Duration::from_secs(1)).await;
        workers.wait().await;
        assert_eq!(workers.available().await, workers.limit);
    }
}
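Taken together, Workers is a small counting semaphore built from Mutex<usize> plus Notify. A minimal caller-side sketch of bounding concurrency with it (assumed usage, not part of this commit; process() is a placeholder):

use std::sync::Arc;
use workers::workers::Workers;

async fn process(_item: String) { /* placeholder job body */ }

// Run jobs with at most 4 in flight; wait() returns once all slots are free.
async fn run_bounded(items: Vec<String>) {
    let workers = Workers::new(4).unwrap();
    for item in items {
        let workers = Arc::clone(&workers);
        tokio::spawn(async move {
            workers.take().await; // blocks while 4 jobs are already running
            process(item).await;
            workers.give().await; // frees the slot for the next job
        });
    }
    workers.wait().await;
}

As in the module's own test, a caller must make sure the spawned jobs have actually taken their slots before calling wait(), otherwise wait() can return while work is still queued.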


@@ -33,13 +33,13 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
let response = client.lock(request).await?.into_inner();
println!("request ended");
if let Some(error_info) = response.error_info {
assert!(false, "can not get lock: {}", error_info);
panic!("can not get lock: {}", error_info);
}
let request = Request::new(GenerallyLockRequest { args });
let response = client.un_lock(request).await?.into_inner();
if let Some(error_info) = response.error_info {
assert!(false, "can not get un_lock: {}", error_info);
panic!("can not get un_lock: {}", error_info);
}
Ok(())
@@ -58,17 +58,16 @@ async fn test_lock_unlock_ns_lock() -> Result<(), Box<dyn Error>> {
vec![locker],
)
.await;
assert_eq!(
ns.0.write()
.await
.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.unwrap(),
true
);
assert!(ns
.0
.write()
.await
.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.unwrap());
ns.0.write().await.un_lock().await.unwrap();
Ok(())


@@ -60,6 +60,7 @@ s3s-policy.workspace = true
rand.workspace = true
pin-project-lite.workspace = true
md-5.workspace = true
workers.workspace = true
[target.'cfg(not(windows))'.dependencies]


@@ -103,7 +103,7 @@ impl Hasher {
}
impl BitrotAlgorithm {
pub fn new(&self) -> Hasher {
pub fn new_hasher(&self) -> Hasher {
match self {
BitrotAlgorithm::SHA256 => Hasher::SHA256(Sha256::new()),
BitrotAlgorithm::HighwayHash256 | BitrotAlgorithm::HighwayHash256S => {
@@ -186,10 +186,8 @@ pub fn new_bitrot_reader(
}
pub async fn close_bitrot_writers(writers: &mut [Option<BitrotWriter>]) -> Result<()> {
for w in writers.into_iter() {
if let Some(w) = w {
let _ = w.close().await?;
}
for w in writers.iter_mut().flatten() {
w.close().await?;
}
Ok(())
@@ -207,7 +205,7 @@ pub fn bitrot_shard_file_size(size: usize, shard_size: usize, algo: BitrotAlgori
if algo != BitrotAlgorithm::HighwayHash256S {
return size;
}
size.div_ceil(shard_size) * algo.new().size() + size
size.div_ceil(shard_size) * algo.new_hasher().size() + size
}
pub fn bitrot_verify(
@@ -219,7 +217,7 @@ pub fn bitrot_verify(
mut shard_size: usize,
) -> Result<()> {
if algo != BitrotAlgorithm::HighwayHash256S {
let mut h = algo.new();
let mut h = algo.new_hasher();
h.update(r.get_ref());
if h.finalize() != want {
return Err(Error::new(DiskError::FileCorrupt));
@@ -227,7 +225,7 @@ pub fn bitrot_verify(
return Ok(());
}
let mut h = algo.new();
let mut h = algo.new_hasher();
let mut hash_buf = vec![0; h.size()];
let mut left = want_size;
@@ -271,7 +269,7 @@ impl WholeBitrotWriter {
volume: volume.to_string(),
file_path: file_path.to_string(),
_shard_size: shard_size,
hash: algo.new(),
hash: algo.new_hasher(),
}
}
}
@@ -353,7 +351,7 @@ impl StreamingBitrotWriter {
algo: BitrotAlgorithm,
shard_size: usize,
) -> Result<Self> {
let hasher = algo.new();
let hasher = algo.new_hasher();
let (tx, mut rx) = mpsc::channel::<Option<Vec<u8>>>(10);
let total_file_size = length.div_ceil(shard_size) * hasher.size() + length;
@@ -362,7 +360,7 @@ impl StreamingBitrotWriter {
let task = spawn(async move {
loop {
if let Some(Some(buf)) = rx.recv().await {
let _ = writer.write(&buf).await.unwrap();
writer.write(&buf).await.unwrap();
continue;
}
@@ -389,7 +387,7 @@ impl Writer for StreamingBitrotWriter {
return Ok(());
}
self.hasher.reset();
self.hasher.update(&buf);
self.hasher.update(buf);
let hash_bytes = self.hasher.clone().finalize();
let _ = self.tx.send(Some(hash_bytes)).await?;
let _ = self.tx.send(Some(buf.to_vec())).await?;
@@ -430,7 +428,7 @@ impl StreamingBitrotReader {
till_offset: usize,
shard_size: usize,
) -> Self {
let hasher = algo.new();
let hasher = algo.new_hasher();
Self {
disk,
_data: data.to_vec(),
@@ -489,7 +487,7 @@ pub struct BitrotFileWriter {
impl BitrotFileWriter {
pub fn new(inner: FileWriter, algo: BitrotAlgorithm, _shard_size: usize) -> Self {
let hasher = algo.new();
let hasher = algo.new_hasher();
Self {
inner,
hasher,
@@ -513,7 +511,7 @@ impl Writer for BitrotFileWriter {
return Ok(());
}
self.hasher.reset();
self.hasher.update(&buf);
self.hasher.update(buf);
let hash_bytes = self.hasher.clone().finalize();
let _ = self.inner.write(&hash_bytes).await?;
let _ = self.inner.write(buf).await?;
@@ -544,7 +542,7 @@ struct BitrotFileReader {
impl BitrotFileReader {
pub fn new(inner: FileReader, algo: BitrotAlgorithm, _till_offset: usize, shard_size: usize) -> Self {
let hasher = algo.new();
let hasher = algo.new_hasher();
Self {
inner,
// till_offset: ceil(till_offset, shard_size) * hasher.size() + till_offset,
@@ -648,7 +646,7 @@ mod test {
}
let checksum = decode_to_vec(checksums.get(algo).unwrap()).unwrap();
let mut h = algo.new();
let mut h = algo.new_hasher();
let mut msg = Vec::with_capacity(h.size() * h.block_size());
let mut sum = Vec::with_capacity(h.size());
@@ -656,7 +654,7 @@ mod test {
h.update(&msg);
sum = h.finalize();
msg.extend(sum.clone());
h = algo.new();
h = algo.new_hasher();
}
if checksum != sum {
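
The hunks in this file are the mechanical rename of BitrotAlgorithm::new to new_hasher, which frees the conventional constructor name. A one-shot sketch of the renamed API (assuming the SHA256 variant and the update/finalize methods exercised above):

// Compute a whole-buffer bitrot checksum; the digest is h.size() bytes.
fn checksum_shard(data: &[u8]) -> Vec<u8> {
    let mut h = BitrotAlgorithm::SHA256.new_hasher();
    h.update(data);
    h.finalize()
}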


@@ -39,10 +39,8 @@ pub fn check_bucket_name_common(bucket_name: &str, strict: bool) -> Result<(), E
if !VALID_BUCKET_NAME_STRICT.is_match(bucket_name_trimmed) {
return Err(Error::msg("Bucket name contains invalid characters"));
}
} else {
if !VALID_BUCKET_NAME.is_match(bucket_name_trimmed) {
return Err(Error::msg("Bucket name contains invalid characters"));
}
} else if !VALID_BUCKET_NAME.is_match(bucket_name_trimmed) {
return Err(Error::msg("Bucket name contains invalid characters"));
}
Ok(())
}


@@ -40,10 +40,8 @@ impl VersioningApi for VersioningConfiguration {
}
if let Some(status) = self.status.as_ref() {
if status.as_str() == BucketVersioningStatus::ENABLED {
if prefix.is_empty() {
return false;
}
if status.as_str() == BucketVersioningStatus::ENABLED && prefix.is_empty() {
return false;
// TODO: ExcludeFolders
}


@@ -54,8 +54,10 @@ impl<T: Clone + Debug + Send + 'static> Cache<T> {
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
if v.is_some() && now - self.last_update_ms.load(Ordering::SeqCst) < self.ttl.as_secs() {
return Ok(v.unwrap());
if now - self.last_update_ms.load(Ordering::SeqCst) < self.ttl.as_secs() {
if let Some(v) = v {
return Ok(v);
}
}
if self.opts.no_wait && v.is_some() && now - self.last_update_ms.load(Ordering::SeqCst) < self.ttl.as_secs() * 2 {
@@ -110,7 +112,7 @@ impl<T: Clone + Debug + Send + 'static> Cache<T> {
return Ok(());
}
return Err(err);
Err(err)
}
}
}
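The reshuffled conditionals preserve the freshness policy: a value younger than the TTL is served as-is, and with no_wait set a stale value is tolerated up to twice the TTL while a refresh happens. A standalone sketch of that policy (simplified signature; names are illustrative):

// Decide whether a cached value may be served; None means refresh first.
fn serve_cached<T>(v: Option<T>, age_secs: u64, ttl_secs: u64, no_wait: bool) -> Option<T> {
    match v {
        Some(v) if age_secs < ttl_secs => Some(v), // fresh
        Some(v) if no_wait && age_secs < ttl_secs * 2 => Some(v), // stale but tolerated
        _ => None, // caller runs the update closure
    }
}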


@@ -45,12 +45,12 @@ impl Clone for ListPathRawOptions {
fallback_disks: self.fallback_disks.clone(),
bucket: self.bucket.clone(),
path: self.path.clone(),
recursice: self.recursice.clone(),
recursice: self.recursice,
filter_prefix: self.filter_prefix.clone(),
forward_to: self.forward_to.clone(),
min_disks: self.min_disks.clone(),
report_not_found: self.report_not_found.clone(),
per_disk_limit: self.per_disk_limit.clone(),
min_disks: self.min_disks,
report_not_found: self.report_not_found,
per_disk_limit: self.per_disk_limit,
..Default::default()
}
}
@@ -80,7 +80,7 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
.walk_dir(WalkDirOptions {
bucket: opts_clone.bucket.clone(),
base_dir: opts_clone.path.clone(),
recursive: opts_clone.recursice.clone(),
recursive: opts_clone.recursice,
report_notfound: opts_clone.report_not_found,
filter_prefix: opts_clone.filter_prefix.clone(),
forward_to: opts_clone.forward_to.clone(),
@@ -210,13 +210,11 @@ pub async fn list_path_raw(mut rx: B_Receiver<bool>, opts: ListPathRawOptions) -
}
// Break if all at EOF or error.
if at_eof + has_err == readers.len() {
if has_err > 0 {
if let Some(finished_fn) = opts.finished.as_ref() {
finished_fn(&errs).await;
}
break;
if at_eof + has_err == readers.len() && has_err > 0 {
if let Some(finished_fn) = opts.finished.as_ref() {
finished_fn(&errs).await;
}
break;
}
if agree == readers.len() {


@@ -15,7 +15,7 @@ use s3s::dto::StreamingBlob;
use s3s::Body;
use tracing::error;
const CONFIG_PREFIX: &str = "config";
pub const CONFIG_PREFIX: &str = "config";
const CONFIG_FILE: &str = "config.json";
pub const STORAGE_CLASS_SUB_SYS: &str = "storage_class";
@@ -161,32 +161,29 @@ async fn apply_dynamic_config(cfg: &mut Config, api: &ECStore) -> Result<()> {
Ok(())
}
async fn apply_dynamic_config_for_sub_sys(cfg: &mut Config, api: &ECStore, subsys: &String) -> Result<()> {
async fn apply_dynamic_config_for_sub_sys(cfg: &mut Config, api: &ECStore, subsys: &str) -> Result<()> {
let set_drive_counts = api.set_drive_counts();
match subsys.as_str() {
STORAGE_CLASS_SUB_SYS => {
let kvs = match cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_KV_KEY) {
Some(res) => res,
None => KVS::new(),
};
if subsys == STORAGE_CLASS_SUB_SYS {
let kvs = match cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_KV_KEY) {
Some(res) => res,
None => KVS::new(),
};
for (i, count) in set_drive_counts.iter().enumerate() {
match storageclass::lookup_config(&kvs, *count) {
Ok(res) => {
if i == 0 && GLOBAL_StorageClass.get().is_none() {
if let Err(r) = GLOBAL_StorageClass.set(res) {
error!("GLOBAL_StorageClass.set failed {:?}", r);
}
for (i, count) in set_drive_counts.iter().enumerate() {
match storageclass::lookup_config(&kvs, *count) {
Ok(res) => {
if i == 0 && GLOBAL_StorageClass.get().is_none() {
if let Err(r) = GLOBAL_StorageClass.set(res) {
error!("GLOBAL_StorageClass.set failed {:?}", r);
}
}
Err(err) => {
error!("init storageclass err:{:?}", &err);
break;
}
}
Err(err) => {
error!("init storageclass err:{:?}", &err);
break;
}
}
}
_ => {}
}
Ok(())


@@ -37,9 +37,9 @@ fn parse_bitrot_config(s: &str) -> Result<Duration> {
match parse_bool(s) {
Ok(enabled) => {
if enabled {
return Ok(Duration::from_secs_f64(0.0));
Ok(Duration::from_secs_f64(0.0))
} else {
return Ok(Duration::from_secs_f64(-1.0));
Ok(Duration::from_secs_f64(-1.0))
}
}
Err(_) => {


@@ -1,5 +1,6 @@
pub mod common;
pub mod error;
#[allow(dead_code)]
pub mod heal;
pub mod storageclass;
@@ -22,6 +23,12 @@ pub static RUSTFS_CONFIG_PREFIX: &str = "config";
pub struct ConfigSys {}
impl Default for ConfigSys {
fn default() -> Self {
Self::new()
}
}
impl ConfigSys {
pub fn new() -> Self {
Self {}
@@ -47,6 +54,12 @@ pub struct KV {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct KVS(Vec<KV>);
impl Default for KVS {
fn default() -> Self {
Self::new()
}
}
impl KVS {
pub fn new() -> Self {
KVS(Vec::new())
@@ -72,6 +85,12 @@ impl KVS {
#[derive(Debug, Clone)]
pub struct Config(HashMap<String, HashMap<String, KVS>>);
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
impl Config {
pub fn new() -> Self {
let mut cfg = Config(HashMap::new());


@@ -228,7 +228,7 @@ pub fn parse_storage_class(env: &str) -> Result<StorageClass> {
// only two elements allowed in the string - "scheme" and "number of parity drives"
if s.len() != 2 {
return Err(Error::msg(&format!(
return Err(Error::msg(format!(
"Invalid storage class format: {}. Expected 'Scheme:Number of parity drives'.",
env
)));
@@ -236,13 +236,13 @@ pub fn parse_storage_class(env: &str) -> Result<StorageClass> {
// only allowed scheme is "EC"
if s[0] != SCHEME_PREFIX {
return Err(Error::msg(&format!("Unsupported scheme {}. Supported scheme is EC.", s[0])));
return Err(Error::msg(format!("Unsupported scheme {}. Supported scheme is EC.", s[0])));
}
// Number of parity drives should be integer
let parity_drives: usize = match s[1].parse() {
Ok(num) => num,
Err(_) => return Err(Error::msg(&format!("Failed to parse parity value: {}.", s[1]))),
Err(_) => return Err(Error::msg(format!("Failed to parse parity value: {}.", s[1]))),
};
Ok(StorageClass { parity: parity_drives })


@@ -30,7 +30,13 @@ pub struct Endpoint {
impl Display for Endpoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.url.scheme() == "file" {
write!(f, "{}", fs::canonicalize(self.url.path()).map_err(|_| std::fmt::Error)?.to_string_lossy().to_string())
write!(
f,
"{}",
fs::canonicalize(self.url.path())
.map_err(|_| std::fmt::Error)?
.to_string_lossy()
)
} else {
write!(f, "{}", self.url)
}


@@ -265,14 +265,7 @@ pub fn os_err_to_file_err(e: io::Error) -> Error {
}
pub fn is_err_file_not_found(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<DiskError>() {
match e {
DiskError::FileNotFound => true,
_ => false,
}
} else {
false
}
matches!(err.downcast_ref::<DiskError>(), Some(DiskError::FileNotFound))
}
pub fn is_sys_err_no_space(e: &io::Error) -> bool {
@@ -426,18 +419,32 @@ pub fn convert_access_error(e: io::Error, per_err: DiskError) -> Error {
}
pub fn is_all_not_found(errs: &[Option<Error>]) -> bool {
for err in errs.iter() {
if let Some(err) = err {
if let Some(err) = err.downcast_ref::<DiskError>() {
match err {
DiskError::FileNotFound | DiskError::VolumeNotFound | &DiskError::FileVersionNotFound => {
continue;
}
_ => return false,
for err in errs.iter().flatten() {
if let Some(err) = err.downcast_ref::<DiskError>() {
match err {
DiskError::FileNotFound | DiskError::VolumeNotFound | &DiskError::FileVersionNotFound => {
continue;
}
_ => return false,
}
}
}
!errs.is_empty()
}
pub fn is_all_buckets_not_found(errs: &[Option<Error>]) -> bool {
if errs.is_empty() {
return false;
}
let mut not_found_count = 0;
for err in errs.iter().flatten() {
match err.downcast_ref() {
Some(DiskError::VolumeNotFound) | Some(DiskError::DiskNotFound) => {
not_found_count += 1;
}
_ => {}
}
}
errs.len() == not_found_count
}
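Both helpers above rely on the same pair of idioms: iter().flatten() to skip the None entries of a &[Option<T>], and matches!/downcast_ref to classify errors without a full match. A self-contained sketch with an illustrative error type (not the crate's own):

#[derive(Debug)]
enum MyErr {
    NotFound,
    Corrupt,
}

// True when every present error is NotFound; mirrors is_all_not_found above.
fn all_not_found(errs: &[Option<MyErr>]) -> bool {
    for err in errs.iter().flatten() {
        if !matches!(err, MyErr::NotFound) {
            return false;
        }
    }
    !errs.is_empty()
}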


@@ -929,7 +929,7 @@ impl DiskAPI for LocalDisk {
}
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
let volume_dir = self.get_bucket_path(&volume)?;
let volume_dir = self.get_bucket_path(volume)?;
check_path_length(volume_dir.join(path).to_string_lossy().as_ref())?;
let mut resp = CheckPartsResp {
results: vec![0; fi.parts.len()],
@@ -955,22 +955,16 @@ impl DiskAPI for LocalDisk {
resp.results[i] = CHECK_PART_SUCCESS;
}
Err(err) => {
match os_err_to_file_err(err).downcast_ref() {
Some(DiskError::FileNotFound) => {
if !skip_access_checks(volume) {
if let Err(err) = access(&volume_dir).await {
match err.kind() {
ErrorKind::NotFound => {
resp.results[i] = CHECK_PART_VOLUME_NOT_FOUND;
continue;
}
_ => {}
}
if let Some(DiskError::FileNotFound) = os_err_to_file_err(err).downcast_ref() {
if !skip_access_checks(volume) {
if let Err(err) = access(&volume_dir).await {
if err.kind() == ErrorKind::NotFound {
resp.results[i] = CHECK_PART_VOLUME_NOT_FOUND;
continue;
}
}
resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
}
_ => {}
resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
}
continue;
}
@@ -1966,7 +1960,7 @@ impl DiskAPI for LocalDisk {
for info in obj_infos.iter() {
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
let sz: usize;
(obj_deleted, sz) = item.apply_actions(&info, &size_s).await;
(obj_deleted, sz) = item.apply_actions(info, &size_s).await;
done().await;
if obj_deleted {
@@ -2031,7 +2025,7 @@ impl DiskAPI for LocalDisk {
}
match HealingTracker::unmarshal_msg(&b) {
Ok(h) => Some(h),
Err(_) => Some(HealingTracker::default())
Err(_) => Some(HealingTracker::default()),
}
}
}
@@ -2046,10 +2040,7 @@ async fn get_disk_info(drive_path: PathBuf) -> Result<(Info, bool)> {
if root_disk_threshold > 0 {
disk_info.total <= root_disk_threshold
} else {
match is_root_disk(&drive_path, SLASH_SEPARATOR) {
Ok(result) => result,
Err(_) => false,
}
is_root_disk(&drive_path, SLASH_SEPARATOR).unwrap_or_default()
}
} else {
false


@@ -39,7 +39,6 @@ use std::{
io::{Cursor, SeekFrom},
path::PathBuf,
sync::Arc,
usize,
};
use time::OffsetDateTime;
use tokio::{
@@ -683,7 +682,7 @@ impl MetaCacheEntry {
let mut fm = FileMeta::new();
fm.unmarshal_msg(&self.metadata)?;
Ok(fm.into_file_info_versions(bucket, self.name.as_str(), false)?)
fm.into_file_info_versions(bucket, self.name.as_str(), false)
}
pub fn matches(&self, other: &MetaCacheEntry, strict: bool) -> Result<(Option<MetaCacheEntry>, bool)> {
@@ -842,7 +841,7 @@ impl MetaCacheEntries {
selected = Some(MetaCacheEntry {
name: selected.as_ref().unwrap().name.clone(),
cached: Some(FileMeta {
meta_ver: selected.as_ref().unwrap().cached.as_ref().unwrap().meta_ver.clone(),
meta_ver: selected.as_ref().unwrap().cached.as_ref().unwrap().meta_ver,
..Default::default()
}),
_reusable: true,


@@ -56,7 +56,7 @@ pub fn is_root_disk(disk_path: &str, root_disk: &str) -> Result<bool> {
return Ok(false);
}
Ok(same_disk(disk_path, root_disk)?)
same_disk(disk_path, root_disk)
}
pub async fn make_dir_all(path: impl AsRef<Path>, base_dir: impl AsRef<Path>) -> Result<()> {


@@ -117,7 +117,7 @@ impl Endpoints {
// GetString - returns endpoint string of i-th endpoint (0-based),
// and empty string for invalid indexes.
pub fn get_string(&self, i: usize) -> String {
if i < 0 || i >= self.0.len() {
if i >= self.0.len() {
return "".to_string();
}


@@ -445,7 +445,7 @@ impl Erasure {
self.encoder.as_ref().unwrap().reconstruct(&mut bufs)?;
}
let shards = bufs.into_iter().filter_map(|x| x).collect::<Vec<_>>();
let shards = bufs.into_iter().flatten().collect::<Vec<_>>();
if shards.len() != self.parity_shards + self.data_shards {
return Err(Error::from_string("can not reconstruct data"));
}


@@ -37,7 +37,7 @@ pub async fn set_global_deployment_id(id: Uuid) {
pub async fn get_global_deployment_id() -> Uuid {
let id_ptr = globalDeploymentIDPtr.read().await;
id_ptr.clone()
*id_ptr
}
pub async fn set_global_endpoints(eps: Vec<PoolEndpoints>) {


@@ -1,5 +1,5 @@
use s3s::{S3Error, S3ErrorCode};
use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
@@ -7,17 +7,30 @@ use tokio::{
},
time::interval,
};
use tracing::info;
use tracing::{error, info};
use uuid::Uuid;
use crate::{
config::RUSTFS_CONFIG_PREFIX, disk::{endpoint::Endpoint, error::DiskError, DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, error::{Error, Result}, global::{GLOBAL_BackgroundHealRoutine, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP}, heal::{data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT}, data_usage_cache::DataUsageCache, heal_commands::{init_healing_tracker, load_healing_tracker}, heal_ops::NOP_HEAL}, new_object_layer_fn, store::get_disk_via_endpoint, store_api::{BucketInfo, BucketOptions, StorageAPI}, utils::path::{path_join, SLASH_SEPARATOR}
};
use super::{
heal_commands::{HealOpts, HealResultItem},
heal_ops::{new_bg_heal_sequence, HealSequence},
};
use crate::heal::error::ERR_RETRY_HEALING;
use crate::{
config::RUSTFS_CONFIG_PREFIX,
disk::{endpoint::Endpoint, error::DiskError, DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
error::{Error, Result},
global::{GLOBAL_BackgroundHealRoutine, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP},
heal::{
data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::DataUsageCache,
heal_commands::{init_healing_tracker, load_healing_tracker},
heal_ops::NOP_HEAL,
},
new_object_layer_fn,
store::get_disk_via_endpoint,
store_api::{BucketInfo, BucketOptions, StorageAPI},
utils::path::{path_join, SLASH_SEPARATOR},
};
pub static DEFAULT_MONITOR_NEW_DISK_INTERVAL: Duration = Duration::from_secs(10);
@@ -30,6 +43,9 @@ pub async fn init_auto_heal() {
.await
.push_heal_local_disks(&get_local_disks_to_heal().await)
.await;
tokio::spawn(async {
monitor_local_disks_and_heal().await;
});
}
}
}
@@ -99,10 +115,27 @@ async fn monitor_local_disks_and_heal() {
for disk in heal_disks.into_ref().iter() {
let disk_clone = disk.clone();
tokio::spawn(async move {
GLOBAL_BackgroundHealState.write().await.set_disk_healing_status(disk_clone.clone(), true).await;
GLOBAL_BackgroundHealState
.write()
.await
.set_disk_healing_status(disk_clone.clone(), true)
.await;
if heal_fresh_disk(&disk_clone).await.is_err() {
GLOBAL_BackgroundHealState
.write()
.await
.set_disk_healing_status(disk_clone.clone(), false)
.await;
return;
}
GLOBAL_BackgroundHealState
.write()
.await
.pop_heal_local_disks(&[disk_clone])
.await;
});
}
interval.reset();
}
}
@@ -110,16 +143,20 @@ async fn heal_fresh_disk(endpoint: &Endpoint) -> Result<()> {
let (pool_idx, set_idx) = (endpoint.pool_idx.unwrap(), endpoint.disk_idx.unwrap());
let disk = match get_disk_via_endpoint(endpoint).await {
Some(disk) => disk,
None => return Err(Error::from_string(format!("Unexpected error disk must be initialized by now after formatting: {}", endpoint.to_string()))),
None => {
return Err(Error::from_string(format!(
"Unexpected error disk must be initialized by now after formatting: {}",
endpoint
)))
}
};
if let Err(err) = disk.disk_info(&DiskInfoOptions::default()).await {
match err.downcast_ref() {
Some(DiskError::DriveIsRoot) => {
return Ok(());
},
Some(DiskError::UnformattedDisk) => {
},
}
Some(DiskError::UnformattedDisk) => {}
_ => {
return Err(err);
}
@@ -134,14 +171,21 @@ async fn heal_fresh_disk(endpoint: &Endpoint) -> Result<()> {
return Ok(());
}
_ => {
info!("Unable to load healing tracker on '{}': {}, re-initializing..", disk.to_string(), err.to_string());
},
info!(
"Unable to load healing tracker on '{}': {}, re-initializing..",
disk.to_string(),
err.to_string()
);
}
}
init_healing_tracker(disk.clone(), &Uuid::new_v4().to_string()).await?
}
};
info!("Healing drive '{}' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint.to_string());
info!(
"Healing drive '{}' - 'mc admin heal alias/ --verbose' to check the current status.",
endpoint.to_string()
);
let layer = new_object_layer_fn();
let lock = layer.read().await;
@@ -151,11 +195,15 @@ async fn heal_fresh_disk(endpoint: &Endpoint) -> Result<()> {
};
let mut buckets = store.list_bucket(&BucketOptions::default()).await?;
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)]).to_string_lossy().to_string(),
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)]).to_string_lossy().to_string(),
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
@@ -170,55 +218,123 @@ async fn heal_fresh_disk(endpoint: &Endpoint) -> Result<()> {
}
});
let mut cache = match DataUsageCache::load(&store.pools[pool_idx].disk_set[set_idx], DATA_USAGE_CACHE_NAME).await {
Ok(cache) => {
let data_usage_info = cache.dui(DATA_USAGE_ROOT, &Vec::new());
tracker.objects_total_count = data_usage_info.objects_total_count;
tracker.objects_total_size = data_usage_info.objects_total_size;
cache
},
Err(_) => DataUsageCache::default()
if let Ok(cache) = DataUsageCache::load(&store.pools[pool_idx].disk_set[set_idx], DATA_USAGE_CACHE_NAME).await {
let data_usage_info = cache.dui(DATA_USAGE_ROOT, &Vec::new());
tracker.objects_total_count = data_usage_info.objects_total_count;
tracker.objects_total_size = data_usage_info.objects_total_size;
};
let location = disk.get_disk_location();
tracker.set_queue_buckets(&buckets).await;
tracker.save().await?;
// store.pools[pool_idx].disk_set[set_idx].
todo!()
let tracker = Arc::new(RwLock::new(tracker));
let qb = tracker.read().await.queue_buckets.clone();
store.pools[pool_idx].disk_set[set_idx]
.clone()
.heal_erasure_set(&qb, tracker.clone())
.await?;
let mut tracker_w = tracker.write().await;
if tracker_w.items_failed > 0 && tracker_w.retry_attempts < 4 {
tracker_w.retry_attempts += 1;
tracker_w.reset_healing().await;
if let Err(err) = tracker_w.update().await {
info!("update tracker failed: {}", err.to_string());
}
return Err(Error::from_string(ERR_RETRY_HEALING));
}
if tracker_w.items_failed > 0 {
info!(
"Healing of drive '{}' is incomplete, retried {} times (healed: {}, skipped: {}, failed: {}).",
disk.to_string(),
tracker_w.retry_attempts,
tracker_w.items_healed,
tracker_w.item_skipped,
tracker_w.items_failed
);
} else if tracker_w.retry_attempts > 0 {
info!(
"Healing of drive '{}' is incomplete, retried {} times (healed: {}, skipped: {}).",
disk.to_string(),
tracker_w.retry_attempts,
tracker_w.items_healed,
tracker_w.item_skipped
);
} else {
info!(
"Healing of drive '{}' is finished (healed: {}, skipped: {}).",
disk.to_string(),
tracker_w.items_healed,
tracker_w.item_skipped
);
}
if tracker_w.heal_id.is_empty() {
if let Err(err) = tracker_w.delete().await {
error!("delete tracker failed: {}", err.to_string());
}
}
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()))),
};
let disks = store.get_disks(pool_idx, set_idx).await?;
for disk in disks.into_iter() {
if disk.is_none() {
continue;
}
let mut tracker = match load_healing_tracker(&disk).await {
Ok(tracker) => tracker,
Err(err) => {
match err.downcast_ref() {
Some(DiskError::FileNotFound) => {}
_ => {
info!("Unable to load healing tracker on '{:?}': {}, re-initializing..", disk, err.to_string());
}
}
continue;
}
};
if tracker.heal_id == tracker_w.heal_id {
tracker.finished = true;
tracker.update().await?;
}
}
Ok(())
}
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct HealTask {
pub bucket: String,
pub object: String,
pub version_id: String,
pub opts: HealOpts,
pub resp_tx: Option<Arc<Sender<HealResult>>>,
pub resp_rx: Option<Arc<Receiver<HealResult>>>,
pub resp_tx: Option<Sender<HealResult>>,
pub resp_rx: Option<Receiver<HealResult>>,
}
impl HealTask {
pub fn new(bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Self {
let (tx, rx) = mpsc::channel(10);
Self {
bucket: bucket.to_string(),
object: object.to_string(),
version_id: version_id.to_string(),
opts: opts.clone(),
resp_tx: Some(tx.into()),
resp_rx: Some(rx.into()),
opts: *opts,
resp_tx: None,
resp_rx: None,
}
}
}
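Since new() now leaves resp_tx unset, the caller attaches the reply channel itself, exactly as the heal sequence does further below with mpsc::channel(1). A hedged sketch of that caller-side wiring (assuming the globals and types in this file; not a function the commit adds):

// Submit a heal task and await its result over a capacity-1 reply channel.
async fn submit_heal_task(bucket: &str, opts: &HealOpts) -> Option<HealResult> {
    let (resp_tx, mut resp_rx) = mpsc::channel(1);
    let mut task = HealTask::new(bucket, "", "", opts);
    task.resp_tx = Some(resp_tx);
    GLOBAL_BackgroundHealRoutine
        .read()
        .await
        .tasks_tx
        .send(task)
        .await
        .ok()?;
    resp_rx.recv().await
}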
pub struct HealResult {
pub result: HealResultItem,
_err: Option<Error>,
pub err: Option<Error>,
}
pub struct HealRoutine {
tasks_tx: Sender<HealTask>,
pub tasks_tx: Sender<HealTask>,
tasks_rx: Receiver<HealTask>,
workers: usize,
}
@@ -265,13 +381,10 @@ impl HealRoutine {
let lock = layer.read().await;
let store = lock.as_ref().expect("Not init");
if task.object.is_empty() {
match store
.heal_object(&task.bucket, &task.object, &task.version_id, &task.opts)
.await
{
Ok((res, err)) => {
match store.heal_bucket(&task.bucket, &task.opts).await {
Ok(res) => {
d_res = res;
d_err = err;
d_err = None;
}
Err(err) => d_err = Some(err),
}
@@ -292,7 +405,7 @@ impl HealRoutine {
let _ = resp_tx
.send(HealResult {
result: d_res,
_err: d_err,
err: d_err,
})
.await;
} else {
@@ -325,5 +438,5 @@ async fn heal_disk_format(opts: HealOpts) -> Result<(HealResultItem, Option<Erro
if err.is_some() {
return Ok((HealResultItem::default(), err));
}
return Ok((res, err));
Ok((res, err))
}


@@ -115,23 +115,27 @@ async fn run_data_scanner() {
let mut buf = read_config(store, &DATA_USAGE_BLOOM_NAME_PATH)
.await
.map_or(Vec::new(), |buf| buf);
if buf.len() == 8 {
cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
} else if buf.len() > 8 {
cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
let _ = cycle_info.unmarshal_msg(&buf.split_off(8));
match buf.len().cmp(&8) {
std::cmp::Ordering::Less => {}
std::cmp::Ordering::Equal => {
cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
}
std::cmp::Ordering::Greater => {
cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() {
Ok(buf) => buf,
Err(_) => {
error!("can not decode DATA_USAGE_BLOOM_NAME_PATH");
return;
}
};
let _ = cycle_info.unmarshal_msg(&buf.split_off(8));
}
}
loop {
@@ -377,11 +381,7 @@ pub struct ScannerItem {
impl ScannerItem {
pub fn transform_meda_dir(&mut self) {
let split = self
.prefix
.split(SLASH_SEPARATOR)
.map(|s| PathBuf::from(s))
.collect::<Vec<_>>();
let split = self.prefix.split(SLASH_SEPARATOR).map(PathBuf::from).collect::<Vec<_>>();
if split.len() > 1 {
self.prefix = path_join(&split[0..split.len() - 1]).to_string_lossy().to_string();
} else {
@@ -700,13 +700,14 @@ impl FolderScanner {
// Scan existing...
for folder in existing_folders.iter() {
let h = hash_path(&folder.name);
if !into.compacted && self.old_cache.is_compacted(&h) {
if !h.mod_(self.old_cache.info.next_cycle, DATA_USAGE_UPDATE_DIR_CYCLES) {
self.new_cache
.copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
into.add_child(&h);
continue;
}
if !into.compacted
&& self.old_cache.is_compacted(&h)
&& !h.mod_(self.old_cache.info.next_cycle, DATA_USAGE_UPDATE_DIR_CYCLES)
{
self.new_cache
.copy_with_children(&self.old_cache, &h, &Some(folder.parent.clone()));
into.add_child(&h);
continue;
}
(self.update_current_path)(&folder.name).await;
scan(folder, into, self).await;
@@ -748,7 +749,7 @@ impl FolderScanner {
(self.update_current_path)(k).await;
if bucket != resolver.bucket {
let _ = bg_seq
bg_seq
.clone()
.write()
.await
@@ -841,7 +842,7 @@ impl FolderScanner {
}
} else {
let mut w = found_objs_clone.write().await;
*w = *w || true;
*w = true;
}
return;
}
@@ -866,10 +867,13 @@ impl FolderScanner {
{
Ok(_) => {
success_versions += 1;
let mut w = found_objs_clone.write().await;
*w = *w || true;
*w = true;
}
Err(_) => {
fail_versions += 1;
}
Err(_) => fail_versions += 1,
}
}
custom.insert("success_versions", success_versions.to_string());
@@ -901,14 +905,11 @@ impl FolderScanner {
break;
}
if !was_compacted {
self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), &into);
self.new_cache.replace_hashed(&this_hash, &Some(folder.parent.clone()), into);
}
if !into.compacted && self.new_cache.info.name != folder.name {
let mut flat = self
.new_cache
.size_recursive(&this_hash.key())
.unwrap_or(DataUsageEntry::default());
let mut flat = self.new_cache.size_recursive(&this_hash.key()).unwrap_or_default();
flat.compacted = true;
let compact = if flat.objects < DATA_SCANNER_COMPACT_LEAST_OBJECT.try_into().unwrap() {
true
@@ -1079,3 +1080,5 @@ pub async fn scan_data_folder(
close_disk().await;
Ok(s.new_cache)
}
// pub fn eval_action_from_lifecycle(lc: &BucketLifecycleConfiguration, lr: &ObjectLockConfiguration, rcfg: &ReplicationConfiguration, obj: &ObjectInfo)


@@ -101,10 +101,11 @@ impl LockedLastMinuteLatency {
{
let old = self.cached.clone();
self.cached = AccElem::default();
let mut a = AccElem::default();
a.size = old.size;
a.total = old.total;
a.n = old.n;
let a = AccElem {
size: old.size,
total: old.total,
n: old.n,
};
let _ = self.mu.write().await;
self.latency.add_all(t - 1, &a);
}
@@ -130,6 +131,12 @@ pub struct ScannerMetrics {
current_paths: HashMap<String, String>,
}
impl Default for ScannerMetrics {
fn default() -> Self {
Self::new()
}
}
impl ScannerMetrics {
pub fn new() -> Self {
Self {


@@ -122,7 +122,7 @@ pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
Some(data_usage_info) => {
if let Ok(data) = serde_json::to_vec(&data_usage_info) {
if attempts > 10 {
let _ = save_config(store, &format!("{}{}", DATA_USAGE_OBJ_NAME_PATH.to_string(), ".bkp"), &data).await;
let _ = save_config(store, &format!("{}{}", *DATA_USAGE_OBJ_NAME_PATH, ".bkp"), &data).await;
attempts += 1;
}
let _ = save_config(store, &DATA_USAGE_OBJ_NAME_PATH, &data).await;


@@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::u64;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
@@ -158,7 +157,7 @@ impl SizeHistogram {
for (count, oh) in self.0.iter().zip(OBJECTS_HISTOGRAM_INTERVALS.iter()) {
if ByteSize::kib(1).as_u64() == oh.start && oh.end == ByteSize::mib(1).as_u64() - 1 {
res.insert(oh.name.to_string(), spl_count);
} else if ByteSize::kib(1).as_u64() <= oh.start && oh.end <= ByteSize::mib(1).as_u64() - 1 {
} else if ByteSize::kib(1).as_u64() <= oh.start && oh.end < ByteSize::mib(1).as_u64() {
spl_count += count;
res.insert(oh.name.to_string(), *count);
} else {
@@ -310,7 +309,7 @@ impl DataUsageEntry {
s_rep.replica_size += o_rep.replica_size;
s_rep.replica_count += o_rep.replica_count;
for (arn, stat) in o_rep.targets.iter() {
let st = s_rep.targets.entry(arn.clone()).or_insert(ReplicationStats::default());
let st = s_rep.targets.entry(arn.clone()).or_default();
*st = ReplicationStats {
pending_size: stat.pending_size + st.pending_size,
failed_size: stat.failed_size + st.failed_size,
@@ -456,7 +455,7 @@ impl DataUsageCache {
tokio::spawn(async move {
let _ = save_config(&store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await;
});
save_config(&store, name, &buf).await
save_config(store, name, &buf).await
}
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {
@@ -465,7 +464,7 @@ impl DataUsageCache {
if !parent.is_empty() {
let phash = hash_path(parent);
let p = {
let p = self.cache.entry(phash.key()).or_insert(DataUsageEntry::default());
let p = self.cache.entry(phash.key()).or_default();
p.add_child(&hash);
p.clone()
};
@@ -476,10 +475,7 @@ impl DataUsageCache {
pub fn replace_hashed(&mut self, hash: &DataUsageHash, parent: &Option<DataUsageHash>, e: &DataUsageEntry) {
self.cache.insert(hash.key(), e.clone());
if let Some(parent) = parent {
self.cache
.entry(parent.key())
.or_insert(DataUsageEntry::default())
.add_child(hash);
self.cache.entry(parent.key()).or_default().add_child(hash);
}
}
@@ -488,11 +484,7 @@ impl DataUsageCache {
}
pub fn find_children_copy(&mut self, h: DataUsageHash) -> DataUsageHashMap {
self.cache
.entry(h.string())
.or_insert(DataUsageEntry::default())
.children
.clone()
self.cache.entry(h.string()).or_default().children.clone()
}
pub fn flatten(&self, root: &DataUsageEntry) -> DataUsageEntry {
@@ -511,21 +503,18 @@ impl DataUsageCache {
}
pub fn copy_with_children(&mut self, src: &DataUsageCache, hash: &DataUsageHash, parent: &Option<DataUsageHash>) {
match src.cache.get(&hash.string()) {
Some(e) => {
self.cache.insert(hash.key(), e.clone());
for ch in e.children.iter() {
if *ch == hash.key() {
return;
}
self.copy_with_children(src, &DataUsageHash(ch.to_string()), &Some(hash.clone()));
}
if let Some(parent) = parent {
let p = self.cache.entry(parent.key()).or_insert(DataUsageEntry::default());
p.add_child(hash);
if let Some(e) = src.cache.get(&hash.string()) {
self.cache.insert(hash.key(), e.clone());
for ch in e.children.iter() {
if *ch == hash.key() {
return;
}
self.copy_with_children(src, &DataUsageHash(ch.to_string()), &Some(hash.clone()));
}
if let Some(parent) = parent {
let p = self.cache.entry(parent.key()).or_default();
p.add_child(hash);
}
None => return,
}
}
@@ -552,7 +541,7 @@ impl DataUsageCache {
if flat.replication_stats.is_some() && flat.replication_stats.as_ref().unwrap().empty() {
flat.replication_stats = None;
}
return Some(flat);
Some(flat)
}
None => None,
}
@@ -623,7 +612,7 @@ impl DataUsageCache {
}
if e.children.len() > limit && compact_self {
let mut flat = self.size_recursive(&path.key()).unwrap_or(DataUsageEntry::default());
let mut flat = self.size_recursive(&path.key()).unwrap_or_default();
flat.compacted = true;
self.delete_recursive(path);
self.replace_hashed(path, &None, &flat);
@@ -639,7 +628,7 @@ impl DataUsageCache {
add(self, path, &mut leaves);
leaves.sort_by(|a, b| a.objects.cmp(&b.objects));
while remove > 0 && leaves.len() > 0 {
while remove > 0 && !leaves.is_empty() {
let e = leaves.first().unwrap();
let candidate = e.path.clone();
if candidate == *path && !compact_self {
@@ -676,7 +665,7 @@ impl DataUsageCache {
let mut n = root.children.len();
for ch in root.children.iter() {
n += self.total_children_rec(&ch);
n += self.total_children_rec(ch);
}
n
}
@@ -724,7 +713,7 @@ impl DataUsageCache {
None => return DataUsageInfo::default(),
};
let flat = self.flatten(&e);
let dui = DataUsageInfo {
DataUsageInfo {
last_update: self.info.last_update,
objects_total_count: flat.objects as u64,
versions_total_count: flat.versions as u64,
@@ -733,8 +722,7 @@ impl DataUsageCache {
buckets_count: e.children.len() as u64,
buckets_usage: self.buckets_usage_info(buckets),
..Default::default()
};
dui
}
}
pub fn buckets_usage_info(&self, buckets: &[BucketInfo]) -> HashMap<String, BucketUsageInfo> {
@@ -807,9 +795,7 @@ fn add(data_usage_cache: &DataUsageCache, path: &DataUsageHash, leaves: &mut Vec
return;
}
let sz = data_usage_cache
.size_recursive(&path.key())
.unwrap_or(DataUsageEntry::default());
let sz = data_usage_cache.size_recursive(&path.key()).unwrap_or_default();
leaves.push(Inner {
objects: sz.objects,
path: path.clone(),

View File

@@ -1,2 +1,5 @@
pub const ERR_IGNORE_FILE_CONTRIB: &str = "ignore this file's contribution toward data-usage";
pub const ERR_SKIP_FILE: &str = "skip this file";
pub const ERR_HEAL_STOP_SIGNALLED: &str = "heal stop signaled";
pub const ERR_HEAL_IDLE_TIMEOUT: &str = "healing results were not consumed for too long";
pub const ERR_RETRY_HEALING: &str = "some items failed to heal, we will retry healing this drive again";
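These sentinels travel as plain strings inside Error (heal_fresh_disk returns Error::from_string(ERR_RETRY_HEALING)), so callers can only distinguish them by comparing message text. A sketch of that check, assuming the crate's Error Display impl yields the original message:

// True when the error is the retry-healing sentinel defined above.
fn is_retry_healing(err: &Error) -> bool {
    err.to_string() == ERR_RETRY_HEALING
}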


@@ -1,9 +1,8 @@
use std::{
path::Path,
time::{SystemTime, UNIX_EPOCH},
};
use std::{path::Path, time::SystemTime};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::RwLock;
use crate::{
@@ -38,7 +37,11 @@ pub const DRIVE_STATE_ROOT_MOUNT: &str = "root-mount";
pub const DRIVE_STATE_UNKNOWN: &str = "unknown";
pub const DRIVE_STATE_UNFORMATTED: &str = "unformatted"; // only returned by disk
#[derive(Clone, Copy, Debug, Default)]
lazy_static! {
pub static ref TIME_SENTINEL: OffsetDateTime = OffsetDateTime::from_unix_timestamp(0).unwrap();
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize)]
pub struct HealOpts {
pub recursive: bool,
pub dry_run: bool,
@@ -102,8 +105,8 @@ pub struct HealingDisk {
pub disk_index: Option<usize>,
pub endpoint: String,
pub path: String,
pub started: u64,
pub last_update: u64,
pub started: Option<OffsetDateTime>,
pub last_update: Option<SystemTime>,
pub retry_attempts: u64,
pub objects_total_count: u64,
pub objects_total_size: u64,
@@ -132,8 +135,8 @@ pub struct HealingTracker {
pub disk_index: Option<usize>,
pub path: String,
pub endpoint: String,
pub started: u64,
pub last_update: u64,
pub started: Option<OffsetDateTime>,
pub last_update: Option<SystemTime>,
pub objects_total_count: u64,
pub objects_total_size: u64,
pub items_healed: u64,
@@ -161,8 +164,7 @@ pub struct HealingTracker {
impl HealingTracker {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self)
.map_err(|err| Error::from_string(err.to_string()))
serde_json::to_vec(self).map_err(|err| Error::from_string(err.to_string()))
}
pub fn unmarshal_msg(data: &[u8]) -> Result<Self> {
@@ -187,7 +189,7 @@ impl HealingTracker {
self.object = String::new();
}
pub async fn get_last_update(&self) -> u64 {
pub async fn get_last_update(&self) -> Option<SystemTime> {
let _ = self.mu.read().await;
self.last_update
@@ -234,7 +236,7 @@ impl HealingTracker {
pub async fn update(&mut self) -> Result<()> {
if let Some(disk) = &self.disk {
if healing(&disk.path().to_string_lossy().to_string()).await?.is_none() {
if healing(disk.path().to_string_lossy().as_ref()).await?.is_none() {
return Err(Error::from_string(format!("healingTracker: drive {} is not marked as healing", self.id)));
}
let _ = self.mu.write().await;
@@ -262,14 +264,11 @@ impl HealingTracker {
(self.pool_index, self.set_index, self.disk_index) = store.get_pool_and_set(&self.id).await?;
}
self.last_update = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
self.last_update = Some(SystemTime::now());
let htracker_bytes = self.marshal_msg()?;
GLOBAL_BackgroundHealState.write().await.update_heal_status(&self).await;
GLOBAL_BackgroundHealState.write().await.update_heal_status(self).await;
if let Some(disk) = &self.disk {
let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
@@ -280,7 +279,7 @@ impl HealingTracker {
Ok(())
}
async fn delete(&self) -> Result<()> {
pub async fn delete(&self) -> Result<()> {
if let Some(disk) = &self.disk {
let file_path = Path::new(BUCKET_META_PREFIX).join(HEALING_TRACKER_FILENAME);
return disk
@@ -299,7 +298,7 @@ impl HealingTracker {
Ok(())
}
async fn is_healed(&self, bucket: &str) -> bool {
pub async fn is_healed(&self, bucket: &str) -> bool {
let _ = self.mu.read().await;
for v in self.healed_buckets.iter() {
if v == bucket {
@@ -310,7 +309,7 @@ impl HealingTracker {
false
}
async fn resume(&mut self) {
pub async fn resume(&mut self) {
let _ = self.mu.write().await;
self.items_healed = self.resume_items_healed;
@@ -321,7 +320,7 @@ impl HealingTracker {
self.bytes_skipped = self.resume_bytes_skipped;
}
async fn bucket_done(&mut self, bucket: &str) {
pub async fn bucket_done(&mut self, bucket: &str) {
let _ = self.mu.write().await;
self.resume_items_healed = self.items_healed;
@@ -383,34 +382,34 @@ impl Clone for HealingTracker {
Self {
disk: self.disk.clone(),
id: self.id.clone(),
pool_index: self.pool_index.clone(),
set_index: self.set_index.clone(),
disk_index: self.disk_index.clone(),
pool_index: self.pool_index,
set_index: self.set_index,
disk_index: self.disk_index,
path: self.path.clone(),
endpoint: self.endpoint.clone(),
started: self.started.clone(),
last_update: self.last_update.clone(),
objects_total_count: self.objects_total_count.clone(),
objects_total_size: self.objects_total_size.clone(),
items_healed: self.items_healed.clone(),
items_failed: self.items_failed.clone(),
item_skipped: self.item_skipped.clone(),
bytes_done: self.bytes_done.clone(),
bytes_failed: self.bytes_failed.clone(),
bytes_skipped: self.bytes_skipped.clone(),
started: self.started,
last_update: self.last_update,
objects_total_count: self.objects_total_count,
objects_total_size: self.objects_total_size,
items_healed: self.items_healed,
items_failed: self.items_failed,
item_skipped: self.item_skipped,
bytes_done: self.bytes_done,
bytes_failed: self.bytes_failed,
bytes_skipped: self.bytes_skipped,
bucket: self.bucket.clone(),
object: self.object.clone(),
resume_items_healed: self.resume_items_healed.clone(),
resume_items_failed: self.resume_items_failed.clone(),
resume_items_skipped: self.resume_items_skipped.clone(),
resume_bytes_done: self.resume_bytes_done.clone(),
resume_bytes_failed: self.resume_bytes_failed.clone(),
resume_bytes_skipped: self.resume_bytes_skipped.clone(),
resume_items_healed: self.resume_items_healed,
resume_items_failed: self.resume_items_failed,
resume_items_skipped: self.resume_items_skipped,
resume_bytes_done: self.resume_bytes_done,
resume_bytes_failed: self.resume_bytes_failed,
resume_bytes_skipped: self.resume_bytes_skipped,
queue_buckets: self.queue_buckets.clone(),
healed_buckets: self.healed_buckets.clone(),
heal_id: self.heal_id.clone(),
retry_attempts: self.retry_attempts.clone(),
finished: self.finished.clone(),
retry_attempts: self.retry_attempts,
finished: self.finished,
mu: RwLock::new(false),
}
}
@@ -441,22 +440,19 @@ pub async fn load_healing_tracker(disk: &Option<DiskStore>) -> Result<HealingTra
}
pub async fn init_healing_tracker(disk: DiskStore, heal_id: &str) -> Result<HealingTracker> {
let mut healing_tracker = HealingTracker::default();
healing_tracker.id = disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string());
healing_tracker.heal_id = heal_id.to_string();
healing_tracker.path = disk.to_string();
healing_tracker.endpoint = disk.endpoint().to_string();
healing_tracker.started = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let disk_location = disk.get_disk_location();
healing_tracker.pool_index = disk_location.pool_idx;
healing_tracker.set_index = disk_location.set_idx;
healing_tracker.disk_index = disk_location.disk_idx;
healing_tracker.disk = Some(disk);
Ok(healing_tracker)
Ok(HealingTracker {
id: disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string()),
heal_id: heal_id.to_string(),
path: disk.to_string(),
endpoint: disk.endpoint().to_string(),
started: Some(OffsetDateTime::now_utc()),
pool_index: disk_location.pool_idx,
set_index: disk_location.set_idx,
disk_index: disk_location.disk_idx,
disk: Some(disk),
..Default::default()
})
}
pub async fn healing(derive_path: &str) -> Result<Option<HealingTracker>> {


@@ -1,12 +1,34 @@
use super::{
background_heal_ops::HealTask,
data_scanner::HEAL_DELETE_DANGLING,
error::ERR_SKIP_FILE,
heal_commands::{
HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker,
HEAL_ITEM_BUCKET_METADATA,
},
};
use crate::heal::heal_commands::{HEAL_ITEM_BUCKET, HEAL_ITEM_OBJECT};
use crate::store_api::StorageAPI;
use crate::{
config::common::CONFIG_PREFIX,
disk::RUSTFS_META_BUCKET,
global::GLOBAL_BackgroundHealRoutine,
heal::{
error::ERR_HEAL_STOP_SIGNALLED,
heal_commands::{HealDriveInfo, DRIVE_STATE_OK},
},
};
use crate::{
disk::{endpoint::Endpoint, MetaCacheEntry},
endpoints::Endpoints,
error::{Error, Result},
global::GLOBAL_IsDistErasure,
heal::heal_commands::{HealStartSuccess, HEAL_UNKNOWN_SCAN},
new_object_layer_fn,
utils::path::has_profix,
};
use lazy_static::lazy_static;
use s3s::{S3Error, S3ErrorCode};
use std::{
collections::HashMap,
future::Future,
@@ -24,19 +46,13 @@ use tokio::{
},
time::{interval, sleep},
};
use tracing::info;
use uuid::Uuid;
use super::{
background_heal_ops::HealTask,
data_scanner::HEAL_DELETE_DANGLING,
heal_commands::{HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker},
};
type HealStatusSummary = String;
type ItemsMap = HashMap<HealItemType, usize>;
pub type HealObjectFn = Arc<dyn Fn(&str, &str, &str, HealScanMode) -> Result<()> + Send + Sync>;
pub type HealEntryFn =
Box<dyn Fn(String, MetaCacheEntry, HealScanMode) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send>;
Arc<dyn Fn(String, MetaCacheEntry, HealScanMode) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync + 'static>;
pub const BG_HEALING_UUID: &str = "0000-0000-0000-0000";
pub const HEALING_TRACKER_FILENAME: &str = ".healing.bin";
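Note: HealEntryFn widens from a Box to an `Arc<dyn Fn … + Send + Sync + 'static>`, so the callback can be cloned into each pool's listing task. A minimal sketch of that shape (names here are illustrative, not the crate's API):
use std::{future::Future, pin::Pin, sync::Arc};

type EntryFn =
    Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync + 'static>;

fn make_entry_fn() -> EntryFn {
    let f: EntryFn = Arc::new(|name: String| {
        Box::pin(async move {
            // each call returns an independently awaitable, Send future
            println!("healing entry {name}");
            Ok(())
        })
    });
    f
}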
@@ -174,19 +190,19 @@ impl HealSequence {
}
impl HealSequence {
fn get_scanned_items_count(&self) -> usize {
fn _get_scanned_items_count(&self) -> usize {
self.scanned_items_map.values().sum()
}
fn get_scanned_items_map(&self) -> ItemsMap {
fn _get_scanned_items_map(&self) -> ItemsMap {
self.scanned_items_map.clone()
}
fn get_healed_items_map(&self) -> ItemsMap {
fn _get_healed_items_map(&self) -> ItemsMap {
self.healed_items_map.clone()
}
fn get_heal_failed_items_map(&self) -> ItemsMap {
fn _get_heal_failed_items_map(&self) -> ItemsMap {
self.heal_failed_items_map.clone()
}
@@ -223,11 +239,11 @@ impl HealSequence {
}
async fn has_ended(&self) -> bool {
if self.client_token == BG_HEALING_UUID.to_string() {
if self.client_token == *BG_HEALING_UUID {
return false;
}
!(*(self.end_time.read().await) == self.start_time)
*(self.end_time.read().await) != self.start_time
}
async fn stop(&self) {
@@ -238,6 +254,7 @@ impl HealSequence {
async fn push_heal_result_item(&self, r: &HealResultItem) -> Result<()> {
let mut r = r.clone();
let mut interval_timer = interval(HEAL_UNCONSUMED_TIMEOUT);
#[allow(unused_assignments)]
let mut items_len = 0;
loop {
{
@@ -282,31 +299,108 @@ impl HealSequence {
task.opts.scan_mode = HEAL_UNKNOWN_SCAN;
}
self.count_scanned(heal_type);
self.count_scanned(heal_type.clone());
if source.no_wait {}
todo!()
}
fn heal_disk_meta() -> Result<()> {
todo!()
}
fn heal_items(&self, buckets_only: bool) -> Result<()> {
if self.client_token == BG_HEALING_UUID.to_string() {
if source.no_wait {
let task_str = format!("{:?}", task);
if GLOBAL_BackgroundHealRoutine.read().await.tasks_tx.try_send(task).is_ok() {
info!("Task in the queue: {:?}", task_str);
}
return Ok(());
}
todo!()
let (resp_tx, mut resp_rx) = mpsc::channel(1);
task.resp_tx = Some(resp_tx);
let task_str = format!("{:?}", task);
if GLOBAL_BackgroundHealRoutine.read().await.tasks_tx.try_send(task).is_ok() {
info!("Task in the queue: {:?}", task_str);
}
let count_ok_drives = |drivers: &[HealDriveInfo]| {
let mut count = 0;
for drive in drivers.iter() {
if drive.state == DRIVE_STATE_OK {
count += 1;
}
}
count
};
loop {
match resp_rx.recv().await {
Some(mut res) => {
if res.err.is_none() {
self.count_healed(heal_type.clone());
} else {
self.count_failed(heal_type.clone());
}
if !self.report_progress {
if let Some(err) = res.err {
if err.to_string() == ERR_SKIP_FILE {
return Ok(());
}
return Err(err);
} else {
return Ok(());
}
}
res.result.heal_item_type = heal_type.clone();
if let Some(err) = res.err.as_ref() {
res.result.detail = err.to_string();
}
if res.result.parity_blocks > 0
&& res.result.data_blocks > 0
&& res.result.data_blocks > res.result.parity_blocks
{
let got = count_ok_drives(&res.result.after);
if got < res.result.parity_blocks {
res.result.detail = format!(
"quorum loss - expected {} minimum, got drive states in OK {}",
res.result.parity_blocks, got
);
}
}
return self.push_heal_result_item(&res.result).await;
}
None => return Ok(()),
}
}
}
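Note: queue_heal_task now performs a fire-and-collect handshake: the task carries an mpsc sender, the background routine replies on it, and the caller waits on the receiver. A stripped-down sketch of that handshake, assuming a worker loop that answers each task exactly once:
use tokio::sync::mpsc;

struct Task {
    payload: String,
    resp_tx: Option<mpsc::Sender<Result<String, String>>>,
}

async fn submit(tasks_tx: &mpsc::Sender<Task>, payload: String) -> Result<String, String> {
    let (resp_tx, mut resp_rx) = mpsc::channel(1);
    let task = Task { payload, resp_tx: Some(resp_tx) };
    tasks_tx.try_send(task).map_err(|e| e.to_string())?;
    // the worker sends exactly one reply; a closed channel means it gave up
    resp_rx.recv().await.unwrap_or(Err("worker dropped the task".into()))
}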
async fn traverse_and_heal(&self) {
async fn heal_disk_meta(h: Arc<RwLock<HealSequence>>) -> Result<()> {
HealSequence::heal_rustfs_sys_meta(h, CONFIG_PREFIX).await
}
async fn heal_items(h: Arc<RwLock<HealSequence>>, buckets_only: bool) -> Result<()> {
if h.read().await.client_token == *BG_HEALING_UUID {
return Ok(());
}
Self::heal_disk_meta(h.clone()).await?;
let bucket = h.read().await.bucket.clone();
Self::heal_bucket(h.clone(), &bucket, buckets_only).await
}
async fn traverse_and_heal(h: Arc<RwLock<HealSequence>>) {
let buckets_only = false;
let result = match Self::heal_items(h.clone(), buckets_only).await {
Ok(_) => None,
Err(err) => Some(err),
};
let _ = h.read().await.traverse_and_heal_done_tx.read().await.send(result).await;
}
fn heal_rustfs_sys_meta(&self, meta_prefix: String) -> Result<()> {
todo!()
async fn heal_rustfs_sys_meta(h: Arc<RwLock<HealSequence>>, meta_prefix: &str) -> Result<()> {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()))),
};
let setting = h.read().await.setting;
store
.heal_objects(RUSTFS_META_BUCKET, meta_prefix, &setting, h.clone(), true)
.await
}
async fn is_done(&self) -> bool {
@@ -316,14 +410,101 @@ impl HealSequence {
}
false
}
pub async fn heal_bucket(hs: Arc<RwLock<HealSequence>>, bucket: &str, bucket_only: bool) -> Result<()> {
let (object, setting) = {
let mut hs_w = hs.write().await;
hs_w.queue_heal_task(
HealSource {
bucket: bucket.to_string(),
..Default::default()
},
HEAL_ITEM_BUCKET.to_string(),
)
.await?;
if bucket_only {
return Ok(());
}
if !hs_w.setting.recursive {
if !hs_w.object.is_empty() {
HealSequence::heal_object(hs.clone(), bucket, &hs_w.object, "", hs_w.setting.scan_mode).await?;
}
return Ok(());
}
(hs_w.object.clone(), hs_w.setting)
};
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()))),
};
store.heal_objects(bucket, &object, &setting, hs.clone(), false).await
}
pub async fn heal_object(
hs: Arc<RwLock<HealSequence>>,
bucket: &str,
object: &str,
version_id: &str,
_scan_mode: HealScanMode,
) -> Result<()> {
let mut hs_w = hs.write().await;
if hs_w.is_quitting().await {
return Err(Error::from_string(ERR_HEAL_STOP_SIGNALLED));
}
let setting = hs_w.setting;
hs_w.queue_heal_task(
HealSource {
bucket: bucket.to_string(),
object: object.to_string(),
version_id: version_id.to_string(),
opts: Some(setting),
..Default::default()
},
HEAL_ITEM_OBJECT.to_string(),
)
.await?;
Ok(())
}
pub async fn heal_meta_object(
hs: Arc<RwLock<HealSequence>>,
bucket: &str,
object: &str,
version_id: &str,
_scan_mode: HealScanMode,
) -> Result<()> {
let mut hs_w = hs.write().await;
if hs_w.is_quitting().await {
return Err(Error::from_string(ERR_HEAL_STOP_SIGNALLED));
}
hs_w.queue_heal_task(
HealSource {
bucket: bucket.to_string(),
object: object.to_string(),
version_id: version_id.to_string(),
..Default::default()
},
HEAL_ITEM_BUCKET_METADATA.to_string(),
)
.await?;
Ok(())
}
}
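Note: the methods above moved from `&self` to associated functions over `Arc<RwLock<HealSequence>>`, which lets them clone the handle into spawned tasks instead of borrowing self across an await. The pattern in miniature (illustrative types):
use std::sync::Arc;
use tokio::sync::RwLock;

struct Seq {
    bucket: String,
}

impl Seq {
    async fn heal_items(h: Arc<RwLock<Seq>>) {
        let bucket = h.read().await.bucket.clone(); // keep the read guard short-lived
        tokio::spawn(async move {
            let _h = h; // the handle, not a borrow, moves into the task
            println!("healing {bucket}");
        });
    }
}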
pub async fn heal_sequence_start(h: Arc<RwLock<HealSequence>>) {
let r = h.read().await;
{
let mut current_status_w = r.current_status.write().await;
(*current_status_w).summary = HEAL_RUNNING_STATUS.to_string();
(*current_status_w).start_time = SystemTime::now()
current_status_w.summary = HEAL_RUNNING_STATUS.to_string();
current_status_w.start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
@@ -331,7 +512,7 @@ pub async fn heal_sequence_start(h: Arc<RwLock<HealSequence>>) {
let h_clone = h.clone();
spawn(async move {
h_clone.read().await.traverse_and_heal().await;
HealSequence::traverse_and_heal(h_clone).await;
});
let h_clone_1 = h.clone();
@@ -339,32 +520,27 @@ pub async fn heal_sequence_start(h: Arc<RwLock<HealSequence>>) {
select! {
_ = r.is_done() => {
*(r.end_time.write().await) = SystemTime::now();
let mut current_status_w = r.current_status.write().await;
(*current_status_w).summary = HEAL_FINISHED_STATUS.to_string();
let mut current_status_w = r.current_status.write().await;
current_status_w.summary = HEAL_FINISHED_STATUS.to_string();
spawn(async move {
let binding = h_clone_1.read().await;
let mut rx_w = binding.traverse_and_heal_done_rx.write().await;
rx_w.recv().await;
spawn(async move {
let binding = h_clone_1.read().await;
let mut rx_w = binding.traverse_and_heal_done_rx.write().await;
rx_w.recv().await;
});
}
result = x.recv() => {
match result {
Some(err) => {
match err {
Some(err) => {
let mut current_status_w = r.current_status.write().await;
(current_status_w).summary = HEAL_STOPPED_STATUS.to_string();
(current_status_w).failure_detail = err.to_string();
},
None => {
let mut current_status_w = r.current_status.write().await;
(current_status_w).summary = HEAL_FINISHED_STATUS.to_string();
}
if let Some(err) = result {
match err {
Some(err) => {
let mut current_status_w = r.current_status.write().await;
(current_status_w).summary = HEAL_STOPPED_STATUS.to_string();
(current_status_w).failure_detail = err.to_string();
},
None => {
let mut current_status_w = r.current_status.write().await;
(current_status_w).summary = HEAL_FINISHED_STATUS.to_string();
}
},
None => {
}
}
@@ -555,18 +731,16 @@ impl AllHealState {
let path_s = path.to_str().unwrap();
if r.force_started {
self.stop_heal_sequence(path_s).await?;
} else {
if let Some(hs) = self.get_heal_sequence(path_s).await {
if !hs.read().await.has_ended().await {
return Err(Error::from_string(format!("Heal is already running on the given path (use force-start option to stop and start afresh). The heal was started by IP {} at {:?}, token is {}", r.client_address, r.start_time, r.client_token)));
}
} else if let Some(hs) = self.get_heal_sequence(path_s).await {
if !hs.read().await.has_ended().await {
return Err(Error::from_string(format!("Heal is already running on the given path (use force-start option to stop and start afresh). The heal was started by IP {} at {:?}, token is {}", r.client_address, r.start_time, r.client_token)));
}
}
let _ = self.mu.write().await;
for (k, v) in self.heal_seq_map.iter() {
if !v.read().await.has_ended().await && (has_profix(&k, path_s) || has_profix(path_s, &k)) {
if !v.read().await.has_ended().await && (has_profix(k, path_s) || has_profix(path_s, k)) {
return Err(Error::from_string(format!(
"The provided heal sequence path overlaps with an existing heal path: {}",
k

View File

@@ -77,7 +77,7 @@ fn get_default_opts(
pub fn extract_metadata(headers: &HeaderMap<HeaderValue>) -> HashMap<String, String> {
let mut metadata = HashMap::new();
extract_metadata_from_mime(&headers, &mut metadata);
extract_metadata_from_mime(headers, &mut metadata);
metadata
}

View File

@@ -1,14 +1,26 @@
use async_trait::async_trait;
use futures::future::join_all;
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
use protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::RwLock;
use tonic::Request;
use tracing::warn;
use crate::disk::DiskAPI;
use crate::disk::error::{is_all_buckets_not_found, is_all_not_found};
use crate::disk::{DiskAPI, DiskStore};
use crate::global::GLOBAL_LOCAL_DISK_MAP;
use crate::heal::heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
HEAL_ITEM_BUCKET,
};
use crate::heal::heal_ops::RUESTFS_RESERVED_BUCKET;
use crate::quorum::{bucket_op_ignored_errs, reduce_write_quorum_errs};
use crate::store::all_local_disk;
use crate::utils::wildcard::is_rustfs_meta_bucket_name;
use crate::{
disk::{self, error::DiskError, VolumeInfo},
endpoints::{EndpointServerPools, Node},
@@ -20,6 +32,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
#[async_trait]
pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
@@ -61,6 +74,79 @@ impl S3PeerSys {
}
impl S3PeerSys {
pub async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let mut opts = *opts;
let mut futures = Vec::with_capacity(self.clients.len());
for client in self.clients.iter() {
// check whether the bucket exists on this peer
futures.push(async move {
match client.get_bucket_info(bucket, &BucketOptions::default()).await {
Ok(_) => None,
Err(err) => Some(err),
}
});
}
let errs = join_all(futures).await;
let mut pool_errs = Vec::new();
for pool_idx in 0..self.pools_count {
let mut per_pool_errs = Vec::new();
for (i, client) in self.clients.iter().enumerate() {
if let Some(v) = client.get_pools() {
if v.contains(&pool_idx) {
per_pool_errs.push(errs[i].clone());
}
}
}
let qu = per_pool_errs.len() / 2;
pool_errs.push(reduce_write_quorum_errs(&per_pool_errs, &bucket_op_ignored_errs(), qu));
}
if !opts.recreate {
opts.remove = is_all_not_found(&pool_errs);
opts.recursive = !opts.remove;
}
let mut futures = Vec::new();
let heal_bucket_results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.clients.len()]));
for (idx, client) in self.clients.iter().enumerate() {
let opts_clone = opts;
let heal_bucket_results_clone = heal_bucket_results.clone();
futures.push(async move {
match client.heal_bucket(bucket, &opts_clone).await {
Ok(res) => {
heal_bucket_results_clone.write().await[idx] = res;
None
}
Err(err) => Some(err),
}
});
}
let errs = join_all(futures).await;
for pool_idx in 0..self.pools_count {
let mut per_pool_errs = Vec::new();
for (i, client) in self.clients.iter().enumerate() {
if let Some(v) = client.get_pools() {
if v.contains(&pool_idx) {
per_pool_errs.push(errs[i].clone());
}
}
}
let qu = per_pool_errs.len() / 2;
if let Some(pool_err) = reduce_write_quorum_errs(&per_pool_errs, &bucket_op_ignored_errs(), qu) {
return Err(pool_err);
}
}
for (i, err) in errs.iter().enumerate() {
if err.is_none() {
return Ok(heal_bucket_results.read().await[i].clone());
}
}
Err(DiskError::VolumeNotFound.into())
}
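Note: each pool's errors are reduced against a write quorum of `len / 2`, i.e. an error only disqualifies the pool when more than half of its peers report it. A toy version of the counting (the real reduce_write_quorum_errs also filters the ignorable errors from bucket_op_ignored_errs):
use std::collections::HashMap;

fn reduce_quorum_errs(errs: &[Option<String>], quorum: usize) -> Option<String> {
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for e in errs.iter().flatten() {
        *counts.entry(e.as_str()).or_insert(0) += 1;
    }
    counts
        .into_iter()
        .find(|(_, n)| *n > quorum)
        .map(|(e, _)| e.to_string())
}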
pub async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
@@ -236,6 +322,11 @@ impl PeerS3Client for LocalPeerS3Client {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
heal_bucket_local(bucket, opts).await
}
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let local_disks = all_local_disk().await;
@@ -420,6 +511,29 @@ impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let options: String = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
let request = Request::new(HealBucketRequest {
bucket: bucket.to_string(),
options,
});
let response = client.heal_bucket(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or_default()));
}
Ok(HealResultItem {
heal_item_type: HEAL_ITEM_BUCKET.to_string(),
bucket: bucket.to_string(),
set_count: 0,
..Default::default()
})
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
@@ -538,3 +652,134 @@ pub fn is_reserved_or_invalid_bucket(bucket_entry: &str, strict: bool) -> bool {
result || is_meta_bucket(bucket_entry) || is_reserved_bucket(bucket_entry)
}
pub async fn heal_bucket_local(bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
let disks = clone_drives().await;
let before_state = Arc::new(RwLock::new(vec![String::new(); disks.len()]));
let after_state = Arc::new(RwLock::new(vec![String::new(); disks.len()]));
let mut futures = Vec::new();
for (index, disk) in disks.iter().enumerate() {
let disk = disk.clone();
let bucket = bucket.to_string();
let bs_clone = before_state.clone();
let as_clone = after_state.clone();
futures.push(async move {
let disk = match disk {
Some(disk) => disk,
None => {
bs_clone.write().await[index] = DRIVE_STATE_OFFLINE.to_string();
as_clone.write().await[index] = DRIVE_STATE_OFFLINE.to_string();
return Some(Error::new(DiskError::DiskNotFound));
}
};
bs_clone.write().await[index] = DRIVE_STATE_OK.to_string();
as_clone.write().await[index] = DRIVE_STATE_OK.to_string();
if bucket == RUESTFS_RESERVED_BUCKET {
return None;
}
match disk.stat_volume(&bucket).await {
Ok(_) => None,
Err(err) => match err.downcast_ref() {
Some(DiskError::DiskNotFound) => {
bs_clone.write().await[index] = DRIVE_STATE_OFFLINE.to_string();
as_clone.write().await[index] = DRIVE_STATE_OFFLINE.to_string();
Some(err)
}
Some(DiskError::VolumeNotFound) => {
bs_clone.write().await[index] = DRIVE_STATE_MISSING.to_string();
as_clone.write().await[index] = DRIVE_STATE_MISSING.to_string();
Some(err)
}
_ => {
bs_clone.write().await[index] = DRIVE_STATE_CORRUPT.to_string();
as_clone.write().await[index] = DRIVE_STATE_CORRUPT.to_string();
Some(err)
}
},
}
});
}
let errs = join_all(futures).await;
let mut res = HealResultItem {
heal_item_type: HEAL_ITEM_BUCKET.to_string(),
bucket: bucket.to_string(),
disk_count: disks.len(),
set_count: 0,
..Default::default()
};
if opts.dry_run {
return Ok(res);
}
for (disk, state) in disks.iter().zip(before_state.read().await.iter()) {
res.before.push(HealDriveInfo {
uuid: "".to_string(),
endpoint: disk.clone().map(|s| s.to_string()).unwrap_or_default(),
state: state.to_string(),
});
}
if !is_rustfs_meta_bucket_name(bucket) && !is_all_buckets_not_found(&errs) && opts.remove {
let mut futures = Vec::new();
for disk in disks.iter() {
let disk = disk.clone();
let bucket = bucket.to_string();
futures.push(async move {
match disk {
Some(disk) => {
let _ = disk.delete_volume(&bucket).await;
None
}
None => Some(Error::new(DiskError::DiskNotFound)),
}
});
}
let _ = join_all(futures).await;
}
if !opts.remove {
let mut futures = Vec::new();
for (idx, disk) in disks.iter().enumerate() {
let disk = disk.clone();
let bucket = bucket.to_string();
let bs_clone = before_state.clone();
let as_clone = after_state.clone();
let errs_clone = errs.clone();
futures.push(async move {
if bs_clone.read().await[idx] == DRIVE_STATE_MISSING {
match disk.as_ref().unwrap().make_volume(&bucket).await {
Ok(_) => {
as_clone.write().await[idx] = DRIVE_STATE_OK.to_string();
return None;
}
Err(err) => {
return Some(err);
}
}
}
errs_clone[idx].clone()
});
}
let _ = join_all(futures).await;
}
for (disk, state) in disks.iter().zip(after_state.read().await.iter()) {
res.after.push(HealDriveInfo {
uuid: "".to_string(),
endpoint: disk.clone().map(|s| s.to_string()).unwrap_or_default(),
state: state.to_string(),
});
}
Ok(res)
}
async fn clone_drives() -> Vec<Option<DiskStore>> {
GLOBAL_LOCAL_DISK_MAP.read().await.values().cloned().collect::<Vec<_>>()
}
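Note: heal_bucket_local fans out one future per disk and lets each write only its own slot of the shared before/after state vectors. The concurrency pattern in isolation, assuming tokio plus futures::future::join_all:
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::RwLock;

async fn probe_all(n: usize) -> Vec<String> {
    let states = Arc::new(RwLock::new(vec![String::new(); n]));
    let tasks: Vec<_> = (0..n)
        .map(|i| {
            let states = states.clone();
            async move {
                // each future owns a clone of the Arc and touches only slot i
                states.write().await[i] = format!("disk-{i}: ok");
            }
        })
        .collect();
    join_all(tasks).await;
    let out = states.read().await.clone();
    out
}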

View File

@@ -40,6 +40,16 @@ pub fn object_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
base
}
// bucket_op_ignored_errs
pub fn bucket_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
let mut base = base_ignored_errs();
let ext: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(DiskError::DiskAccessDenied), Box::new(DiskError::UnformattedDisk)];
base.extend(ext);
base
}
// Function used to check whether an error should be ignored
fn is_err_ignored(err: &Error, ignored_errs: &[Box<dyn CheckErrorFn>]) -> bool {
ignored_errs.iter().any(|ignored_err| ignored_err.is(err))
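Note: the ignored-error lists are slices of trait objects, so each entry can match however it likes. A sketch with a hypothetical trait shape (the crate's CheckErrorFn takes the real Error type):
trait CheckErrorFn {
    fn is(&self, err: &str) -> bool;
}

struct Exact(&'static str);

impl CheckErrorFn for Exact {
    fn is(&self, err: &str) -> bool {
        self.0 == err
    }
}

fn is_err_ignored(err: &str, ignored: &[Box<dyn CheckErrorFn>]) -> bool {
    ignored.iter().any(|m| m.is(err))
}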

View File

@@ -17,12 +17,9 @@ use crate::{
endpoints::{Endpoints, PoolEndpoints},
error::{Error, Result},
global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
heal::{
heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
DRIVE_STATE_OK, HEAL_ITEM_METADATA,
},
heal_ops::HealObjectFn,
heal::heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
HEAL_ITEM_METADATA,
},
set_disk::SetDisks,
store_api::{
@@ -34,6 +31,7 @@ use crate::{
utils::hash,
};
use crate::heal::heal_ops::HealSequence;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
@@ -179,7 +177,7 @@ impl Sets {
self.connect_disks().await;
// TODO: config interval
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15 * 3));
let mut interval = tokio::time::interval(Duration::from_secs(15 * 3));
let cloned_token = self.ctx.clone();
loop {
tokio::select! {
@@ -549,11 +547,9 @@ impl StorageAPI for Sets {
}
// Save new formats `format.json` on unformatted disks.
for (fm, disk) in tmp_new_formats.iter_mut().zip(disks.iter()) {
if fm.is_some() && disk.is_some() {
if save_format_file(disk, fm, &format_op_id).await.is_err() {
let _ = disk.as_ref().unwrap().close().await;
*fm = None;
}
if fm.is_some() && disk.is_some() && save_format_file(disk, fm, &format_op_id).await.is_err() {
let _ = disk.as_ref().unwrap().close().await;
*fm = None;
}
}
@@ -591,7 +587,14 @@ impl StorageAPI for Sets {
.heal_object(bucket, object, version_id, opts)
.await
}
async fn heal_objects(&self, _bucket: &str, _prefix: &str, _opts: &HealOpts, _func: HealObjectFn) -> Result<()> {
async fn heal_objects(
&self,
_bucket: &str,
_prefix: &str,
_opts: &HealOpts,
_hs: Arc<RwLock<HealSequence>>,
_is_meta: bool,
) -> Result<()> {
unimplemented!()
}
async fn get_pool_and_set(&self, _id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)> {
@@ -604,13 +607,11 @@ impl StorageAPI for Sets {
async fn _close_storage_disks(disks: &[Option<DiskStore>]) {
let mut futures = Vec::with_capacity(disks.len());
for disk in disks.iter() {
if let Some(disk) = disk {
let disk = disk.clone();
futures.push(tokio::spawn(async move {
let _ = disk.close().await;
}));
}
for disk in disks.iter().flatten() {
let disk = disk.clone();
futures.push(tokio::spawn(async move {
let _ = disk.close().await;
}));
}
let _ = join_all(futures).await;
}
@@ -690,19 +691,16 @@ fn new_heal_format_sets(
for (i, set) in ref_format.erasure.sets.iter().enumerate() {
for j in 0..set.len() {
if let Some(Some(err)) = errs.get(i * set_drive_count + j) {
match err.downcast_ref::<DiskError>() {
Some(DiskError::UnformattedDisk) => {
let mut fm = FormatV3::new(set_count, set_drive_count);
fm.id = ref_format.id;
fm.format = ref_format.format.clone();
fm.version = ref_format.version.clone();
fm.erasure.this = ref_format.erasure.sets[i][j];
fm.erasure.sets = ref_format.erasure.sets.clone();
fm.erasure.version = ref_format.erasure.version.clone();
fm.erasure.distribution_algo = ref_format.erasure.distribution_algo.clone();
new_formats[i][j] = Some(fm);
}
_ => {}
if let Some(DiskError::UnformattedDisk) = err.downcast_ref::<DiskError>() {
let mut fm = FormatV3::new(set_count, set_drive_count);
fm.id = ref_format.id;
fm.format = ref_format.format.clone();
fm.version = ref_format.version.clone();
fm.erasure.this = ref_format.erasure.sets[i][j];
fm.erasure.sets = ref_format.erasure.sets.clone();
fm.erasure.version = ref_format.erasure.version.clone();
fm.erasure.distribution_algo = ref_format.erasure.distribution_algo.clone();
new_formats[i][j] = Some(fm);
}
}
if let (Some(format), None) = (&formats[i * set_drive_count + j], &errs[i * set_drive_count + j]) {

View File

@@ -11,7 +11,7 @@ use crate::global::{
};
use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_ops::HealObjectFn;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::new_object_layer_fn;
use crate::store_api::{ListMultipartsInfo, ObjectIO};
use crate::store_err::{
@@ -43,6 +43,7 @@ use http::HeaderMap;
use lazy_static::lazy_static;
use rand::Rng;
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
use s3s::{S3Error, S3ErrorCode};
use std::cmp::Ordering;
use std::slice::Iter;
use std::time::SystemTime;
@@ -535,7 +536,7 @@ impl ECStore {
}
});
if let Err(err) = set
.ns_scanner(&all_buckets_clone, want_cycle.try_into().unwrap(), tx, heal_scan_mode)
.ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode)
.await
{
let mut f_w = first_err_clone.write().await;
@@ -554,7 +555,7 @@ impl ECStore {
let mut ctx_clone = cancel.subscribe();
let all_buckets_clone = all_buckets.clone();
let task = tokio::spawn(async move {
let mut last_update = None;
let mut last_update: Option<SystemTime> = None;
let mut interval = interval(Duration::from_secs(30));
let all_merged = Arc::new(RwLock::new(DataUsageCache::default()));
loop {
@@ -563,17 +564,11 @@ impl ECStore {
return;
}
_ = update_close_rx.recv() => {
last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await {
Ok(v) => v,
Err(_) => return,
};
update_scan(all_merged.clone(), results.clone(), &mut last_update, all_buckets_clone.clone(), updates.clone()).await;
return;
}
_ = interval.tick() => {
last_update = match tokio::spawn(update_scan(all_merged.clone(), results.clone(), last_update.clone(), all_buckets_clone.clone(), updates.clone())).await {
Ok(v) => v,
Err(_) => return,
};
update_scan(all_merged.clone(), results.clone(), &mut last_update, all_buckets_clone.clone(), updates.clone()).await;
}
}
}
@@ -616,8 +611,7 @@ impl ECStore {
let mut idx_res = Vec::with_capacity(self.pools.len());
let mut idx = 0;
for result in results {
for (idx, result) in results.into_iter().enumerate() {
match result {
Ok(res) => {
idx_res.push(IndexRes {
@@ -634,8 +628,6 @@ impl ECStore {
});
}
}
idx += 1;
}
// TODO: test order
@@ -693,10 +685,10 @@ impl ECStore {
async fn update_scan(
all_merged: Arc<RwLock<DataUsageCache>>,
results: Arc<RwLock<Vec<DataUsageCache>>>,
last_update: Option<SystemTime>,
last_update: &mut Option<SystemTime>,
all_buckets: Vec<BucketInfo>,
updates: Sender<DataUsageInfo>,
) -> Option<SystemTime> {
) {
let mut w = all_merged.write().await;
*w = DataUsageCache {
info: DataUsageCacheInfo {
@@ -707,15 +699,14 @@ async fn update_scan(
};
for info in results.read().await.iter() {
if info.info.last_update.is_none() {
return last_update;
return;
}
w.merge(info);
}
if w.info.last_update > last_update && w.root().is_none() {
if w.info.last_update > *last_update && w.root().is_none() {
let _ = updates.send(w.dui(&w.info.name, &all_buckets)).await;
return w.info.last_update;
*last_update = w.info.last_update;
}
last_update
}
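Note: update_scan now mutates the caller's watermark through `&mut Option<SystemTime>` instead of returning a new one, which removes the spawn-and-reassign dance in the select! arms. The in-place update in miniature:
use std::time::SystemTime;

fn maybe_advance(last_update: &mut Option<SystemTime>, seen: Option<SystemTime>) {
    // Option<SystemTime> orders None before any Some, so a first
    // observation always advances the watermark
    if seen > *last_update {
        *last_update = seen;
    }
}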
pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
@@ -1379,7 +1370,7 @@ impl StorageAPI for ECStore {
Ok(ListMultipartsInfo {
key_marker: key_marker.to_owned(),
upload_id_marker: upload_id_marker.to_owned(),
max_uploads: max_uploads,
max_uploads,
uploads,
prefix: prefix.to_owned(),
delimiter: delimiter.to_owned(),
@@ -1543,8 +1534,8 @@ impl StorageAPI for ECStore {
Ok((r, None))
}
async fn heal_bucket(&self, _bucket: &str, _opts: &HealOpts) -> Result<HealResultItem> {
unimplemented!()
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
self.peer_sys.heal_bucket(bucket, opts).await
}
async fn heal_object(
&self,
@@ -1604,7 +1595,95 @@ impl StorageAPI for ECStore {
Ok((HealResultItem::default(), Some(Error::new(DiskError::FileNotFound))))
}
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()> {
async fn heal_objects(
&self,
bucket: &str,
prefix: &str,
opts: &HealOpts,
hs: Arc<RwLock<HealSequence>>,
is_meta: bool,
) -> Result<()> {
let opts_clone = *opts;
let heal_entry: HealEntryFn = Arc::new(move |bucket: String, entry: MetaCacheEntry, scan_mode: HealScanMode| {
let opts_clone = opts_clone;
let hs_clone = hs.clone();
Box::pin(async move {
if entry.is_dir() {
return Ok(());
}
if bucket == RUSTFS_META_BUCKET
&& (Pattern::new("buckets/*/.metacache/*")
.map(|p| p.matches(&entry.name))
.unwrap_or(false)
|| Pattern::new("tmp/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("multipart/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("tmp-old/*").map(|p| p.matches(&entry.name)).unwrap_or(false))
{
return Ok(());
}
let fivs = match entry.file_info_versions(&bucket) {
Ok(fivs) => fivs,
Err(_) => {
if is_meta {
return HealSequence::heal_meta_object(hs_clone.clone(), &bucket, &entry.name, "", scan_mode).await;
} else {
return HealSequence::heal_object(hs_clone.clone(), &bucket, &entry.name, "", scan_mode).await;
}
}
};
if opts_clone.remove && !opts_clone.dry_run {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => {
return Err(Error::from(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())))
}
};
if let Err(err) = store.check_abandoned_parts(&bucket, &entry.name, &opts_clone).await {
info!("unable to check object {}/{} for abandoned data: {}", bucket, entry.name, err.to_string());
}
}
for version in fivs.versions.iter() {
if is_meta {
if let Err(err) = HealSequence::heal_meta_object(
hs_clone.clone(),
&bucket,
&version.name,
&version.version_id.map(|v| v.to_string()).unwrap_or_default(),
scan_mode,
)
.await
{
match err.downcast_ref() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => {
return Err(err);
}
}
}
} else if let Err(err) = HealSequence::heal_object(
hs_clone.clone(),
&bucket,
&version.name,
&version.version_id.map(|v| v.to_string()).unwrap_or_default(),
scan_mode,
)
.await
{
match err.downcast_ref() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => {
return Err(err);
}
}
}
}
Ok(())
})
});
let mut first_err = None;
for (idx, pool) in self.pools.iter().enumerate() {
if opts.pool.is_some() && opts.pool.unwrap() != idx {
@@ -1617,7 +1696,7 @@ impl StorageAPI for ECStore {
continue;
}
if let Err(err) = set.list_and_heal(bucket, prefix, opts, func.clone()).await {
if let Err(err) = set.list_and_heal(bucket, prefix, opts, heal_entry.clone()).await {
if first_err.is_none() {
first_err = Some(err)
}
@@ -1855,67 +1934,6 @@ fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
Ok(())
}
pub async fn heal_entry(
bucket: String,
entry: MetaCacheEntry,
scan_mode: HealScanMode,
opts: HealOpts,
func: HealObjectFn,
) -> Result<()> {
if entry.is_dir() {
return Ok(());
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.rustfs.sys'
if bucket == RUSTFS_META_BUCKET {
if Pattern::new("buckets/*/.metacache/*")
.map(|p| p.matches(&entry.name))
.unwrap_or(false)
|| Pattern::new("tmp/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("multipart/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("tmp-old/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
{
return Ok(());
}
}
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(Error::msg("errServerNotInitialized")),
};
match entry.file_info_versions(&bucket) {
Ok(fivs) => {
if opts.remove && !opts.dry_run {
if let Err(err) = store.check_abandoned_parts(&bucket, &entry.name, &opts).await {
return Err(Error::from_string(format!(
"unable to check object {}/{} for abandoned data: {}",
bucket, entry.name, err
)));
}
}
for version in fivs.versions.iter() {
let version_id = version.version_id.map_or("".to_string(), |version_id| version_id.to_string());
if let Err(err) = func(&bucket, &entry.name, &version_id, scan_mode) {
match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => return Err(err),
}
}
}
}
Err(_) => {
return func(&bucket, &entry.name, "", scan_mode);
}
}
Ok(())
}
async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
let opts = &DiskInfoOptions::default();
let mut res = vec![None; disks.len()];

View File

@@ -1,12 +1,8 @@
use std::collections::HashMap;
use crate::heal::heal_ops::HealSequence;
use crate::{
disk::DiskStore,
error::{Error, Result},
heal::{
heal_commands::{HealOpts, HealResultItem},
heal_ops::HealObjectFn,
},
heal::heal_commands::{HealOpts, HealResultItem},
utils::path::decode_dir_object,
xhttp,
};
@@ -15,7 +11,10 @@ use http::HeaderMap;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use uuid::Uuid;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -920,7 +919,14 @@ pub trait StorageAPI: ObjectIO {
version_id: &str,
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()>;
async fn heal_objects(
&self,
bucket: &str,
prefix: &str,
opts: &HealOpts,
hs: Arc<RwLock<HealSequence>>,
is_meta: bool,
) -> Result<()>;
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
}

View File

@@ -2,14 +2,8 @@ use crate::error::{Error, Result};
pub fn parse_bool(str: &str) -> Result<bool> {
match str {
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => {
return Ok(true);
}
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => {
return Ok(false);
}
_ => {
return Err(Error::from_string(format!("ParseBool: parsing {}", str)));
}
"1" | "t" | "T" | "true" | "TRUE" | "True" | "on" | "ON" | "On" | "enabled" => Ok(true),
"0" | "f" | "F" | "false" | "FALSE" | "False" | "off" | "OFF" | "Off" | "disabled" => Ok(false),
_ => Err(Error::from_string(format!("ParseBool: parsing {}", str))),
}
}
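For reference, the accepted spellings mirror Go's strconv.ParseBool plus on/off and enabled/disabled; a quick test sketch:
#[test]
fn parse_bool_examples() {
    assert!(parse_bool("on").unwrap());
    assert!(!parse_bool("Off").unwrap());
    assert!(parse_bool("yes").is_err()); // not one of the accepted spellings
}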

View File

@@ -40,6 +40,10 @@ impl Pattern {
pub fn len(&self) -> usize {
self.seq.len()
}
pub fn is_empty(&self) -> bool {
self.seq.is_empty()
}
}
/// contains a list of patterns provided in the input.

View File

@@ -1,3 +1,5 @@
use crate::disk::RUSTFS_META_BUCKET;
pub fn match_simple(pattern: &str, name: &str) -> bool {
if pattern.is_empty() {
return name == pattern;
@@ -65,3 +67,7 @@ pub fn match_as_pattern_prefix(pattern: &str, text: &str) -> bool {
}
text.len() <= pattern.len()
}
pub fn is_rustfs_meta_bucket_name(bucket: &str) -> bool {
bucket.starts_with(RUSTFS_META_BUCKET)
}

View File

@@ -6,7 +6,7 @@ use ecstore::{
UpdateMetadataOpts, WalkDirOptions,
},
erasure::Writer,
heal::data_usage_cache::DataUsageCache,
heal::{data_usage_cache::DataUsageCache, heal_commands::HealOpts},
peer::{LocalPeerS3Client, PeerS3Client},
store::{all_local_disk_path, find_local_disk},
store_api::{BucketOptions, DeleteBucketOptions, FileInfo, MakeBucketOptions},
@@ -21,14 +21,14 @@ use protos::{
DeleteBucketResponse, DeletePathsRequest, DeletePathsResponse, DeleteRequest, DeleteResponse, DeleteVersionRequest,
DeleteVersionResponse, DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse,
DiskInfoRequest, DiskInfoResponse, GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest,
GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest,
ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest,
MakeVolumesResponse, NsScannerRequest, NsScannerResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse,
ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse,
ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse,
RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse, UpdateMetadataRequest,
UpdateMetadataResponse, VerifyFileRequest, VerifyFileResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest,
WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
GetBucketInfoResponse, HealBucketRequest, HealBucketResponse, ListBucketRequest, ListBucketResponse, ListDirRequest,
ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest,
MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, NsScannerRequest, NsScannerResponse, PingRequest,
PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse,
ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse,
RenameFileRequst, RenameFileResponse, RenamePartRequst, RenamePartResponse, StatVolumeRequest, StatVolumeResponse,
UpdateMetadataRequest, UpdateMetadataResponse, VerifyFileRequest, VerifyFileResponse, WalkDirRequest, WalkDirResponse,
WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse,
},
};
use tokio::sync::mpsc;
@@ -110,6 +110,32 @@ impl Node for NodeService {
}))
}
async fn heal_bucket(&self, request: Request<HealBucketRequest>) -> Result<Response<HealBucketResponse>, Status> {
debug!("heal bucket");
let request = request.into_inner();
let options = match serde_json::from_str::<HealOpts>(&request.options) {
Ok(options) => options,
Err(err) => {
return Ok(tonic::Response::new(HealBucketResponse {
success: false,
error_info: Some(format!("decode HealOpts failed: {}", err)),
}))
}
};
match self.local_peer.heal_bucket(&request.bucket, &options).await {
Ok(_) => Ok(tonic::Response::new(HealBucketResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(HealBucketResponse {
success: false,
error_info: Some(format!("heal bucket failed: {}", err)),
})),
}
}
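Note: HealOpts crosses the wire as a JSON string inside the gRPC message and is decoded back with serde_json on the peer. The round trip in isolation, with a hypothetical `Opts` standing in for HealOpts:
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
struct Opts {
    recursive: bool,
    dry_run: bool,
}

fn round_trip() -> Result<(), serde_json::Error> {
    let opts = Opts { recursive: true, dry_run: false };
    let wire = serde_json::to_string(&opts)?; // placed in HealBucketRequest.options
    let back: Opts = serde_json::from_str(&wire)?; // decoded by the node service
    assert_eq!(opts, back);
    Ok(())
}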
async fn list_bucket(&self, request: Request<ListBucketRequest>) -> Result<Response<ListBucketResponse>, Status> {
debug!("list bucket");

View File

@@ -51,7 +51,7 @@ impl S3Access for FS {
let req_info = ReqInfo {
card: cx.credentials().cloned(),
action: action,
action,
..Default::default()
};

View File

@@ -341,15 +341,14 @@ impl S3 for FS {
let content_type = {
if let Some(content_type) = info.content_type {
let ct = match ContentType::from_str(&content_type) {
match ContentType::from_str(&content_type) {
Ok(res) => Some(res),
Err(err) => {
error!("parse content-type err {} {:?}", &content_type, err);
//
None
}
};
ct
}
} else {
None
}
@@ -411,15 +410,14 @@ impl S3 for FS {
let content_type = {
if let Some(content_type) = info.content_type {
let ct = match ContentType::from_str(&content_type) {
match ContentType::from_str(&content_type) {
Ok(res) => Some(res),
Err(err) => {
error!("parse content-type err {} {:?}", &content_type, err);
//
None
}
};
ct
}
} else {
None
}
@@ -1536,9 +1534,7 @@ impl S3 for FS {
}
}
let mut grants = Vec::new();
grants.push(Grant {
let grants = vec![Grant {
grantee: Some(Grantee {
type_: Type::from_static(Type::CANONICAL_USER),
display_name: None,
@@ -1547,12 +1543,11 @@ impl S3 for FS {
uri: None,
}),
permission: Some(Permission::from_static(Permission::FULL_CONTROL)),
});
}];
Ok(S3Response::new(GetBucketAclOutput {
grants: Some(grants),
owner: Some(RUSTFS_OWNER.to_owned()),
..Default::default()
}))
}
@@ -1589,7 +1584,7 @@ impl S3 for FS {
v.grants.is_some_and(|gs| {
//
!gs.is_empty()
&& gs.get(0).is_some_and(|g| {
&& gs.first().is_some_and(|g| {
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)
@@ -1617,9 +1612,7 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
let mut grants = Vec::new();
grants.push(Grant {
let grants = vec![Grant {
grantee: Some(Grantee {
type_: Type::from_static(Type::CANONICAL_USER),
display_name: None,
@@ -1628,7 +1621,7 @@ impl S3 for FS {
uri: None,
}),
permission: Some(Permission::from_static(Permission::FULL_CONTROL)),
});
}];
Ok(S3Response::new(GetObjectAclOutput {
grants: Some(grants),
@@ -1665,7 +1658,7 @@ impl S3 for FS {
v.grants.is_some_and(|gs| {
//
!gs.is_empty()
&& gs.get(0).is_some_and(|g| {
&& gs.first().is_some_and(|g| {
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)