Mirror of https://github.com/rustfs/rustfs.git, synced 2026-01-17 09:40:32 +00:00.

Cargo.lock (generated)
@@ -1207,6 +1207,12 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
+[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
 [[package]]
 name = "hybrid-array"
 version = "0.2.1"
@@ -1676,9 +1682,12 @@ version = "0.0.1"
 dependencies = [
  "chrono",
  "common",
+ "humantime",
  "hyper",
+ "psutil",
  "serde",
+ "time",
  "tracing",
 ]
 
 [[package]]
@@ -50,6 +50,7 @@ hyper-util = { version = "0.1.10", features = [
 ] }
 http = "1.1.0"
 http-body = "1.0.1"
+humantime = "2.1.0"
 lock = { path = "./common/lock" }
 lazy_static = "1.5.0"
 mime = "0.3.17"
@@ -1,9 +1,10 @@ and @@ -11,114 +12,112 @@ — the generated models.PingBody file is regenerated with reordered imports and rustfmt formatting; the resulting file:

// automatically generated by the FlatBuffers compiler, do not modify


// @generated

use core::mem;
use core::cmp::Ordering;

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};

#[allow(unused_imports, dead_code)]
pub mod models {

    use core::mem;
    use core::cmp::Ordering;

    extern crate flatbuffers;
    use self::flatbuffers::{EndianScalar, Follow};

    pub enum PingBodyOffset {}
    #[derive(Copy, Clone, PartialEq)]

    pub struct PingBody<'a> {
        pub _tab: flatbuffers::Table<'a>,
    }

    impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
        type Inner = PingBody<'a>;
        #[inline]
        unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
            Self {
                _tab: flatbuffers::Table::new(buf, loc),
            }
        }
    }

    impl<'a> PingBody<'a> {
        pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;

        pub const fn get_fully_qualified_name() -> &'static str {
            "models.PingBody"
        }

        #[inline]
        pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
            PingBody { _tab: table }
        }
        #[allow(unused_mut)]
        pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
            _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
            args: &'args PingBodyArgs<'args>,
        ) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
            let mut builder = PingBodyBuilder::new(_fbb);
            if let Some(x) = args.payload {
                builder.add_payload(x);
            }
            builder.finish()
        }

        #[inline]
        pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
            // Safety:
            // Created from valid Table for this object
            // which contains a valid value in this slot
            unsafe {
                self._tab
                    .get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
            }
        }
    }

    impl flatbuffers::Verifiable for PingBody<'_> {
        #[inline]
        fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
            use self::flatbuffers::Verifiable;
            v.visit_table(pos)?
                .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
                .finish();
            Ok(())
        }
    }
    pub struct PingBodyArgs<'a> {
        pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
    }
    impl<'a> Default for PingBodyArgs<'a> {
        #[inline]
        fn default() -> Self {
            PingBodyArgs { payload: None }
        }
    }

    pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
        fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
        start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
    }
    impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
        #[inline]
        pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
            self.fbb_
                .push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
        }
        #[inline]
        pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
            let start = _fbb.start_table();
            PingBodyBuilder {
                fbb_: _fbb,
                start_: start,
            }
        }
        #[inline]
        pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
            let o = self.fbb_.end_table(self.start_);
            flatbuffers::WIPOffset::new(o.value())
        }
    }

    impl core::fmt::Debug for PingBody<'_> {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let mut ds = f.debug_struct("PingBody");
            ds.field("payload", &self.payload());
            ds.finish()
        }
    }
} // pub mod models
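For orientation, here is a minimal round-trip through the generated API — building a buffer via `PingBody::create` and reading the payload back. This is a usage sketch against the standard flatbuffers crate, not part of the commit; it assumes the generated module above is in scope as `models`.

```rust
fn main() {
    let mut fbb = flatbuffers::FlatBufferBuilder::new();
    // Vectors must be created before the table that references them.
    let payload = fbb.create_vector(&[0xDEu8, 0xAD, 0xBE, 0xEF]);
    let body = models::PingBody::create(&mut fbb, &models::PingBodyArgs { payload: Some(payload) });
    fbb.finish(body, None);

    // `root` runs the Verifiable impl before handing out a table reference.
    let parsed = flatbuffers::root::<models::PingBody>(fbb.finished_data()).expect("valid buffer");
    assert_eq!(parsed.payload().map(|v| v.bytes()), Some(&[0xDE, 0xAD, 0xBE, 0xEF][..]));
}
```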
File diff suppressed because it is too large.
@@ -19,7 +19,7 @@ use crate::disk::os::{check_path_length, is_empty_dir};
 use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
 use crate::error::{Error, Result};
 use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
-use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, SizeSummary};
+use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, ShouldSleepFn, SizeSummary};
 use crate::heal::data_scanner_metric::{ScannerMetric, ScannerMetrics};
 use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
 use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
@@ -1974,6 +1974,7 @@ impl DiskAPI for LocalDisk {
         cache: &DataUsageCache,
         updates: Sender<DataUsageEntry>,
         scan_mode: HealScanMode,
+        we_sleep: ShouldSleepFn,
     ) -> Result<DataUsageCache> {
         self.scanning.fetch_add(1, Ordering::SeqCst);
         defer!(|| { self.scanning.fetch_sub(1, Ordering::SeqCst) });
@@ -2089,6 +2090,7 @@ impl DiskAPI for LocalDisk {
             })
         }),
         scan_mode,
+        we_sleep,
     )
     .await?;
     data_usage_info.info.last_update = Some(SystemTime::now());
@@ -19,6 +19,7 @@ use crate::{
     error::{Error, Result},
     file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
     heal::{
+        data_scanner::ShouldSleepFn,
         data_usage_cache::{DataUsageCache, DataUsageEntry},
         heal_commands::{HealScanMode, HealingTracker},
     },
@@ -350,11 +351,12 @@ impl DiskAPI for Disk {
         cache: &DataUsageCache,
         updates: Sender<DataUsageEntry>,
         scan_mode: HealScanMode,
+        we_sleep: ShouldSleepFn,
     ) -> Result<DataUsageCache> {
         info!("ns_scanner");
         match self {
-            Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await,
-            Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await,
+            Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
+            Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
         }
     }
@@ -467,6 +469,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
         cache: &DataUsageCache,
         updates: Sender<DataUsageEntry>,
         scan_mode: HealScanMode,
+        we_sleep: ShouldSleepFn,
     ) -> Result<DataUsageCache>;
     async fn healing(&self) -> Option<HealingTracker>;
 }
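The new `we_sleep` parameter threads a throttling hook through every `DiskAPI` implementation. A minimal sketch of how a caller might build one — the load predicate here is a made-up placeholder; only the `ShouldSleepFn` shape comes from the diff:

```rust
use std::sync::Arc;

// Mirrors the alias added in data_scanner.rs:
// pub type ShouldSleepFn = Option<Arc<dyn Fn() -> bool + Send + Sync + 'static>>;
type ShouldSleepFn = Option<Arc<dyn Fn() -> bool + Send + Sync + 'static>>;

fn make_should_sleep(max_load: f64) -> ShouldSleepFn {
    // Hypothetical policy: throttle whenever system load exceeds max_load.
    Some(Arc::new(move || current_load() > max_load))
}

fn current_load() -> f64 {
    // Placeholder for a real probe (e.g. via psutil); always "busy" here.
    1.0
}

fn main() {
    let we_sleep = make_should_sleep(0.75);
    // A scanner invokes the hook before each unit of work:
    if let Some(should_sleep) = &we_sleep {
        if should_sleep() {
            println!("scanner would sleep here to yield I/O");
        }
    }
    // Passing `None` (as the gRPC NodeService now does) disables throttling.
}
```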
@@ -1045,7 +1048,7 @@ type NodeClient = NodeServiceClient<

 #[derive(Debug)]
 pub struct RemoteFileWriter {
-    pub root: PathBuf,
+    pub endpoint: Endpoint,
     pub volume: String,
     pub path: String,
     pub is_append: bool,
@@ -1054,7 +1057,7 @@ pub struct RemoteFileWriter {
 }

 impl RemoteFileWriter {
-    pub async fn new(root: PathBuf, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
+    pub async fn new(endpoint: Endpoint, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> {
         let (tx, rx) = mpsc::channel(128);
         let in_stream = ReceiverStream::new(rx);
@@ -1063,7 +1066,7 @@ impl RemoteFileWriter {
         let resp_stream = response.into_inner();

         Ok(Self {
-            root,
+            endpoint,
             volume,
             path,
             is_append,
@@ -1081,7 +1084,7 @@ impl Writer for RemoteFileWriter {

     async fn write(&mut self, buf: &[u8]) -> Result<()> {
         let request = WriteRequest {
-            disk: self.root.to_string_lossy().to_string(),
+            disk: self.endpoint.to_string(),
             volume: self.volume.to_string(),
             path: self.path.to_string(),
             is_append: self.is_append,
@@ -1236,7 +1239,7 @@ impl Reader for LocalFileReader {

 #[derive(Debug)]
 pub struct RemoteFileReader {
-    pub root: PathBuf,
+    pub endpoint: Endpoint,
     pub volume: String,
     pub path: String,
     tx: Sender<ReadAtRequest>,
@@ -1244,7 +1247,7 @@ pub struct RemoteFileReader {
 }

 impl RemoteFileReader {
-    pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeClient) -> Result<Self> {
+    pub async fn new(endpoint: Endpoint, volume: String, path: String, mut client: NodeClient) -> Result<Self> {
         let (tx, rx) = mpsc::channel(128);
         let in_stream = ReceiverStream::new(rx);
@@ -1253,7 +1256,7 @@ impl RemoteFileReader {
         let resp_stream = response.into_inner();

         Ok(Self {
-            root,
+            endpoint,
             volume,
             path,
             tx,
@@ -1266,7 +1269,7 @@ impl RemoteFileReader {
 impl Reader for RemoteFileReader {
     async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
         let request = ReadAtRequest {
-            disk: self.root.to_string_lossy().to_string(),
+            disk: self.endpoint.to_string(),
             volume: self.volume.to_string(),
             path: self.path.to_string(),
             offset: offset.try_into().unwrap(),
@@ -25,6 +25,7 @@ use crate::{
     disk::error::DiskError,
     error::{Error, Result},
     heal::{
+        data_scanner::ShouldSleepFn,
         data_usage_cache::{DataUsageCache, DataUsageEntry},
         heal_commands::{HealScanMode, HealingTracker},
     },
@@ -129,7 +130,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ReadAllRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
     });
@@ -151,7 +152,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(WriteAllRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         data,
@@ -173,7 +174,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DeleteRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         options,
@@ -195,7 +196,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(VerifyFileRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         file_info,
@@ -219,7 +220,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(CheckPartsRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         file_info,
@@ -242,7 +243,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(RenamePartRequst {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         src_volume: src_volume.to_string(),
         src_path: src_path.to_string(),
         dst_volume: dst_volume.to_string(),
@@ -264,7 +265,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(RenameFileRequst {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         src_volume: src_volume.to_string(),
         src_path: src_path.to_string(),
         dst_volume: dst_volume.to_string(),
@@ -284,7 +285,7 @@ impl DiskAPI for RemoteDisk {
     info!("create_file");
     Ok(FileWriter::Remote(
         RemoteFileWriter::new(
-            self.root.clone(),
+            self.endpoint.clone(),
             volume.to_string(),
             path.to_string(),
             false,
@@ -300,7 +301,7 @@ impl DiskAPI for RemoteDisk {
     info!("append_file");
     Ok(FileWriter::Remote(
         RemoteFileWriter::new(
-            self.root.clone(),
+            self.endpoint.clone(),
             volume.to_string(),
             path.to_string(),
             true,
@@ -316,7 +317,7 @@ impl DiskAPI for RemoteDisk {
     info!("read_file");
     Ok(FileReader::Remote(
         RemoteFileReader::new(
-            self.root.clone(),
+            self.endpoint.clone(),
             volume.to_string(),
             path.to_string(),
             node_service_time_out_client(&self.addr)
@@ -333,7 +334,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ListDirRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
     });
@@ -353,7 +354,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(WalkDirRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         walk_dir_options,
     });
@@ -386,7 +387,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(RenameDataRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         src_volume: src_volume.to_string(),
         src_path: src_path.to_string(),
         file_info,
@@ -411,7 +412,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(MakeVolumesRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volumes: volumes.iter().map(|s| (*s).to_string()).collect(),
     });
@@ -430,7 +431,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(MakeVolumeRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
     });
@@ -449,7 +450,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ListVolumesRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
     });

     let response = client.list_volumes(request).await?.into_inner();
@@ -473,7 +474,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(StatVolumeRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
     });
@@ -495,7 +496,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DeletePathsRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         paths,
     });
@@ -517,7 +518,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(UpdateMetadataRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         file_info,
@@ -540,7 +541,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(WriteMetadataRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         file_info,
@@ -569,7 +570,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ReadVersionRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         version_id: version_id.to_string(),
@@ -593,7 +594,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ReadXlRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         read_data,
@@ -625,7 +626,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DeleteVersionRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         path: path.to_string(),
         file_info,
@@ -659,7 +660,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DeleteVersionsRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
         versions: versions_str,
         opts,
@@ -694,7 +695,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(ReadMultipleRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         read_multiple_req,
     });
@@ -719,7 +720,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DeleteVolumeRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         volume: volume.to_string(),
     });
@@ -739,7 +740,7 @@ impl DiskAPI for RemoteDisk {
         .await
         .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
     let request = Request::new(DiskInfoRequest {
-        disk: self.root.to_string_lossy().to_string(),
+        disk: self.endpoint.to_string(),
         opts,
     });
@@ -759,6 +760,7 @@ impl DiskAPI for RemoteDisk {
         cache: &DataUsageCache,
         updates: Sender<DataUsageEntry>,
         scan_mode: HealScanMode,
+        _we_sleep: ShouldSleepFn,
     ) -> Result<DataUsageCache> {
         info!("ns_scanner");
         let cache = serde_json::to_string(cache)?;
@@ -770,7 +772,7 @@ impl DiskAPI for RemoteDisk {
         let in_stream = ReceiverStream::new(rx);
         let mut response = client.ns_scanner(in_stream).await?.into_inner();
         let request = NsScannerRequest {
-            disk: self.root.to_string_lossy().to_string(),
+            disk: self.endpoint.to_string(),
             cache,
             scan_mode: scan_mode as u64,
         };
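The recurring `disk: self.root…` → `disk: self.endpoint…` substitution changes what identifies a disk on the wire: a stable endpoint string rather than the caller's local mount path. An illustrative sketch of the distinction — `EndpointLike` is a stand-in type for this example, not the rustfs `Endpoint`:

```rust
// A stand-in with a Display impl, the only capability the RPC fields rely on.
struct EndpointLike {
    url: String,
}

impl std::fmt::Display for EndpointLike {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.url)
    }
}

fn main() {
    let ep = EndpointLike { url: "http://node2:9000/data/disk1".to_string() };
    // Every RPC in this commit now fills `disk` with this stable identifier,
    // which the remote peer can resolve regardless of local mount layout.
    let disk_field = ep.to_string();
    assert_eq!(disk_field, "http://node2:9000/data/disk1");
}
```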
@@ -1,4 +1,5 @@
-use tracing::warn;
+use tracing::{info, warn};
 use url::Url;

 use crate::{
     disk::endpoint::{Endpoint, EndpointType},
@@ -1,3 +1,4 @@
+use madmin::heal_commands::HealResultItem;
 use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
 use tokio::{
     sync::{
@@ -10,7 +11,7 @@ use tracing::{error, info};
 use uuid::Uuid;

 use super::{
-    heal_commands::{HealOpts, HealResultItem},
+    heal_commands::HealOpts,
     heal_ops::{new_bg_heal_sequence, HealSequence},
 };
 use crate::heal::error::ERR_RETRY_HEALING;
@@ -62,7 +62,7 @@ use crate::{
     store_api::{FileInfo, ObjectInfo},
 };

-const _DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
+const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
 const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
 const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch.
 const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
@@ -73,7 +73,6 @@ const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to w
 pub const HEAL_DELETE_DANGLING: bool = true;
 const HEAL_OBJECT_SELECT_PROB: u64 = 1024; // Overall probability of a file being scanned; one in n.

-// static SCANNER_SLEEPER: () = new_dynamic_sleeper(2, Duration::from_secs(1), true); // Keep defaults same as config defaults
 static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(DATA_SCANNER_START_DELAY.as_secs());
 static _SCANNER_IDLE_MODE: AtomicU32 = AtomicU32::new(0); // default is throttled when idle
 static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
@@ -81,9 +80,67 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
 static SCANNER_EXCESS_FOLDERS: AtomicU64 = AtomicU64::new(50_000);

 lazy_static! {
+    static ref SCANNER_SLEEPER: RwLock<DynamicSleeper> = RwLock::new(new_dynamic_sleeper(2.0, Duration::from_secs(1), true));
     pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
 }

+struct DynamicSleeper {
+    factor: f64,
+    max_sleep: Duration,
+    min_sleep: Duration,
+    _is_scanner: bool,
+}
+
+type TimerFn = Pin<Box<dyn Future<Output = ()> + Send>>;
+impl DynamicSleeper {
+    fn timer() -> TimerFn {
+        let t = SystemTime::now();
+        Box::pin(async move {
+            let done_at = SystemTime::now().duration_since(t).unwrap_or_default();
+            SCANNER_SLEEPER.read().await.sleep(done_at).await;
+        })
+    }
+
+    async fn sleep(&self, base: Duration) {
+        let (min_wait, max_wait) = (self.min_sleep, self.max_sleep);
+        let factor = self.factor;
+
+        let want_sleep = {
+            let tmp = base.mul_f64(factor);
+            if tmp < min_wait {
+                return;
+            }
+
+            if max_wait > Duration::from_secs(0) && tmp > max_wait {
+                max_wait
+            } else {
+                tmp
+            }
+        };
+        sleep(want_sleep).await;
+    }
+
+    fn _update(&mut self, factor: f64, max_wait: Duration) -> Result<()> {
+        if (self.factor - factor).abs() < 1e-10 && self.max_sleep == max_wait {
+            return Ok(());
+        }
+
+        self.factor = factor;
+        self.max_sleep = max_wait;
+
+        Ok(())
+    }
+}
+
+fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> DynamicSleeper {
+    DynamicSleeper {
+        factor,
+        max_sleep: max_wait,
+        min_sleep: Duration::from_micros(100),
+        _is_scanner: is_scanner,
+    }
+}
+
 pub async fn init_data_scanner() {
     tokio::spawn(async move {
         loop {
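The sleeper's throttle is pure arithmetic: the time a unit of work took is multiplied by `factor`, dropped entirely if below `min_sleep`, and clamped to `max_sleep`. A standalone sketch of that clamping rule (function and names local to this example):

```rust
use std::time::Duration;

/// Scales `base` by `factor`, returning None when the result falls below the
/// floor (no sleep) and clamping to `max_sleep` otherwise — the same rule as
/// DynamicSleeper::sleep above.
fn want_sleep(base: Duration, factor: f64, min_sleep: Duration, max_sleep: Duration) -> Option<Duration> {
    let tmp = base.mul_f64(factor);
    if tmp < min_sleep {
        return None;
    }
    if max_sleep > Duration::ZERO && tmp > max_sleep {
        Some(max_sleep)
    } else {
        Some(tmp)
    }
}

fn main() {
    let (min, max) = (Duration::from_micros(100), Duration::from_secs(1));
    // With factor 2.0 (the SCANNER_SLEEPER default): 300ms of work earns a
    // 600ms pause, but never more than the 1s cap.
    assert_eq!(want_sleep(Duration::from_millis(300), 2.0, min, max), Some(Duration::from_millis(600)));
    assert_eq!(want_sleep(Duration::from_secs(3), 2.0, min, max), Some(Duration::from_secs(1)));
    assert_eq!(want_sleep(Duration::from_nanos(10), 2.0, min, max), None);
}
```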
@@ -457,6 +514,7 @@ struct CachedFolder {
 pub type GetSizeFn =
     Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>> + Send>> + Send + Sync + 'static>;
 pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
+pub type ShouldSleepFn = Option<Arc<dyn Fn() -> bool + Send + Sync + 'static>>;

 struct FolderScanner {
     root: String,
@@ -474,6 +532,7 @@ struct FolderScanner {
     update_current_path: UpdateCurrentPathFn,
     skip_heal: AtomicBool,
     drive: LocalDrive,
+    we_sleep: ShouldSleepFn,
 }

 impl FolderScanner {
@@ -514,6 +573,12 @@ impl FolderScanner {
         None
     };

+    if let Some(should_sleep) = &self.we_sleep {
+        if should_sleep() {
+            SCANNER_SLEEPER.read().await.sleep(DATA_SCANNER_SLEEP_PER_FOLDER).await;
+        }
+    }
+
     let mut existing_folders = Vec::new();
     let mut new_folders = Vec::new();
     let mut found_objects: bool = false;
@@ -553,6 +618,16 @@ impl FolderScanner {
         continue;
     }

+    let _wait = if let Some(should_sleep) = &self.we_sleep {
+        if should_sleep() {
+            DynamicSleeper::timer()
+        } else {
+            Box::pin(async {})
+        }
+    } else {
+        Box::pin(async {})
+    };
+
     let mut item = ScannerItem {
         path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(),
         bucket,
@@ -1001,6 +1076,7 @@ pub async fn scan_data_folder(
     cache: &DataUsageCache,
     get_size_fn: GetSizeFn,
     heal_scan_mode: HealScanMode,
+    should_sleep: ShouldSleepFn,
 ) -> Result<DataUsageCache> {
     if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
         return Err(Error::from_string("internal error: root scan attempted"));
@@ -1029,6 +1105,7 @@ pub async fn scan_data_folder(
     disks_quorum: disks.len() / 2,
     skip_heal,
     drive: drive.clone(),
+    we_sleep: should_sleep,
 };

 if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing {
@@ -24,7 +24,6 @@ use crate::{
 use super::{background_heal_ops::get_local_disks_to_heal, heal_ops::BG_HEALING_UUID};

 pub type HealScanMode = usize;
-pub type HealItemType = String;

 pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0;
 pub const HEAL_NORMAL_SCAN: HealScanMode = 1;
@@ -66,49 +65,6 @@ pub struct HealOpts {
     pub set: Option<usize>,
 }

-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
-pub struct HealDriveInfo {
-    pub uuid: String,
-    pub endpoint: String,
-    pub state: String,
-}
-
-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
-pub struct Infos {
-    #[serde(rename = "drives")]
-    pub drives: Vec<HealDriveInfo>,
-}
-
-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
-pub struct HealResultItem {
-    #[serde(rename = "resultId")]
-    pub result_index: usize,
-    #[serde(rename = "type")]
-    pub heal_item_type: HealItemType,
-    #[serde(rename = "bucket")]
-    pub bucket: String,
-    #[serde(rename = "object")]
-    pub object: String,
-    #[serde(rename = "versionId")]
-    pub version_id: String,
-    #[serde(rename = "detail")]
-    pub detail: String,
-    #[serde(rename = "parityBlocks")]
-    pub parity_blocks: usize,
-    #[serde(rename = "dataBlocks")]
-    pub data_blocks: usize,
-    #[serde(rename = "diskCount")]
-    pub disk_count: usize,
-    #[serde(rename = "setCount")]
-    pub set_count: usize,
-    #[serde(rename = "before")]
-    pub before: Infos,
-    #[serde(rename = "after")]
-    pub after: Infos,
-    #[serde(rename = "objectSize")]
-    pub object_size: usize,
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct HealStartSuccess {
     #[serde(rename = "clientToken")]
@@ -2,19 +2,14 @@ use super::{
     background_heal_ops::HealTask,
     data_scanner::HEAL_DELETE_DANGLING,
     error::ERR_SKIP_FILE,
-    heal_commands::{
-        HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA,
-    },
+    heal_commands::{HealOpts, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA},
 };
 use crate::store_api::StorageAPI;
 use crate::{
     config::common::CONFIG_PREFIX,
     disk::RUSTFS_META_BUCKET,
     global::GLOBAL_BackgroundHealRoutine,
-    heal::{
-        error::ERR_HEAL_STOP_SIGNALLED,
-        heal_commands::{HealDriveInfo, DRIVE_STATE_OK},
-    },
+    heal::{error::ERR_HEAL_STOP_SIGNALLED, heal_commands::DRIVE_STATE_OK},
 };
 use crate::{
     disk::{endpoint::Endpoint, MetaCacheEntry},
@@ -32,6 +27,7 @@ use crate::{
 use chrono::Utc;
 use futures::join;
 use lazy_static::lazy_static;
+use madmin::heal_commands::{HealDriveInfo, HealItemType, HealResultItem};
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
|
||||
pub mod metrics_realtime;
|
||||
pub mod notification_sys;
|
||||
pub mod peer;
|
||||
mod peer_rest_client;
|
||||
pub mod peer_rest_client;
|
||||
mod quorum;
|
||||
pub mod set_disk;
|
||||
mod sets;
|
||||
|
||||
@@ -25,8 +25,8 @@ pub fn get_global_notification_sys() -> Option<&'static NotificationSys> {
 }

 pub struct NotificationSys {
-    peer_clients: Vec<Option<PeerRestClient>>,
-    all_peer_clients: Vec<Option<PeerRestClient>>,
+    pub peer_clients: Vec<Option<PeerRestClient>>,
+    pub all_peer_clients: Vec<Option<PeerRestClient>>,
 }

 impl NotificationSys {
@@ -1,5 +1,6 @@
 use async_trait::async_trait;
 use futures::future::join_all;
+use madmin::heal_commands::{HealDriveInfo, HealResultItem};
 use protos::node_service_time_out_client;
 use protos::proto_gen::node_service::{
     DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
@@ -14,8 +15,7 @@ use crate::disk::error::is_all_buckets_not_found;
 use crate::disk::{DiskAPI, DiskStore};
 use crate::global::GLOBAL_LOCAL_DISK_MAP;
 use crate::heal::heal_commands::{
-    HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
-    HEAL_ITEM_BUCKET,
+    HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_BUCKET,
 };
 use crate::heal::heal_ops::RUESTFS_RESERVED_BUCKET;
 use crate::quorum::{bucket_op_ignored_errs, reduce_write_quorum_errs};
@@ -35,7 +35,7 @@ pub const PEER_RESTSIGNAL: &str = "signal";
 pub const PEER_RESTSUB_SYS: &str = "sub-sys";
 pub const PEER_RESTDRY_RUN: &str = "dry-run";

-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
 pub struct PeerRestClient {
     pub host: XHost,
     pub grid_host: String,
@@ -31,8 +31,8 @@ use crate::{
     data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
     data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo},
     heal_commands::{
-        HealDriveInfo, HealOpts, HealResultItem, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING,
-        DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
+        HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
+        DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
     },
     heal_ops::BG_HEALING_UUID,
 },
@@ -64,6 +64,7 @@ use lock::{
     namespace_lock::{new_nslock, NsLockMap},
     LockApi,
 };
+use madmin::heal_commands::{HealDriveInfo, HealResultItem};
 use rand::{
     thread_rng,
     {seq::SliceRandom, Rng},
@@ -2811,7 +2812,7 @@ impl SetDisks {
         });
         // Calc usage
         let before = cache.info.last_update;
-        let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode).await {
+        let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await {
             Ok(cache) => cache,
             Err(_) => {
                 if cache.info.last_update > before {
@@ -5,6 +5,7 @@ use common::globals::GLOBAL_Local_Node_Name;
 use futures::future::join_all;
 use http::HeaderMap;
 use lock::{namespace_lock::NsLockMap, new_lock_api, LockApi};
+use madmin::heal_commands::{HealDriveInfo, HealResultItem};
 use tokio::sync::RwLock;
 use uuid::Uuid;

@@ -18,8 +19,7 @@ use crate::{
     error::{Error, Result},
     global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
     heal::heal_commands::{
-        HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
-        HEAL_ITEM_METADATA,
+        HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_METADATA,
     },
     set_disk::SetDisks,
     store_api::{
@@ -12,7 +12,7 @@ use crate::global::{
 };
 use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
 use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
-use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
+use crate::heal::heal_commands::{HealOpts, HealScanMode, HEAL_ITEM_METADATA};
 use crate::heal::heal_ops::{HealEntryFn, HealSequence};
 use crate::new_object_layer_fn;
 use crate::notification_sys::get_global_notification_sys;
@@ -45,6 +45,7 @@ use futures::future::join_all;
 use glob::Pattern;
 use http::HeaderMap;
 use lazy_static::lazy_static;
+use madmin::heal_commands::HealResultItem;
 use rand::Rng;
 use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
 use std::cmp::Ordering;
@@ -2,12 +2,14 @@ use crate::heal::heal_ops::HealSequence;
 use crate::{
     disk::DiskStore,
     error::{Error, Result},
-    heal::heal_commands::{HealOpts, HealResultItem},
+    heal::heal_commands::HealOpts,
     utils::path::decode_dir_object,
     xhttp,
 };
 use futures::StreamExt;
 use http::HeaderMap;
+use madmin::heal_commands::HealResultItem;
 use madmin::info_commands::DiskMetrics;
 use rmp_serde::Serializer;
 use s3s::{dto::StreamingBlob, Body};
 use serde::{Deserialize, Serialize};
@@ -6,6 +6,5 @@ pub mod hash;
 pub mod net;
 pub mod os;
 pub mod path;
-pub mod time;
 pub mod wildcard;
 pub mod xml;
@@ -1,55 +0,0 @@
-use std::time::Duration;
-
-use tracing::info;
-
-pub fn parse_duration(s: &str) -> Option<Duration> {
-    if s.ends_with("ms") {
-        if let Ok(s) = s.trim_end_matches("ms").parse::<u64>() {
-            return Some(Duration::from_millis(s));
-        }
-    } else if s.ends_with("s") {
-        if let Ok(s) = s.trim_end_matches('s').parse::<u64>() {
-            return Some(Duration::from_secs(s));
-        }
-    } else if s.ends_with("m") {
-        if let Ok(s) = s.trim_end_matches('m').parse::<u64>() {
-            return Some(Duration::from_secs(s * 60));
-        }
-    } else if s.ends_with("h") {
-        if let Ok(s) = s.trim_end_matches('h').parse::<u64>() {
-            return Some(Duration::from_secs(s * 60 * 60));
-        }
-    }
-    info!("can not parse duration, s: {}", s);
-    None
-}
-
-#[cfg(test)]
-mod test {
-    use std::time::Duration;
-
-    use super::parse_duration;
-
-    #[test]
-    fn test_parse_dur() {
-        let s = String::from("3s");
-        let dur = parse_duration(&s);
-        println!("{:?}", dur);
-        assert_eq!(Some(Duration::from_secs(3)), dur);
-
-        let s = String::from("3ms");
-        let dur = parse_duration(&s);
-        println!("{:?}", dur);
-        assert_eq!(Some(Duration::from_millis(3)), dur);
-
-        let s = String::from("3m");
-        let dur = parse_duration(&s);
-        println!("{:?}", dur);
-        assert_eq!(Some(Duration::from_secs(3 * 60)), dur);
-
-        let s = String::from("3h");
-        let dur = parse_duration(&s);
-        println!("{:?}", dur);
-        assert_eq!(Some(Duration::from_secs(3 * 60 * 60)), dur);
-    }
-}
@@ -12,6 +12,9 @@ workspace = true
 [dependencies]
 chrono.workspace = true
 common.workspace = true
+humantime.workspace = true
 hyper.workspace = true
+psutil = "3.3.0"
 serde.workspace = true
+time.workspace = true
 tracing.workspace = true
madmin/src/heal_commands.rs (new file, 46 lines)
@@ -0,0 +1,46 @@
+use serde::{Deserialize, Serialize};
+
+pub type HealItemType = String;
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct HealDriveInfo {
+    pub uuid: String,
+    pub endpoint: String,
+    pub state: String,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct Infos {
+    #[serde(rename = "drives")]
+    pub drives: Vec<HealDriveInfo>,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct HealResultItem {
+    #[serde(rename = "resultId")]
+    pub result_index: usize,
+    #[serde(rename = "type")]
+    pub heal_item_type: HealItemType,
+    #[serde(rename = "bucket")]
+    pub bucket: String,
+    #[serde(rename = "object")]
+    pub object: String,
+    #[serde(rename = "versionId")]
+    pub version_id: String,
+    #[serde(rename = "detail")]
+    pub detail: String,
+    #[serde(rename = "parityBlocks")]
+    pub parity_blocks: usize,
+    #[serde(rename = "dataBlocks")]
+    pub data_blocks: usize,
+    #[serde(rename = "diskCount")]
+    pub disk_count: usize,
+    #[serde(rename = "setCount")]
+    pub set_count: usize,
+    #[serde(rename = "before")]
+    pub before: Infos,
+    #[serde(rename = "after")]
+    pub after: Infos,
+    #[serde(rename = "objectSize")]
+    pub object_size: usize,
+}
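Since every field carries an explicit `#[serde(rename = ...)]`, the wire format stays camelCase while the Rust fields stay snake_case. A quick check of that mapping — a sketch assuming serde_json is available as a dev-dependency:

```rust
// Serializing a default HealResultItem; only the rename mapping is being
// demonstrated, the values are all Default::default().
fn main() {
    let item = madmin::heal_commands::HealResultItem::default();
    let json = serde_json::to_string(&item).expect("serialize");
    // Fields appear under their renamed camelCase keys:
    assert!(json.contains("\"resultId\""));
    assert!(json.contains("\"parityBlocks\""));
    assert!(json.contains("\"type\"")); // heal_item_type
    assert!(!json.contains("result_index")); // the Rust name never leaks
}
```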
@@ -1,6 +1,10 @@
+pub mod heal_commands;
 pub mod health;
 pub mod info_commands;
 pub mod metrics;
 pub mod net;
+pub mod service_commands;
+pub mod trace;
+pub mod utils;

 pub use info_commands::*;
madmin/src/service_commands.rs (new file, 103 lines)
@@ -0,0 +1,103 @@
+use std::{collections::HashMap, time::Duration};
+
+use hyper::Uri;
+
+use crate::{trace::TraceType, utils::parse_duration};
+
+#[derive(Debug, Default)]
+pub struct ServiceTraceOpts {
+    s3: bool,
+    internal: bool,
+    storage: bool,
+    os: bool,
+    scanner: bool,
+    decommission: bool,
+    healing: bool,
+    batch_replication: bool,
+    batch_key_rotation: bool,
+    batch_expire: bool,
+    batch_all: bool,
+    rebalance: bool,
+    replication_resync: bool,
+    bootstrap: bool,
+    ftp: bool,
+    ilm: bool,
+    only_errors: bool,
+    threshold: Duration,
+}
+
+impl ServiceTraceOpts {
+    fn trace_types(&self) -> TraceType {
+        let mut tt = TraceType::default();
+        tt.set_if(self.s3, &TraceType::S3);
+        tt.set_if(self.internal, &TraceType::INTERNAL);
+        tt.set_if(self.storage, &TraceType::STORAGE);
+        tt.set_if(self.os, &TraceType::OS);
+        tt.set_if(self.scanner, &TraceType::SCANNER);
+        tt.set_if(self.decommission, &TraceType::DECOMMISSION);
+        tt.set_if(self.healing, &TraceType::HEALING);
+
+        if self.batch_all {
+            tt.set_if(true, &TraceType::BATCH_REPLICATION);
+            tt.set_if(true, &TraceType::BATCH_KEY_ROTATION);
+            tt.set_if(true, &TraceType::BATCH_EXPIRE);
+        } else {
+            tt.set_if(self.batch_replication, &TraceType::BATCH_REPLICATION);
+            tt.set_if(self.batch_key_rotation, &TraceType::BATCH_KEY_ROTATION);
+            tt.set_if(self.batch_expire, &TraceType::BATCH_EXPIRE);
+        }
+
+        tt.set_if(self.rebalance, &TraceType::REBALANCE);
+        tt.set_if(self.replication_resync, &TraceType::REPLICATION_RESYNC);
+        tt.set_if(self.bootstrap, &TraceType::BOOTSTRAP);
+        tt.set_if(self.ftp, &TraceType::FTP);
+        tt.set_if(self.ilm, &TraceType::ILM);
+
+        tt
+    }
+
+    pub fn parse_params(&mut self, uri: &Uri) -> Result<(), String> {
+        let query_pairs: HashMap<_, _> = uri
+            .query()
+            .unwrap_or("")
+            .split('&')
+            .filter_map(|pair| {
+                let mut split = pair.split('=');
+                let key = split.next()?.to_string();
+                let value = split.next().map(|v| v.to_string()).unwrap_or_else(|| "false".to_string());
+                Some((key, value))
+            })
+            .collect();
+
+        self.s3 = query_pairs.get("s3").map_or(false, |v| v == "true");
+        self.os = query_pairs.get("os").map_or(false, |v| v == "true");
+        self.scanner = query_pairs.get("scanner").map_or(false, |v| v == "true");
+        self.decommission = query_pairs.get("decommission").map_or(false, |v| v == "true");
+        self.healing = query_pairs.get("healing").map_or(false, |v| v == "true");
+        self.batch_replication = query_pairs.get("batch-replication").map_or(false, |v| v == "true");
+        self.batch_key_rotation = query_pairs.get("batch-keyrotation").map_or(false, |v| v == "true");
+        self.batch_expire = query_pairs.get("batch-expire").map_or(false, |v| v == "true");
+        if query_pairs.get("all").map_or(false, |v| v == "true") {
+            self.s3 = true;
+            self.internal = true;
+            self.storage = true;
+            self.os = true;
+        }
+
+        self.rebalance = query_pairs.get("rebalance").map_or(false, |v| v == "true");
+        self.storage = query_pairs.get("storage").map_or(false, |v| v == "true");
+        self.internal = query_pairs.get("internal").map_or(false, |v| v == "true");
+        self.only_errors = query_pairs.get("err").map_or(false, |v| v == "true");
+        self.replication_resync = query_pairs.get("replication-resync").map_or(false, |v| v == "true");
+        self.bootstrap = query_pairs.get("bootstrap").map_or(false, |v| v == "true");
+        self.ftp = query_pairs.get("ftp").map_or(false, |v| v == "true");
+        self.ilm = query_pairs.get("ilm").map_or(false, |v| v == "true");
+
+        if let Some(threshold) = query_pairs.get("threshold") {
+            let duration = parse_duration(threshold)?;
+            self.threshold = duration;
+        }
+
+        Ok(())
+    }
+}
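`parse_params` treats every flag as false unless its query value is literally "true", and `all=true` fans out to the s3/internal/storage/os group. A sketch of feeding it a trace URI — the query string here is made up for illustration:

```rust
use hyper::Uri;
use madmin::service_commands::ServiceTraceOpts;

fn main() -> Result<(), String> {
    // "err" maps to only_errors; "threshold" goes through humantime parsing.
    let uri: Uri = "/trace?s3=true&err=true&threshold=5s"
        .parse()
        .map_err(|e| format!("{e}"))?;
    let mut opts = ServiceTraceOpts::default();
    opts.parse_params(&uri)?; // sets s3 and only_errors, threshold = 5s
    Ok(())
}
```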
madmin/src/trace.rs (new file, 172 lines)
@@ -0,0 +1,172 @@
+use std::{collections::HashMap, time::Duration};
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+
+use crate::heal_commands::HealResultItem;
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct TraceType(u64);
+
+impl TraceType {
+    // Define the trace-type constants
+    pub const OS: TraceType = TraceType(1 << 0);
+    pub const STORAGE: TraceType = TraceType(1 << 1);
+    pub const S3: TraceType = TraceType(1 << 2);
+    pub const INTERNAL: TraceType = TraceType(1 << 3);
+    pub const SCANNER: TraceType = TraceType(1 << 4);
+    pub const DECOMMISSION: TraceType = TraceType(1 << 5);
+    pub const HEALING: TraceType = TraceType(1 << 6);
+    pub const BATCH_REPLICATION: TraceType = TraceType(1 << 7);
+    pub const BATCH_KEY_ROTATION: TraceType = TraceType(1 << 8);
+    pub const BATCH_EXPIRE: TraceType = TraceType(1 << 9);
+    pub const REBALANCE: TraceType = TraceType(1 << 10);
+    pub const REPLICATION_RESYNC: TraceType = TraceType(1 << 11);
+    pub const BOOTSTRAP: TraceType = TraceType(1 << 12);
+    pub const FTP: TraceType = TraceType(1 << 13);
+    pub const ILM: TraceType = TraceType(1 << 14);
+
+    // MetricsAll must be last.
+    pub const ALL: TraceType = TraceType((1 << 15) - 1);
+
+    pub fn new(t: u64) -> Self {
+        Self(t)
+    }
+}
+
+impl Default for TraceType {
+    fn default() -> Self {
+        Self(0)
+    }
+}
+
+impl TraceType {
+    pub fn contains(&self, x: &TraceType) -> bool {
+        (self.0 & x.0) == x.0
+    }
+
+    pub fn overlaps(&self, x: &TraceType) -> bool {
+        (self.0 & x.0) != 0
+    }
+
+    pub fn single_type(&self) -> bool {
+        todo!()
+    }
+
+    pub fn merge(&mut self, other: &TraceType) {
+        self.0 = self.0 | other.0
+    }
+
+    pub fn set_if(&mut self, b: bool, other: &TraceType) {
+        if b {
+            self.0 = self.0 | other.0
+        }
+    }
+
+    pub fn mask(&self) -> u64 {
+        self.0
+    }
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceInfo {
+    #[serde(rename = "type")]
+    trace_type: u64,
+    #[serde(rename = "nodename")]
+    node_name: String,
+    #[serde(rename = "funcname")]
+    func_name: String,
+    #[serde(rename = "time")]
+    time: DateTime<Utc>,
+    #[serde(rename = "path")]
+    path: String,
+    #[serde(rename = "dur")]
+    duration: Duration,
+    #[serde(rename = "bytes", skip_serializing_if = "Option::is_none")]
+    bytes: Option<i64>,
+    #[serde(rename = "msg", skip_serializing_if = "Option::is_none")]
+    message: Option<String>,
+    #[serde(rename = "error", skip_serializing_if = "Option::is_none")]
+    error: Option<String>,
+    #[serde(rename = "custom", skip_serializing_if = "Option::is_none")]
+    custom: Option<HashMap<String, String>>,
+    #[serde(rename = "http", skip_serializing_if = "Option::is_none")]
+    http: Option<TraceHTTPStats>,
+    #[serde(rename = "healResult", skip_serializing_if = "Option::is_none")]
+    heal_result: Option<HealResultItem>,
+}
+
+impl TraceInfo {
+    pub fn mask(&self) -> u64 {
+        TraceType::new(self.trace_type).mask()
+    }
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceInfoLegacy {
+    trace_info: TraceInfo,
+    #[serde(rename = "request")]
+    req_info: Option<TraceRequestInfo>,
+    #[serde(rename = "response")]
+    resp_info: Option<TraceResponseInfo>,
+    #[serde(rename = "stats")]
+    call_stats: Option<TraceCallStats>,
+    #[serde(rename = "storageStats")]
+    storage_stats: Option<StorageStats>,
+    #[serde(rename = "osStats")]
+    os_stats: Option<OSStats>,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct StorageStats {
+    path: String,
+    duration: Duration,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct OSStats {
+    path: String,
+    duration: Duration,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceHTTPStats {
+    req_info: TraceRequestInfo,
+    resp_info: TraceResponseInfo,
+    call_stats: TraceCallStats,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceCallStats {
+    input_bytes: i32,
+    output_bytes: i32,
+    latency: Duration,
+    time_to_first_byte: Duration,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceRequestInfo {
+    time: DateTime<Utc>,
+    proto: String,
+    method: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    path: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    raw_query: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    headers: Option<HashMap<String, String>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    body: Option<Vec<u8>>,
+    client: String,
+}
+
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+pub struct TraceResponseInfo {
+    time: DateTime<Utc>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    headers: Option<HashMap<String, String>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    body: Option<Vec<u8>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    status_code: Option<i32>,
+}
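`TraceType` is a hand-rolled bitflag set: `contains` tests for a subset, `overlaps` for any intersection, and `merge`/`set_if` OR bits in. A small illustration using only the constants and methods defined in the file above:

```rust
use madmin::trace::TraceType;

fn main() {
    let mut mask = TraceType::default();    // empty set (0)
    mask.merge(&TraceType::S3);             // 1 << 2
    mask.set_if(true, &TraceType::SCANNER); // 1 << 4
    mask.set_if(false, &TraceType::FTP);    // no-op

    assert!(mask.contains(&TraceType::S3));
    assert!(!mask.contains(&TraceType::FTP));
    assert!(mask.overlaps(&TraceType::ALL)); // ALL = (1 << 15) - 1
    assert_eq!(mask.mask(), (1 << 2) | (1 << 4));
}
```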
madmin/src/utils.rs (new file, 37 lines)
@@ -0,0 +1,37 @@
+use std::time::Duration;
+
+pub fn parse_duration(s: &str) -> Result<Duration, String> {
+    // Implement your own duration parsing logic here
+    // For example, you could use the humantime crate or a custom parser
+    humantime::parse_duration(s).map_err(|e| e.to_string())
+}
+
+#[cfg(test)]
+mod test {
+    use std::time::Duration;
+
+    use super::parse_duration;
+
+    #[test]
+    fn test_parse_dur() {
+        let s = String::from("3s");
+        let dur = parse_duration(&s);
+        println!("{:?}", dur);
+        assert_eq!(Ok(Duration::from_secs(3)), dur);
+
+        let s = String::from("3ms");
+        let dur = parse_duration(&s);
+        println!("{:?}", dur);
+        assert_eq!(Ok(Duration::from_millis(3)), dur);
+
+        let s = String::from("3m");
+        let dur = parse_duration(&s);
+        println!("{:?}", dur);
+        assert_eq!(Ok(Duration::from_secs(3 * 60)), dur);
+
+        let s = String::from("3h");
+        let dur = parse_duration(&s);
+        println!("{:?}", dur);
+        assert_eq!(Ok(Duration::from_secs(3 * 60 * 60)), dur);
+    }
+}
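The switch to `humantime::parse_duration` also widens the accepted grammar beyond the old hand-rolled ms/s/m/h parser — compound values like "1h 30m" and sub-millisecond units now parse. A couple of spot checks:

```rust
use std::time::Duration;

fn main() {
    // Same inputs the unit test above covers...
    assert_eq!(humantime::parse_duration("3s").unwrap(), Duration::from_secs(3));
    assert_eq!(humantime::parse_duration("3ms").unwrap(), Duration::from_millis(3));
    // ...plus forms the deleted ecstore parser rejected:
    assert_eq!(humantime::parse_duration("1h 30m").unwrap(), Duration::from_secs(90 * 60));
    assert_eq!(humantime::parse_duration("250us").unwrap(), Duration::from_micros(250));
    assert!(humantime::parse_duration("nonsense").is_err());
}
```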
@@ -15,13 +15,13 @@ use ecstore::peer::is_reserved_or_invalid_bucket;
 use ecstore::store::is_valid_object_prefix;
 use ecstore::store_api::StorageAPI;
 use ecstore::utils::path::path_join;
-use ecstore::utils::time::parse_duration;
 use ecstore::utils::xml;
 use ecstore::GLOBAL_Endpoints;
 use futures::{Stream, StreamExt};
 use http::Uri;
 use hyper::StatusCode;
 use madmin::metrics::RealtimeMetrics;
+use madmin::utils::parse_duration;
 use matchit::Params;
 use s3s::stream::{ByteStream, DynByteStream};
 use s3s::{
@@ -45,6 +45,7 @@ use tokio_stream::wrappers::ReceiverStream;
 use tracing::{error, info, warn};

 pub mod service_account;
+pub mod trace;

 #[derive(Deserialize, Debug, Default)]
 #[serde(rename_all = "PascalCase", default)]
@@ -370,8 +371,8 @@ impl Operation for MetricsHandler {
         info!("mp: {:?}", mp);

         let tick = match parse_duration(&mp.tick) {
-            Some(i) => i,
-            None => std_Duration::from_secs(1),
+            Ok(i) => i,
+            Err(_) => std_Duration::from_secs(1),
         };

         let mut n = mp.n;
rustfs/src/admin/handlers/trace.rs (new file, 36 lines)
@@ -0,0 +1,36 @@
+use ecstore::{peer_rest_client::PeerRestClient, GLOBAL_Endpoints};
+use http::StatusCode;
+use hyper::Uri;
+use madmin::service_commands::ServiceTraceOpts;
+use matchit::Params;
+use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
+use tracing::warn;
+
+use crate::admin::router::Operation;
+
+fn extract_trace_options(uri: &Uri) -> S3Result<ServiceTraceOpts> {
+    let mut st_opts = ServiceTraceOpts::default();
+    st_opts
+        .parse_params(uri)
+        .map_err(|_| s3_error!(InvalidRequest, "invalid params"))?;
+
+    Ok(st_opts)
+}
+
+pub struct Trace {}
+
+#[async_trait::async_trait]
+impl Operation for Trace {
+    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
+        warn!("handle Trace");
+
+        let _trace_opts = extract_trace_options(&req.uri)?;
+
+        // let (tx, rx) = mpsc::channel(10000);
+        let _perrs = match GLOBAL_Endpoints.get() {
+            Some(ep) => PeerRestClient::new_clients(ep.clone()).await,
+            None => (Vec::new(), Vec::new()),
+        };
+        return Err(s3_error!(NotImplemented));
+    }
+}
@@ -1349,7 +1349,7 @@ impl Node for NodeService {
             }
         }
     });
-    let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize).await;
+    let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize, None).await;
     let _ = task.await;
     match data_usage_cache {
         Ok(data_usage_cache) => {