This commit is contained in:
junxiang Mu
2024-10-15 10:29:05 +08:00
parent 7723419142
commit e4f5ca7ea8
18 changed files with 2204 additions and 474 deletions

7
Cargo.lock generated
View File

@@ -478,6 +478,7 @@ dependencies = [
"common",
"crc32fast",
"futures",
"glob",
"hex-simd",
"http",
"lazy_static",
@@ -713,6 +714,12 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-timers"
version = "0.3.0"

View File

@@ -1,9 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
}
}
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -345,6 +345,17 @@ message DeleteVolumeResponse {
optional string error_info = 2;
}
message DiskInfoRequest {
string disk = 1;
string opts = 2;
}
message DiskInfoResponse {
bool success = 1;
string disk_info = 2;
optional string error_info = 3;
}
// lock api have same argument type
message GenerallyLockRequest {
string args = 1;
@@ -392,6 +403,7 @@ service NodeService {
rpc DeleteVersions(DeleteVersionsRequest) returns (DeleteVersionsResponse) {};
rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {};
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {};
rpc DiskInfo(DiskInfoRequest) returns (DiskInfoResponse) {};
/* -------------------------------lock service-------------------------- */

View File

@@ -12,9 +12,10 @@ rust-version.workspace = true
async-trait.workspace = true
backon.workspace = true
bytes.workspace = true
common.workspace = true
glob = "0.3.1"
thiserror.workspace = true
futures.workspace = true
common.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true

View File

@@ -19,6 +19,7 @@ use crate::config::common::{read_config, save_config};
use crate::error::{Error, Result};
use crate::disk::BUCKET_META_PREFIX;
use crate::file_meta::FileMetaShallowVersion;
use crate::store::ECStore;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
@@ -397,6 +398,16 @@ where
s.serialize_bytes(&buf)
}
#[derive(Debug, Default)]
pub struct MetadataResolutionParams {
pub dir_quorum: usize,
pub obj_quorum: usize,
pub requested_versions: usize,
pub bucket: String,
pub strict: bool,
pub candidates: Vec<Vec<FileMetaShallowVersion>>,
}
#[cfg(test)]
mod test {

View File

@@ -0,0 +1,88 @@
use std::{
ptr,
sync::{atomic::{AtomicPtr, AtomicU64, Ordering}, Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::spawn;
use crate::error::{Error, Result};
type UpdateFn<T> = Box<dyn Fn() -> Result<T>>;
#[derive(Clone, Debug, Default)]
pub struct Opts {
return_last_good: bool,
no_wait: bool,
}
pub struct Cache<T: Copy> {
update_fn: UpdateFn<T>,
ttl: Duration,
opts: Opts,
val: AtomicPtr<T>,
last_update_ms: AtomicU64,
updating: Arc<Mutex<bool>>,
}
impl<T: Copy> Cache<T> {
pub fn new(update_fn: UpdateFn<T>, ttl: Duration, opts: Opts) -> Self {
let val = AtomicPtr::new(ptr::null_mut());
Self {
update_fn,
ttl,
opts,
val,
last_update_ms: AtomicU64::new(0),
updating: Arc::new(Mutex::new(false)),
}
}
pub fn get(&self) -> Result<T> {
let v_ptr = self.val.load(Ordering::SeqCst);
let v = if v_ptr.is_null() { None } else { Some(unsafe { *v_ptr }) };
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
if v.is_some() && now - self.last_update_ms.load(Ordering::SeqCst) < self.ttl.as_secs() {
return Ok(v.unwrap());
}
if self.opts.no_wait && v.is_some() && now - self.last_update_ms.load(Ordering::SeqCst) < self.ttl.as_secs() * 2 {
if self.updating.try_lock().is_ok() {
let this = self.clone();
spawn(async {
let _ = this.update().await;
});
self.update();
}
return Ok(v.unwrap());
}
match self.updating.lock() {
Ok(_) => {
if let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH + Duration::from_secs(self.last_update_ms.load(Ordering::SeqCst))) {
if duration < self.ttl {
return Ok(v.unwrap());
}
}
match self.update() {
}
},
Err(err) => {
return Err(Error::from_string(err.to_string()));
}
}
todo!()
}
async fn update(&self) -> Result<()> {
todo!()
}
}

View File

@@ -0,0 +1 @@
pub mod cache;

View File

@@ -1,8 +1,9 @@
use super::error::{is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files, os_is_not_exist, os_is_permission};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
os, DeleteOptions, DiskAPI, DiskLocation, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq,
ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
os, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskMetrics, FileInfoVersions, FileReader, FileWriter,
MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo,
WalkDirOptions,
};
use crate::disk::error::{
convert_access_error, is_sys_err_handle_invalid, is_sys_err_invalid_arg, is_sys_err_is_dir, is_sys_err_not_dir,
@@ -560,6 +561,10 @@ impl LocalDisk {
Ok(f)
}
fn get_metrics(&self) -> DiskMetrics {
DiskMetrics::default()
}
}
fn is_root_path(path: impl AsRef<Path>) -> bool {
@@ -1599,6 +1604,12 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
let mut info = DiskInfo::default();
Ok(info)
}
}
#[cfg(test)]

View File

@@ -26,7 +26,7 @@ use protos::proto_gen::node_service::{
node_service_client::NodeServiceClient, ReadAtRequest, ReadAtResponse, WriteRequest, WriteResponse,
};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc};
use std::{collections::HashMap, fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc, usize};
use time::OffsetDateTime;
use tokio::{
fs::File,
@@ -133,6 +133,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
// CleanAbandonedData
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>>;
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo>;
}
#[derive(Debug, Serialize, Deserialize)]
@@ -152,6 +153,45 @@ impl DiskLocation {
}
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DiskInfoOptions {
pub disk_id: String,
pub metrics: bool,
pub noop: bool,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct DiskInfo {
pub total: u64,
pub free: u64,
pub used: u64,
pub used_inodes: u64,
pub free_inodes: u64,
pub major: u32,
pub minor: u32,
pub nr_requests: u64,
pub fs_type: String,
pub root_disk: bool,
pub healing: bool,
pub scanning: bool,
pub endpoint: String,
pub mount_path: String,
pub id: String,
pub rotational: bool,
pub metrics: DiskMetrics,
pub error: String,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct DiskMetrics {
api_calls: HashMap<String, u64>,
total_waiting: u32,
total_errors_availability: u64,
total_errors_timeout: u64,
total_writes: u64,
total_deletes: u64,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileInfoVersions {
// Name of the volume.
@@ -263,6 +303,26 @@ impl MetaCacheEntry {
return Ok(Some(fi));
}
pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {
if self.is_dir() {
return Ok(FileInfoVersions {
volume: bucket.to_string(),
name: self.name.clone(),
versions: vec![FileInfo {
volume: bucket.to_string(),
name: self.name.clone(),
..Default::default()
}],
..Default::default()
});
}
let mut fm = FileMeta::new();
fm.unmarshal_msg(&self.metadata)?;
Ok(fm.into_file_info_versions(bucket, self.name.as_str(), false)?)
}
}
#[derive(Debug, Default)]

View File

@@ -4,10 +4,10 @@ use futures::lock::Mutex;
use protos::{
node_service_time_out_client,
proto_gen::node_service::{
DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest,
ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest,
ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, UpdateMetadataRequest, WalkDirRequest,
WriteAllRequest, WriteMetadataRequest,
DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest,
ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest,
ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, UpdateMetadataRequest,
WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use tonic::Request;
@@ -15,9 +15,9 @@ use tracing::info;
use uuid::Uuid;
use super::{
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskLocation, DiskOption, FileInfoVersions, FileReader, FileWriter,
MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp,
UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
endpoint::Endpoint, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter,
RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
};
use crate::{
disk::error::DiskError,
@@ -658,4 +658,26 @@ impl DiskAPI for RemoteDisk {
Ok(())
}
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
info!("delete_volume");
let opts = serde_json::to_string(&opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
let request = Request::new(DiskInfoRequest {
disk: self.root.to_string_lossy().to_string(),
opts,
});
let response = client.disk_info(request).await?.into_inner();
if !response.success {
return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
}
let disk_info = serde_json::from_str::<DiskInfo>(&response.disk_info)?;
Ok(disk_info)
}
}

View File

@@ -0,0 +1,39 @@
pub type HealScanMode = usize;
pub type HealItemType = String;
#[derive(Clone, Copy, Debug, Default)]
pub struct HealOpts {
pub recursive: bool,
pub dry_run: bool,
pub remove: bool,
pub recreate: bool,
pub scan_mode: HealScanMode,
pub update_parity: bool,
pub no_lock: bool,
pub pool: Option<usize>,
pub set: Option<usize>,
}
#[derive(Debug)]
struct HealDriveInfo {
uuid: String,
endpoint: String,
state: String,
}
#[derive(Debug)]
pub struct HealResultItem {
pub result_index: usize,
pub heal_item_type: HealItemType,
pub bucket: String,
pub object: String,
pub version_id: String,
pub detail: String,
pub parity_blocks: usize,
pub data_blocks: usize,
pub disk_count: usize,
pub set_count: usize,
pub before: Vec<HealDriveInfo>,
pub after: Vec<HealDriveInfo>,
pub object_size: usize,
}

View File

@@ -0,0 +1,178 @@
use crate::{
disk::MetaCacheEntry,
error::{Error, Result},
};
use lazy_static::lazy_static;
use std::{collections::HashMap, time::Instant};
use uuid::Uuid;
use super::heal_commands::{HealItemType, HealOpts, HealResultItem, HealScanMode};
type HealStatusSummary = String;
type ItemsMap = HashMap<HealItemType, usize>;
pub type HealObjectFn = Box<dyn Fn(&str, &str, &str, HealScanMode) -> Result<()> + Send>;
pub type HealEntryFn = Box<dyn Fn(String, MetaCacheEntry, HealScanMode) -> Result<()> + Send>;
lazy_static! {
static ref HEAL_NOT_STARTED_STATUS: HealStatusSummary = String::from("not started");
static ref bgHealingUUID: String = String::from("0000-0000-0000-0000");
}
lazy_static! {}
#[derive(Debug)]
pub struct HealSequenceStatus {
summary: HealStatusSummary,
failure_detail: String,
start_time: Instant,
heal_setting: HealOpts,
items: Vec<HealResultItem>,
}
impl Default for HealSequenceStatus {
fn default() -> Self {
Self {
summary: Default::default(),
failure_detail: Default::default(),
start_time: Instant::now(),
heal_setting: Default::default(),
items: Default::default(),
}
}
}
#[derive(Debug)]
pub struct HealSequence {
pub bucket: String,
pub object: String,
pub report_progress: bool,
pub start_time: Instant,
pub end_time: Instant,
pub client_token: String,
pub client_address: String,
pub force_started: bool,
pub setting: HealOpts,
pub current_status: HealSequenceStatus,
pub last_sent_result_index: usize,
pub scanned_items_map: ItemsMap,
pub healed_items_map: ItemsMap,
pub heal_failed_items_map: ItemsMap,
pub last_heal_activity: Instant,
}
impl HealSequence {
pub fn new(bucket: &str, obj_profix: &str, client_addr: &str, hs: HealOpts, force_start: bool) -> Self {
let client_token = Uuid::new_v4().to_string();
Self {
bucket: bucket.to_string(),
object: obj_profix.to_string(),
report_progress: true,
client_token,
client_address: client_addr.to_string(),
force_started: force_start,
setting: hs,
current_status: HealSequenceStatus {
summary: HEAL_NOT_STARTED_STATUS.to_string(),
heal_setting: hs,
..Default::default()
},
..Default::default()
}
}
}
impl HealSequence {
fn get_scanned_items_count(&self) -> usize {
self.scanned_items_map.values().sum()
}
fn get_scanned_items_map(&self) -> ItemsMap {
self.scanned_items_map.clone()
}
fn get_healed_items_map(&self) -> ItemsMap {
self.healed_items_map.clone()
}
fn get_heal_failed_items_map(&self) -> ItemsMap {
self.heal_failed_items_map.clone()
}
fn count_failed(&mut self, heal_type: HealItemType) {
*self.heal_failed_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = Instant::now();
}
fn count_scanned(&mut self, heal_type: HealItemType) {
*self.scanned_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = Instant::now()
}
fn count_healed(&mut self, heal_type: HealItemType) {
*self.healed_items_map.entry(heal_type).or_insert(0) += 1;
self.last_heal_activity = Instant::now()
}
fn is_quitting(&self) -> bool {
todo!()
}
fn has_ended(&self) -> bool {
if self.client_token == bgHealingUUID.to_string() {
return false;
}
!(self.end_time == self.start_time)
}
fn stop(&self) {
todo!()
}
fn push_heal_result_item(&self, r: HealResultItem) -> Result<()> {
todo!()
}
fn heal_disk_meta() -> Result<()> {
todo!()
}
fn heal_items(&self, buckets_only: bool) -> Result<()> {
if self.client_token == bgHealingUUID.to_string() {
return Ok(());
}
todo!()
}
fn traverse_and_heal(&self) {
let buckets_only = false;
}
fn heal_rustfs_sys_meta(&self, meta_prefix: String) -> Result<()> {
todo!()
}
}
impl Default for HealSequence {
fn default() -> Self {
Self {
bucket: Default::default(),
object: Default::default(),
report_progress: Default::default(),
start_time: Instant::now(),
end_time: Instant::now(),
client_token: Default::default(),
client_address: Default::default(),
force_started: Default::default(),
setting: Default::default(),
current_status: Default::default(),
last_sent_result_index: Default::default(),
scanned_items_map: Default::default(),
healed_items_map: Default::default(),
heal_failed_items_map: Default::default(),
last_heal_activity: Instant::now(),
}
}
}

2
ecstore/src/heal/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod heal_commands;
pub mod heal_ops;

View File

@@ -1,3 +1,4 @@
pub mod cache_value;
mod chunk_stream;
mod config;
pub mod disk;
@@ -7,6 +8,7 @@ pub mod erasure;
pub mod error;
mod file_meta;
mod global;
mod heal;
pub mod peer;
mod quorum;
pub mod set_disk;

View File

@@ -3,7 +3,10 @@
use crate::bucket::metadata;
use crate::bucket::metadata_sys::set_bucket_metadata;
use crate::disk::endpoint::EndpointType;
use crate::disk::MetaCacheEntry;
use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode};
use crate::heal::heal_ops::HealObjectFn;
use crate::store_api::ObjectIO;
use crate::{
bucket::metadata::BucketMetadata,
@@ -23,6 +26,7 @@ use crate::{
use backon::{ExponentialBuilder, Retryable};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled};
use std::{
@@ -776,6 +780,138 @@ impl StorageAPI for ECStore {
}
unimplemented!()
}
async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem> {
unimplemented!()
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
unimplemented!()
}
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem> {
let object = utils::path::encode_dir_object(object);
let mut errs = HashMap::new();
let mut results = HashMap::new();
for (idx, pool) in self.pools.iter().enumerate() {
//TODO: IsSuspended
match pool.heal_object(bucket, &object, version_id, opts).await {
Ok(mut result) => {
result.object = utils::path::decode_dir_object(&result.object);
results.insert(idx, result);
}
Err(err) => {
errs.insert(idx, err);
}
}
}
// Return the first nil error
for i in 0..self.pools.len() {
if errs.get(&i).is_none() {
return Ok(results.remove(&i).unwrap());
}
}
// No pool returned a nil error, return the first non 'not found' error
for (k, err) in errs.iter() {
match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => return Ok(results.remove(&k).unwrap()),
}
}
// At this stage, all errors are 'not found'
if !version_id.is_empty() {
return Err(Error::new(DiskError::FileVersionNotFound));
}
Err(Error::new(DiskError::FileNotFound))
}
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()> {
let heal_entry = |bucket: String, entry: MetaCacheEntry, scan_mode: HealScanMode| async move {
if entry.is_dir() {
return Ok(());
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.rustfs.sys'
if bucket == RUSTFS_META_BUCKET {
if Pattern::new("buckets/*/.metacache/*")
.map(|p| p.matches(&entry.name))
.unwrap_or(false)
|| Pattern::new("tmp/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("multipart/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
|| Pattern::new("tmp-old/*").map(|p| p.matches(&entry.name)).unwrap_or(false)
{
return Ok(());
}
}
match entry.file_info_versions(&bucket) {
Ok(fivs) => {
if opts.remove && !opts.dry_run {
if let Err(err) = self.check_abandoned_parts(&bucket, &entry.name, opts).await {
return Err(Error::from_string(format!(
"unable to check object {}/{} for abandoned data: {}",
bucket, entry.name, err
)));
}
}
for version in fivs.versions.iter() {
let version_id = version.version_id.map_or("".to_string(), |version_id| version_id.to_string());
if let Err(err) = func(&bucket, &entry.name, &version_id, scan_mode) {
match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::FileVersionNotFound) => {}
_ => return Err(err),
}
}
}
}
Err(_) => {
return func(&bucket, &entry.name, "", scan_mode);
}
}
Ok(())
};
for (idx, pool) in self.pools.iter().enumerate() {
if opts.pool.is_some() && opts.pool.unwrap() != idx {
continue;
}
//TODO: IsSuspended
for (idx, set) in pool.disk_set.iter().enumerate() {
if opts.set.is_some() && opts.set.unwrap() != idx {
continue;
}
set.list
}
}
todo!()
}
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()> {
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
return self.pools[0].check_abandoned_parts(bucket, &object, opts).await;
}
let mut errs = Vec::new();
for pool in self.pools.iter() {
//TODO: IsSuspended
if let Err(err) = pool.check_abandoned_parts(bucket, &object, opts).await {
errs.push(err);
}
}
if !errs.is_empty() {
return Err(errs[0]);
}
Ok(())
}
}
async fn init_local_peer(endpoint_pools: &EndpointServerPools, host: &String, port: &String) {

View File

@@ -1,6 +1,12 @@
use std::collections::HashMap;
use crate::error::{Error, Result};
use crate::{
error::{Error, Result},
heal::{
heal_commands::{HealOpts, HealResultItem},
heal_ops::HealObjectFn,
},
};
use futures::StreamExt;
use http::HeaderMap;
use rmp_serde::Serializer;
@@ -601,4 +607,9 @@ pub trait StorageAPI: ObjectIO {
uploaded_parts: Vec<CompletePart>,
opts: &ObjectOptions,
) -> Result<ObjectInfo>;
async fn heal_format(&self, dry_run: bool) -> Result<HealResultItem>;
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
async fn heal_object(&self, bucket: &str, object: &str, version_id: &str, opts: &HealOpts) -> Result<HealResultItem>;
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, func: HealObjectFn) -> Result<()>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
}

View File

@@ -1125,6 +1125,47 @@ impl Node for NodeService {
}
}
async fn disk_info(&self, request: Request<DiskInfoRequest>) -> Result<Response<DiskInfoResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
let opts = match serde_json::from_str::<DiskInfoOptions>(&request.opts) {
Ok(opts) => opts,
Err(_) => {
return Ok(tonic::Response::new(ReadMultipleResponse {
success: false,
read_multiple_resps: Vec::new(),
error_info: Some("can not decode ReadMultipleReq".to_string()),
}));
}
};
match disk.disk_info(&opts).await {
Ok(disk_info) => match serde_json::to_string(&disk_info) {
Ok(disk_info) => Ok(tonic::Response::new(DiskInfoResponse {
success: true,
disk_info,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(DiskInfoResponse {
success: false,
disk_info: "".to_string(),
error_info: Some(err.to_string()),
})),
},
Err(err) => Ok(tonic::Response::new(DiskInfoResponse {
success: false,
disk_info: "".to_string(),
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(DiskInfoResponse {
success: false,
disk_info: "".to_string(),
error_info: Some("can not find disk".to_string()),
}))
}
}
async fn lock(&self, request: Request<GenerallyLockRequest>) -> Result<Response<GenerallyLockResponse>, Status> {
let request = request.into_inner();
match &serde_json::from_str::<LockArgs>(&request.args) {