commit baf03dffd4 (parent 807be52e8f)
Author: mujunxiang <1948535941@qq.com>
Date: 2024-12-06 14:32:17 +08:00
Committed by: weisd
Signed-off-by: mujunxiang <1948535941@qq.com>

29 changed files with 532 additions and 135 deletions

Cargo.lock (generated)

@@ -1207,6 +1207,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hybrid-array"
version = "0.2.1"
@@ -1676,9 +1682,12 @@ version = "0.0.1"
dependencies = [
"chrono",
"common",
"humantime",
"hyper",
"psutil",
"serde",
"time",
"tracing",
]
[[package]]


@@ -50,6 +50,7 @@ hyper-util = { version = "0.1.10", features = [
] }
http = "1.1.0"
http-body = "1.0.1"
humantime = "2.1.0"
lock = { path = "./common/lock" }
lazy_static = "1.5.0"
mime = "0.3.17"


@@ -21,7 +21,7 @@ use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE};
use crate::error::{Error, Result};
use crate::file_meta::read_xl_meta_no_data;
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, SizeSummary};
use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, ShouldSleepFn, SizeSummary};
use crate::heal::data_scanner_metric::{ScannerMetric, ScannerMetrics};
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
@@ -2164,6 +2164,7 @@ impl DiskAPI for LocalDisk {
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
we_sleep: ShouldSleepFn,
) -> Result<DataUsageCache> {
self.scanning.fetch_add(1, Ordering::SeqCst);
defer!(|| { self.scanning.fetch_sub(1, Ordering::SeqCst) });
@@ -2279,6 +2280,7 @@ impl DiskAPI for LocalDisk {
})
}),
scan_mode,
we_sleep,
)
.await?;
data_usage_info.info.last_update = Some(SystemTime::now());


@@ -19,6 +19,7 @@ use crate::{
error::{Error, Result},
file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion},
heal::{
data_scanner::ShouldSleepFn,
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
},
@@ -351,11 +352,12 @@ impl DiskAPI for Disk {
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
we_sleep: ShouldSleepFn,
) -> Result<DataUsageCache> {
info!("ns_scanner");
match self {
Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await,
Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await,
Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await,
}
}
@@ -468,6 +470,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
we_sleep: ShouldSleepFn,
) -> Result<DataUsageCache>;
async fn healing(&self) -> Option<HealingTracker>;
}
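Since `ShouldSleepFn` is an `Option<Arc<dyn Fn() -> bool + Send + Sync>>`, callers either pass `None` (as the remote path does below) or supply a live predicate. A minimal call-site sketch, with `disk`, `cache`, `updates_tx`, and `scan_mode` as hypothetical locals:

    // Hypothetical sketch: throttle the scanner only while the node reports load.
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let busy = Arc::new(AtomicBool::new(true));
    let we_sleep: ShouldSleepFn = Some(Arc::new({
        let busy = busy.clone();
        move || busy.load(Ordering::Relaxed)
    }));
    let updated = disk.ns_scanner(&cache, updates_tx, scan_mode, we_sleep).await?;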


@@ -25,6 +25,7 @@ use crate::{
disk::error::DiskError,
error::{Error, Result},
heal::{
data_scanner::ShouldSleepFn,
data_usage_cache::{DataUsageCache, DataUsageEntry},
heal_commands::{HealScanMode, HealingTracker},
},
@@ -759,6 +760,7 @@ impl DiskAPI for RemoteDisk {
cache: &DataUsageCache,
updates: Sender<DataUsageEntry>,
scan_mode: HealScanMode,
_we_sleep: ShouldSleepFn,
) -> Result<DataUsageCache> {
info!("ns_scanner");
let cache = serde_json::to_string(cache)?;


@@ -1,4 +1,5 @@
use tracing::warn;
use tracing::{info, warn};
use url::Url;
use crate::{
disk::endpoint::{Endpoint, EndpointType},


@@ -1,3 +1,4 @@
use madmin::heal_commands::HealResultItem;
use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration};
use tokio::{
sync::{
@@ -10,7 +11,7 @@ use tracing::{error, info};
use uuid::Uuid;
use super::{
heal_commands::{HealOpts, HealResultItem},
heal_commands::HealOpts,
heal_ops::{new_bg_heal_sequence, HealSequence},
};
use crate::heal::error::ERR_RETRY_HEALING;


@@ -62,7 +62,7 @@ use crate::{
store_api::{FileInfo, ObjectInfo},
};
const _DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch.
const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch.
@@ -73,7 +73,6 @@ const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to w
pub const HEAL_DELETE_DANGLING: bool = true;
const HEAL_OBJECT_SELECT_PROB: u64 = 1024; // Overall probability of a file being scanned; one in n.
// static SCANNER_SLEEPER: () = new_dynamic_sleeper(2, Duration::from_secs(1), true); // Keep defaults same as config defaults
static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(DATA_SCANNER_START_DELAY.as_secs());
static _SCANNER_IDLE_MODE: AtomicU32 = AtomicU32::new(0); // default is throttled when idle
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
@@ -81,9 +80,67 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
static SCANNER_EXCESS_FOLDERS: AtomicU64 = AtomicU64::new(50_000);
lazy_static! {
static ref SCANNER_SLEEPER: RwLock<DynamicSleeper> = RwLock::new(new_dynamic_sleeper(2.0, Duration::from_secs(1), true));
pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
}
struct DynamicSleeper {
factor: f64,
max_sleep: Duration,
min_sleep: Duration,
_is_scanner: bool,
}
type TimerFn = Pin<Box<dyn Future<Output = ()> + Send>>;
impl DynamicSleeper {
fn timer() -> TimerFn {
let t = SystemTime::now();
Box::pin(async move {
let done_at = SystemTime::now().duration_since(t).unwrap_or_default();
SCANNER_SLEEPER.read().await.sleep(done_at).await;
})
}
async fn sleep(&self, base: Duration) {
let (min_wait, max_wait) = (self.min_sleep, self.max_sleep);
let factor = self.factor;
let want_sleep = {
let tmp = base.mul_f64(factor);
if tmp < min_wait {
return;
}
if max_wait > Duration::from_secs(0) && tmp > max_wait {
max_wait
} else {
tmp
}
};
sleep(want_sleep).await;
}
fn _update(&mut self, factor: f64, max_wait: Duration) -> Result<()> {
if (self.factor - factor).abs() < 1e-10 && self.max_sleep == max_wait {
return Ok(());
}
self.factor = factor;
self.max_sleep = max_wait;
Ok(())
}
}
fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> DynamicSleeper {
DynamicSleeper {
factor,
max_sleep: max_wait,
min_sleep: Duration::from_micros(100),
_is_scanner: is_scanner,
}
}
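The sleeper's contract, visible in `sleep()` above: pause for `elapsed * factor`, skip entirely below the 100µs floor, and clamp at `max_sleep`. A brief illustration using the same constructor:

    // Illustration of the proportional backoff:
    let sleeper = new_dynamic_sleeper(2.0, Duration::from_secs(1), true);
    sleeper.sleep(Duration::from_millis(40)).await; // waits ~80ms (40ms * 2.0)
    sleeper.sleep(Duration::from_micros(20)).await; // returns at once (40µs < 100µs floor)
    sleeper.sleep(Duration::from_secs(10)).await;   // waits the 1s cap, not 20s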
pub async fn init_data_scanner() {
tokio::spawn(async move {
loop {
@@ -457,6 +514,7 @@ struct CachedFolder {
pub type GetSizeFn =
Box<dyn Fn(&ScannerItem) -> Pin<Box<dyn Future<Output = Result<SizeSummary>> + Send>> + Send + Sync + 'static>;
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;
pub type ShouldSleepFn = Option<Arc<dyn Fn() -> bool + Send + Sync + 'static>>;
struct FolderScanner {
root: String,
@@ -474,6 +532,7 @@ struct FolderScanner {
update_current_path: UpdateCurrentPathFn,
skip_heal: AtomicBool,
drive: LocalDrive,
we_sleep: ShouldSleepFn,
}
impl FolderScanner {
@@ -514,6 +573,12 @@ impl FolderScanner {
None
};
if let Some(should_sleep) = &self.we_sleep {
if should_sleep() {
SCANNER_SLEEPER.read().await.sleep(DATA_SCANNER_SLEEP_PER_FOLDER).await;
}
}
let mut existing_folders = Vec::new();
let mut new_folders = Vec::new();
let mut found_objects: bool = false;
@@ -553,6 +618,16 @@ impl FolderScanner {
continue;
}
let _wait = if let Some(should_sleep) = &self.we_sleep {
if should_sleep() {
DynamicSleeper::timer()
} else {
Box::pin(async {})
}
} else {
Box::pin(async {})
};
let mut item = ScannerItem {
path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(),
bucket,
@@ -1001,6 +1076,7 @@ pub async fn scan_data_folder(
cache: &DataUsageCache,
get_size_fn: GetSizeFn,
heal_scan_mode: HealScanMode,
should_sleep: ShouldSleepFn,
) -> Result<DataUsageCache> {
if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
return Err(Error::from_string("internal error: root scan attempted"));
@@ -1029,6 +1105,7 @@ pub async fn scan_data_folder(
disks_quorum: disks.len() / 2,
skip_heal,
drive: drive.clone(),
we_sleep: should_sleep,
};
if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing {


@@ -24,7 +24,6 @@ use crate::{
use super::{background_heal_ops::get_local_disks_to_heal, heal_ops::BG_HEALING_UUID};
pub type HealScanMode = usize;
pub type HealItemType = String;
pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0;
pub const HEAL_NORMAL_SCAN: HealScanMode = 1;
@@ -66,49 +65,6 @@ pub struct HealOpts {
pub set: Option<usize>,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealDriveInfo {
pub uuid: String,
pub endpoint: String,
pub state: String,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Infos {
#[serde(rename = "drives")]
pub drives: Vec<HealDriveInfo>,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealResultItem {
#[serde(rename = "resultId")]
pub result_index: usize,
#[serde(rename = "type")]
pub heal_item_type: HealItemType,
#[serde(rename = "bucket")]
pub bucket: String,
#[serde(rename = "object")]
pub object: String,
#[serde(rename = "versionId")]
pub version_id: String,
#[serde(rename = "detail")]
pub detail: String,
#[serde(rename = "parityBlocks")]
pub parity_blocks: usize,
#[serde(rename = "dataBlocks")]
pub data_blocks: usize,
#[serde(rename = "diskCount")]
pub disk_count: usize,
#[serde(rename = "setCount")]
pub set_count: usize,
#[serde(rename = "before")]
pub before: Infos,
#[serde(rename = "after")]
pub after: Infos,
#[serde(rename = "objectSize")]
pub object_size: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HealStartSuccess {
#[serde(rename = "clientToken")]


@@ -2,19 +2,14 @@ use super::{
background_heal_ops::HealTask,
data_scanner::HEAL_DELETE_DANGLING,
error::ERR_SKIP_FILE,
heal_commands::{
HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA,
},
heal_commands::{HealOpts, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker, HEAL_ITEM_BUCKET_METADATA},
};
use crate::store_api::StorageAPI;
use crate::{
config::common::CONFIG_PREFIX,
disk::RUSTFS_META_BUCKET,
global::GLOBAL_BackgroundHealRoutine,
heal::{
error::ERR_HEAL_STOP_SIGNALLED,
heal_commands::{HealDriveInfo, DRIVE_STATE_OK},
},
heal::{error::ERR_HEAL_STOP_SIGNALLED, heal_commands::DRIVE_STATE_OK},
};
use crate::{
disk::{endpoint::Endpoint, MetaCacheEntry},
@@ -32,6 +27,7 @@ use crate::{
use chrono::Utc;
use futures::join;
use lazy_static::lazy_static;
use madmin::heal_commands::{HealDriveInfo, HealItemType, HealResultItem};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,


@@ -18,7 +18,7 @@ pub mod metacache;
pub mod metrics_realtime;
pub mod notification_sys;
pub mod peer;
mod peer_rest_client;
pub mod peer_rest_client;
pub mod pools;
mod quorum;
pub mod set_disk;


@@ -25,14 +25,14 @@ pub fn get_global_notification_sys() -> Option<&'static NotificationSys> {
}
pub struct NotificationSys {
peer_clients: Vec<Option<PeerRestClient>>,
pub peer_clients: Vec<Option<PeerRestClient>>,
#[allow(dead_code)]
all_peer_clients: Vec<Option<PeerRestClient>>,
pub all_peer_clients: Vec<Option<PeerRestClient>>,
}
impl NotificationSys {
pub async fn new(eps: EndpointServerPools) -> Self {
let (peer_clients, all_peer_clients) = PeerRestClient::new_clients(eps).await;
let (peer_clients, all_peer_clients) = PeerRestClient::new_clients(&eps).await;
Self {
peer_clients,
all_peer_clients,
@@ -46,9 +46,7 @@ pub struct NotificationPeerErr {
}
impl NotificationSys {
pub fn rest_client_from_hash(&self, s:&str) ->Option<PeerRestClient>{
pub fn rest_client_from_hash(&self, s: &str) -> Option<PeerRestClient> {
None
}
pub async fn delete_policy(&self) -> Vec<NotificationPeerErr> {


@@ -1,5 +1,6 @@
use async_trait::async_trait;
use futures::future::join_all;
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
@@ -14,8 +15,7 @@ use crate::disk::error::is_all_buckets_not_found;
use crate::disk::{DiskAPI, DiskStore};
use crate::global::GLOBAL_LOCAL_DISK_MAP;
use crate::heal::heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
HEAL_ITEM_BUCKET,
HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_BUCKET,
};
use crate::heal::heal_ops::RUESTFS_RESERVED_BUCKET;
use crate::quorum::{bucket_op_ignored_errs, reduce_write_quorum_errs};


@@ -35,7 +35,7 @@ pub const PEER_RESTSIGNAL: &str = "signal";
pub const PEER_RESTSUB_SYS: &str = "sub-sys";
pub const PEER_RESTDRY_RUN: &str = "dry-run";
#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
pub struct PeerRestClient {
pub host: XHost,
pub grid_host: String,


@@ -30,8 +30,8 @@ use crate::{
data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo},
heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING,
DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
},
heal_ops::BG_HEALING_UUID,
},
@@ -67,6 +67,7 @@ use lock::{
namespace_lock::{new_nslock, NsLockMap},
LockApi,
};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rand::{
thread_rng,
{seq::SliceRandom, Rng},
@@ -2816,7 +2817,7 @@ impl SetDisks {
});
// Calc usage
let before = cache.info.last_update;
let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode).await {
let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await {
Ok(cache) => cache,
Err(_) => {
if cache.info.last_update > before {


@@ -5,6 +5,7 @@ use common::globals::GLOBAL_Local_Node_Name;
use futures::future::join_all;
use http::HeaderMap;
use lock::{namespace_lock::NsLockMap, new_lock_api, LockApi};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use tokio::sync::RwLock;
use uuid::Uuid;
@@ -18,8 +19,7 @@ use crate::{
error::{Error, Result},
global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES},
heal::heal_commands::{
HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK,
HEAL_ITEM_METADATA,
HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_METADATA,
},
set_disk::SetDisks,
store_api::{


@@ -12,7 +12,7 @@ use crate::global::{
};
use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_commands::{HealOpts, HealScanMode, HEAL_ITEM_METADATA};
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::new_object_layer_fn;
use crate::notification_sys::get_global_notification_sys;
@@ -45,6 +45,7 @@ use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use lazy_static::lazy_static;
use madmin::heal_commands::HealResultItem;
use rand::Rng;
use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration};
use std::cmp::Ordering;


@@ -2,12 +2,14 @@ use crate::heal::heal_ops::HealSequence;
use crate::{
disk::DiskStore,
error::{Error, Result},
heal::heal_commands::{HealOpts, HealResultItem},
heal::heal_commands::HealOpts,
utils::path::decode_dir_object,
xhttp,
};
use futures::StreamExt;
use http::HeaderMap;
use madmin::heal_commands::HealResultItem;
use madmin::info_commands::DiskMetrics;
use rmp_serde::Serializer;
use s3s::{dto::StreamingBlob, Body};
use serde::{Deserialize, Serialize};


@@ -6,6 +6,5 @@ pub mod hash;
pub mod net;
pub mod os;
pub mod path;
pub mod time;
pub mod wildcard;
pub mod xml;


@@ -1,55 +0,0 @@
use std::time::Duration;
use tracing::info;
pub fn parse_duration(s: &str) -> Option<Duration> {
if s.ends_with("ms") {
if let Ok(s) = s.trim_end_matches("ms").parse::<u64>() {
return Some(Duration::from_millis(s));
}
} else if s.ends_with("s") {
if let Ok(s) = s.trim_end_matches('s').parse::<u64>() {
return Some(Duration::from_secs(s));
}
} else if s.ends_with("m") {
if let Ok(s) = s.trim_end_matches('m').parse::<u64>() {
return Some(Duration::from_secs(s * 60));
}
} else if s.ends_with("h") {
if let Ok(s) = s.trim_end_matches('h').parse::<u64>() {
return Some(Duration::from_secs(s * 60 * 60));
}
}
info!("can not parse duration, s: {}", s);
None
}
#[cfg(test)]
mod test {
use std::time::Duration;
use super::parse_duration;
#[test]
fn test_parse_dur() {
let s = String::from("3s");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Some(Duration::from_secs(3)), dur);
let s = String::from("3ms");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Some(Duration::from_millis(3)), dur);
let s = String::from("3m");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Some(Duration::from_secs(3 * 60)), dur);
let s = String::from("3h");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Some(Duration::from_secs(3 * 60 * 60)), dur);
}
}


@@ -12,6 +12,9 @@ workspace = true
[dependencies]
chrono.workspace = true
common.workspace = true
humantime.workspace = true
hyper.workspace = true
psutil = "3.3.0"
serde.workspace = true
time.workspace = true
tracing.workspace = true


@@ -0,0 +1,46 @@
use serde::{Deserialize, Serialize};
pub type HealItemType = String;
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealDriveInfo {
pub uuid: String,
pub endpoint: String,
pub state: String,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Infos {
#[serde(rename = "drives")]
pub drives: Vec<HealDriveInfo>,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct HealResultItem {
#[serde(rename = "resultId")]
pub result_index: usize,
#[serde(rename = "type")]
pub heal_item_type: HealItemType,
#[serde(rename = "bucket")]
pub bucket: String,
#[serde(rename = "object")]
pub object: String,
#[serde(rename = "versionId")]
pub version_id: String,
#[serde(rename = "detail")]
pub detail: String,
#[serde(rename = "parityBlocks")]
pub parity_blocks: usize,
#[serde(rename = "dataBlocks")]
pub data_blocks: usize,
#[serde(rename = "diskCount")]
pub disk_count: usize,
#[serde(rename = "setCount")]
pub set_count: usize,
#[serde(rename = "before")]
pub before: Infos,
#[serde(rename = "after")]
pub after: Infos,
#[serde(rename = "objectSize")]
pub object_size: usize,
}
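Every field carries an explicit serde rename, so the JSON wire format stays camelCase while the Rust side stays snake_case. A quick sketch of what that yields (serde_json assumed available, as elsewhere in the tree):

    // Sketch: serialization emits the renamed camelCase keys.
    let item = HealResultItem {
        result_index: 7,
        bucket: "photos".to_string(),
        ..Default::default()
    };
    let json = serde_json::to_string(&item).unwrap();
    assert!(json.contains("\"resultId\":7"));
    assert!(json.contains("\"bucket\":\"photos\""));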


@@ -1,6 +1,10 @@
pub mod heal_commands;
pub mod health;
pub mod info_commands;
pub mod metrics;
pub mod net;
pub mod service_commands;
pub mod trace;
pub mod utils;
pub use info_commands::*;


@@ -0,0 +1,103 @@
use std::{collections::HashMap, time::Duration};
use hyper::Uri;
use crate::{trace::TraceType, utils::parse_duration};
#[derive(Debug, Default)]
pub struct ServiceTraceOpts {
s3: bool,
internal: bool,
storage: bool,
os: bool,
scanner: bool,
decommission: bool,
healing: bool,
batch_replication: bool,
batch_key_rotation: bool,
batch_expire: bool,
batch_all: bool,
rebalance: bool,
replication_resync: bool,
bootstrap: bool,
ftp: bool,
ilm: bool,
only_errors: bool,
threshold: Duration,
}
impl ServiceTraceOpts {
fn trace_types(&self) -> TraceType {
let mut tt = TraceType::default();
tt.set_if(self.s3, &TraceType::S3);
tt.set_if(self.internal, &TraceType::INTERNAL);
tt.set_if(self.storage, &TraceType::STORAGE);
tt.set_if(self.os, &TraceType::OS);
tt.set_if(self.scanner, &TraceType::SCANNER);
tt.set_if(self.decommission, &TraceType::DECOMMISSION);
tt.set_if(self.healing, &TraceType::HEALING);
if self.batch_all {
tt.set_if(true, &TraceType::BATCH_REPLICATION);
tt.set_if(true, &TraceType::BATCH_KEY_ROTATION);
tt.set_if(true, &TraceType::BATCH_EXPIRE);
} else {
tt.set_if(self.batch_replication, &TraceType::BATCH_REPLICATION);
tt.set_if(self.batch_key_rotation, &TraceType::BATCH_KEY_ROTATION);
tt.set_if(self.batch_expire, &TraceType::BATCH_EXPIRE);
}
tt.set_if(self.rebalance, &TraceType::REBALANCE);
tt.set_if(self.replication_resync, &TraceType::REPLICATION_RESYNC);
tt.set_if(self.bootstrap, &TraceType::BOOTSTRAP);
tt.set_if(self.ftp, &TraceType::FTP);
tt.set_if(self.ilm, &TraceType::ILM);
tt
}
pub fn parse_params(&mut self, uri: &Uri) -> Result<(), String> {
let query_pairs: HashMap<_, _> = uri
.query()
.unwrap_or("")
.split('&')
.filter_map(|pair| {
let mut split = pair.split('=');
let key = split.next()?.to_string();
let value = split.next().map(|v| v.to_string()).unwrap_or_else(|| "false".to_string());
Some((key, value))
})
.collect();
self.s3 = query_pairs.get("s3").map_or(false, |v| v == "true");
self.os = query_pairs.get("os").map_or(false, |v| v == "true");
self.scanner = query_pairs.get("scanner").map_or(false, |v| v == "true");
self.decommission = query_pairs.get("decommission").map_or(false, |v| v == "true");
self.healing = query_pairs.get("healing").map_or(false, |v| v == "true");
self.batch_replication = query_pairs.get("batch-replication").map_or(false, |v| v == "true");
self.batch_key_rotation = query_pairs.get("batch-keyrotation").map_or(false, |v| v == "true");
self.batch_expire = query_pairs.get("batch-expire").map_or(false, |v| v == "true");
if query_pairs.get("all").map_or(false, |v| v == "true") {
self.s3 = true;
self.internal = true;
self.storage = true;
self.os = true;
}
self.rebalance = query_pairs.get("rebalance").map_or(false, |v| v == "true");
self.storage = query_pairs.get("storage").map_or(false, |v| v == "true");
self.internal = query_pairs.get("internal").map_or(false, |v| v == "true");
self.only_errors = query_pairs.get("err").map_or(false, |v| v == "true");
self.replication_resync = query_pairs.get("replication-resync").map_or(false, |v| v == "true");
self.bootstrap = query_pairs.get("bootstrap").map_or(false, |v| v == "true");
self.ftp = query_pairs.get("ftp").map_or(false, |v| v == "true");
self.ilm = query_pairs.get("ilm").map_or(false, |v| v == "true");
if let Some(threshold) = query_pairs.get("threshold") {
let duration = parse_duration(threshold)?;
self.threshold = duration;
}
Ok(())
}
}
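A hedged usage sketch for parse_params (the path is hypothetical; results are noted in comments because the fields are private):

    // Sketch: "all=true" enables the four base kinds; "threshold" goes through parse_duration.
    let uri: Uri = "/admin/v3/trace?all=true&err=true&threshold=5s".parse().unwrap();
    let mut opts = ServiceTraceOpts::default();
    opts.parse_params(&uri).unwrap();
    // now: s3/internal/storage/os are true, only_errors is true,
    // and threshold equals Duration::from_secs(5).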

madmin/src/trace.rs (new file)

@@ -0,0 +1,172 @@
use std::{collections::HashMap, time::Duration};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::heal_commands::HealResultItem;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TraceType(u64);
impl TraceType {
// Define the trace type constants.
pub const OS: TraceType = TraceType(1 << 0);
pub const STORAGE: TraceType = TraceType(1 << 1);
pub const S3: TraceType = TraceType(1 << 2);
pub const INTERNAL: TraceType = TraceType(1 << 3);
pub const SCANNER: TraceType = TraceType(1 << 4);
pub const DECOMMISSION: TraceType = TraceType(1 << 5);
pub const HEALING: TraceType = TraceType(1 << 6);
pub const BATCH_REPLICATION: TraceType = TraceType(1 << 7);
pub const BATCH_KEY_ROTATION: TraceType = TraceType(1 << 8);
pub const BATCH_EXPIRE: TraceType = TraceType(1 << 9);
pub const REBALANCE: TraceType = TraceType(1 << 10);
pub const REPLICATION_RESYNC: TraceType = TraceType(1 << 11);
pub const BOOTSTRAP: TraceType = TraceType(1 << 12);
pub const FTP: TraceType = TraceType(1 << 13);
pub const ILM: TraceType = TraceType(1 << 14);
// ALL must remain last: it is the union of every flag above.
pub const ALL: TraceType = TraceType((1 << 15) - 1);
pub fn new(t: u64) -> Self {
Self(t)
}
}
impl Default for TraceType {
fn default() -> Self {
Self(0)
}
}
impl TraceType {
pub fn contains(&self, x: &TraceType) -> bool {
(self.0 & x.0) == x.0
}
pub fn overlaps(&self, x: &TraceType) -> bool {
(self.0 & x.0) != 0
}
pub fn single_type(&self) -> bool {
// True when exactly one flag bit is set.
self.0.count_ones() == 1
}
pub fn merge(&mut self, other: &TraceType) {
self.0 = self.0 | other.0
}
pub fn set_if(&mut self, b: bool, other: &TraceType) {
if b {
self.0 = self.0 | other.0
}
}
pub fn mask(&self) -> u64 {
self.0
}
}
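A short sketch of the bit-set semantics these methods implement:

    // Sketch: TraceType is a bitmask; merge/set_if OR flags in, contains/overlaps test them.
    let mut t = TraceType::default();
    t.merge(&TraceType::S3);
    t.set_if(true, &TraceType::SCANNER);
    assert!(t.contains(&TraceType::S3));
    assert!(!t.contains(&TraceType::HEALING));
    assert!(t.overlaps(&TraceType::ALL));
    assert_eq!(t.mask(), (1 << 2) | (1 << 4)); // S3 | SCANNER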
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceInfo {
#[serde(rename = "type")]
trace_type: u64,
#[serde(rename = "nodename")]
node_name: String,
#[serde(rename = "funcname")]
func_name: String,
#[serde(rename = "time")]
time: DateTime<Utc>,
#[serde(rename = "path")]
path: String,
#[serde(rename = "dur")]
duration: Duration,
#[serde(rename = "bytes", skip_serializing_if = "Option::is_none")]
bytes: Option<i64>,
#[serde(rename = "msg", skip_serializing_if = "Option::is_none")]
message: Option<String>,
#[serde(rename = "error", skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(rename = "custom", skip_serializing_if = "Option::is_none")]
custom: Option<HashMap<String, String>>,
#[serde(rename = "http", skip_serializing_if = "Option::is_none")]
http: Option<TraceHTTPStats>,
#[serde(rename = "healResult", skip_serializing_if = "Option::is_none")]
heal_result: Option<HealResultItem>,
}
impl TraceInfo {
pub fn mask(&self) -> u64 {
TraceType::new(self.trace_type).mask()
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceInfoLegacy {
trace_info: TraceInfo,
#[serde(rename = "request")]
req_info: Option<TraceRequestInfo>,
#[serde(rename = "response")]
resp_info: Option<TraceResponseInfo>,
#[serde(rename = "stats")]
call_stats: Option<TraceCallStats>,
#[serde(rename = "storageStats")]
storage_stats: Option<StorageStats>,
#[serde(rename = "osStats")]
os_stats: Option<OSStats>,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StorageStats {
path: String,
duration: Duration,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OSStats {
path: String,
duration: Duration,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceHTTPStats {
req_info: TraceRequestInfo,
resp_info: TraceResponseInfo,
call_stats: TraceCallStats,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceCallStats {
input_bytes: i32,
output_bytes: i32,
latency: Duration,
time_to_first_byte: Duration,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceRequestInfo {
time: DateTime<Utc>,
proto: String,
method: String,
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
raw_query: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
headers: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
body: Option<Vec<u8>>,
client: String,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TraceResponseInfo {
time: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
headers: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
body: Option<Vec<u8>>,
#[serde(skip_serializing_if = "Option::is_none")]
status_code: Option<i32>,
}

madmin/src/utils.rs (new file)

@@ -0,0 +1,37 @@
use std::time::Duration;
pub fn parse_duration(s: &str) -> Result<Duration, String> {
// Thin wrapper over humantime's parser; surfaces the parse error as a String.
humantime::parse_duration(s).map_err(|e| e.to_string())
}
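Compared with the removed suffix-matching parser in ecstore, humantime also accepts compound and sub-millisecond inputs; a couple of examples:

    // Formats the old hand-rolled parser rejected:
    assert_eq!(parse_duration("1h 30m"), Ok(Duration::from_secs(5400)));
    assert_eq!(parse_duration("250us"), Ok(Duration::from_micros(250)));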
#[cfg(test)]
mod test {
use std::time::Duration;
use super::parse_duration;
#[test]
fn test_parse_dur() {
let s = String::from("3s");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Ok(Duration::from_secs(3)), dur);
let s = String::from("3ms");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Ok(Duration::from_millis(3)), dur);
let s = String::from("3m");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Ok(Duration::from_secs(3 * 60)), dur);
let s = String::from("3h");
let dur = parse_duration(&s);
println!("{:?}", dur);
assert_eq!(Ok(Duration::from_secs(3 * 60 * 60)), dur);
}
}


@@ -15,13 +15,13 @@ use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::StorageAPI;
use ecstore::utils::path::path_join;
use ecstore::utils::time::parse_duration;
use ecstore::utils::xml;
use ecstore::GLOBAL_Endpoints;
use futures::{Stream, StreamExt};
use http::Uri;
use hyper::StatusCode;
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{
@@ -45,6 +45,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
pub mod service_account;
pub mod trace;
#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "PascalCase", default)]
@@ -370,8 +371,8 @@ impl Operation for MetricsHandler {
info!("mp: {:?}", mp);
let tick = match parse_duration(&mp.tick) {
Some(i) => i,
None => std_Duration::from_secs(1),
Ok(i) => i,
Err(_) => std_Duration::from_secs(1),
};
let mut n = mp.n;


@@ -0,0 +1,37 @@
use ecstore::{peer_rest_client::PeerRestClient, GLOBAL_Endpoints};
use http::StatusCode;
use hyper::Uri;
use madmin::service_commands::ServiceTraceOpts;
use matchit::Params;
use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
use tokio::sync::mpsc;
use tracing::warn;
use crate::admin::router::Operation;
fn extract_trace_options(uri: &Uri) -> S3Result<ServiceTraceOpts> {
let mut st_opts = ServiceTraceOpts::default();
st_opts
.parse_params(uri)
.map_err(|_| s3_error!(InvalidRequest, "invalid params"))?;
Ok(st_opts)
}
pub struct Trace {}
#[async_trait::async_trait]
impl Operation for Trace {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle Trace");
let trace_opts = extract_trace_options(&req.uri)?;
// let (tx, rx) = mpsc::channel(10000);
let peers = match GLOBAL_Endpoints.get() {
Some(ep) => PeerRestClient::new_clients(ep).await,
None => (Vec::new(), Vec::new()),
};
return Err(s3_error!(NotImplemented));
}
}


@@ -1349,7 +1349,7 @@ impl Node for NodeService {
}
}
});
let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize).await;
let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize, None).await;
let _ = task.await;
match data_usage_cache {
Ok(data_usage_cache) => {