diff --git a/Cargo.lock b/Cargo.lock index 624ea269..755756bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,6 +1207,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hybrid-array" version = "0.2.1" @@ -1676,9 +1682,12 @@ version = "0.0.1" dependencies = [ "chrono", "common", + "humantime", + "hyper", "psutil", "serde", "time", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7736c02f..31028857 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ hyper-util = { version = "0.1.10", features = [ ] } http = "1.1.0" http-body = "1.0.1" +humantime = "2.1.0" lock = { path = "./common/lock" } lazy_static = "1.5.0" mime = "0.3.17" diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index d44fa68d..51ba61c4 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -21,7 +21,7 @@ use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE}; use crate::error::{Error, Result}; use crate::file_meta::read_xl_meta_no_data; use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold}; -use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, SizeSummary}; +use crate::heal::data_scanner::{has_active_rules, scan_data_folder, ScannerItem, ShouldSleepFn, SizeSummary}; use crate::heal::data_scanner_metric::{ScannerMetric, ScannerMetrics}; use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry}; use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE}; @@ -2164,6 +2164,7 @@ impl DiskAPI for LocalDisk { cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, + we_sleep: ShouldSleepFn, ) -> Result { self.scanning.fetch_add(1, Ordering::SeqCst); defer!(|| { self.scanning.fetch_sub(1, Ordering::SeqCst) }); @@ -2279,6 +2280,7 @@ impl DiskAPI for LocalDisk { }) }), scan_mode, + we_sleep, ) .await?; data_usage_info.info.last_update = Some(SystemTime::now()); diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 0b608030..35e54ce1 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -19,6 +19,7 @@ use crate::{ error::{Error, Result}, file_meta::{merge_file_meta_versions, FileMeta, FileMetaShallowVersion}, heal::{ + data_scanner::ShouldSleepFn, data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::{HealScanMode, HealingTracker}, }, @@ -351,11 +352,12 @@ impl DiskAPI for Disk { cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, + we_sleep: ShouldSleepFn, ) -> Result { info!("ns_scanner"); match self { - Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await, - Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await, + Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await, + Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode, we_sleep).await, } } @@ -468,6 +470,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, + we_sleep: ShouldSleepFn, ) -> Result; async fn healing(&self) -> Option; } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 3259f941..a8239f65 100644 --- 
a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -25,6 +25,7 @@ use crate::{ disk::error::DiskError, error::{Error, Result}, heal::{ + data_scanner::ShouldSleepFn, data_usage_cache::{DataUsageCache, DataUsageEntry}, heal_commands::{HealScanMode, HealingTracker}, }, @@ -759,6 +760,7 @@ impl DiskAPI for RemoteDisk { cache: &DataUsageCache, updates: Sender, scan_mode: HealScanMode, + _we_sleep: ShouldSleepFn, ) -> Result { info!("ns_scanner"); let cache = serde_json::to_string(cache)?; diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 334c74d6..b1205b87 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -1,4 +1,5 @@ -use tracing::warn; +use tracing::{info, warn}; +use url::Url; use crate::{ disk::endpoint::{Endpoint, EndpointType}, diff --git a/ecstore/src/heal/background_heal_ops.rs b/ecstore/src/heal/background_heal_ops.rs index fa9a1f63..4e49d7f5 100644 --- a/ecstore/src/heal/background_heal_ops.rs +++ b/ecstore/src/heal/background_heal_ops.rs @@ -1,3 +1,4 @@ +use madmin::heal_commands::HealResultItem; use std::{cmp::Ordering, env, path::PathBuf, sync::Arc, time::Duration}; use tokio::{ sync::{ @@ -10,7 +11,7 @@ use tracing::{error, info}; use uuid::Uuid; use super::{ - heal_commands::{HealOpts, HealResultItem}, + heal_commands::HealOpts, heal_ops::{new_bg_heal_sequence, HealSequence}, }; use crate::heal::error::ERR_RETRY_HEALING; diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 0f579093..05b883af 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -62,7 +62,7 @@ use crate::{ store_api::{FileInfo, ObjectInfo}, }; -const _DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders. +const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders. const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles. const DATA_SCANNER_COMPACT_LEAST_OBJECT: u64 = 500; // Compact when there are less than this many objects in a branch. const DATA_SCANNER_COMPACT_AT_CHILDREN: u64 = 10000; // Compact when there are this many children in a branch. @@ -73,7 +73,6 @@ const DATA_SCANNER_START_DELAY: Duration = Duration::from_secs(60); // Time to w pub const HEAL_DELETE_DANGLING: bool = true; const HEAL_OBJECT_SELECT_PROB: u64 = 1024; // Overall probability of a file being scanned; one in n. -// static SCANNER_SLEEPER: () = new_dynamic_sleeper(2, Duration::from_secs(1), true); // Keep defaults same as config defaults static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(DATA_SCANNER_START_DELAY.as_secs()); static _SCANNER_IDLE_MODE: AtomicU32 = AtomicU32::new(0); // default is throttled when idle static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100); @@ -81,9 +80,67 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102 static SCANNER_EXCESS_FOLDERS: AtomicU64 = AtomicU64::new(50_000); lazy_static! 
{ + static ref SCANNER_SLEEPER: RwLock = RwLock::new(new_dynamic_sleeper(2.0, Duration::from_secs(1), true)); pub static ref globalHealConfig: Arc> = Arc::new(RwLock::new(Config::default())); } +struct DynamicSleeper { + factor: f64, + max_sleep: Duration, + min_sleep: Duration, + _is_scanner: bool, +} + +type TimerFn = Pin + Send>>; +impl DynamicSleeper { + fn timer() -> TimerFn { + let t = SystemTime::now(); + Box::pin(async move { + let done_at = SystemTime::now().duration_since(t).unwrap_or_default(); + SCANNER_SLEEPER.read().await.sleep(done_at).await; + }) + } + + async fn sleep(&self, base: Duration) { + let (min_wait, max_wait) = (self.min_sleep, self.max_sleep); + let factor = self.factor; + + let want_sleep = { + let tmp = base.mul_f64(factor); + if tmp < min_wait { + return; + } + + if max_wait > Duration::from_secs(0) && tmp > max_wait { + max_wait + } else { + tmp + } + }; + sleep(want_sleep).await; + } + + fn _update(&mut self, factor: f64, max_wait: Duration) -> Result<()> { + if (self.factor - factor).abs() < 1e-10 && self.max_sleep == max_wait { + return Ok(()); + } + + self.factor = factor; + self.max_sleep = max_wait; + + Ok(()) + } +} + +fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> DynamicSleeper { + DynamicSleeper { + factor, + max_sleep: max_wait, + min_sleep: Duration::from_micros(100), + _is_scanner: is_scanner, + } +} + pub async fn init_data_scanner() { tokio::spawn(async move { loop { @@ -457,6 +514,7 @@ struct CachedFolder { pub type GetSizeFn = Box Pin> + Send>> + Send + Sync + 'static>; pub type UpdateCurrentPathFn = Arc Pin + Send>> + Send + Sync + 'static>; +pub type ShouldSleepFn = Option bool + Send + Sync + 'static>>; struct FolderScanner { root: String, @@ -474,6 +532,7 @@ struct FolderScanner { update_current_path: UpdateCurrentPathFn, skip_heal: AtomicBool, drive: LocalDrive, + we_sleep: ShouldSleepFn, } impl FolderScanner { @@ -514,6 +573,12 @@ impl FolderScanner { None }; + if let Some(should_sleep) = &self.we_sleep { + if should_sleep() { + SCANNER_SLEEPER.read().await.sleep(DATA_SCANNER_SLEEP_PER_FOLDER).await; + } + } + let mut existing_folders = Vec::new(); let mut new_folders = Vec::new(); let mut found_objects: bool = false; @@ -553,6 +618,16 @@ impl FolderScanner { continue; } + let _wait = if let Some(should_sleep) = &self.we_sleep { + if should_sleep() { + DynamicSleeper::timer() + } else { + Box::pin(async {}) + } + } else { + Box::pin(async {}) + }; + let mut item = ScannerItem { path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(), bucket, @@ -1001,6 +1076,7 @@ pub async fn scan_data_folder( cache: &DataUsageCache, get_size_fn: GetSizeFn, heal_scan_mode: HealScanMode, + should_sleep: ShouldSleepFn, ) -> Result { if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT { return Err(Error::from_string("internal error: root scan attempted")); @@ -1029,6 +1105,7 @@ pub async fn scan_data_folder( disks_quorum: disks.len() / 2, skip_heal, drive: drive.clone(), + we_sleep: should_sleep, }; if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing { diff --git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs index 0ce85910..3aee3cd5 100644 --- a/ecstore/src/heal/heal_commands.rs +++ b/ecstore/src/heal/heal_commands.rs @@ -24,7 +24,6 @@ use crate::{ use super::{background_heal_ops::get_local_disks_to_heal, heal_ops::BG_HEALING_UUID}; pub type HealScanMode = usize; -pub type HealItemType = String; pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0; pub const 
HEAL_NORMAL_SCAN: HealScanMode = 1; @@ -66,49 +65,6 @@ pub struct HealOpts { pub set: Option, } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct HealDriveInfo { - pub uuid: String, - pub endpoint: String, - pub state: String, -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct Infos { - #[serde(rename = "drives")] - pub drives: Vec, -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct HealResultItem { - #[serde(rename = "resultId")] - pub result_index: usize, - #[serde(rename = "type")] - pub heal_item_type: HealItemType, - #[serde(rename = "bucket")] - pub bucket: String, - #[serde(rename = "object")] - pub object: String, - #[serde(rename = "versionId")] - pub version_id: String, - #[serde(rename = "detail")] - pub detail: String, - #[serde(rename = "parityBlocks")] - pub parity_blocks: usize, - #[serde(rename = "dataBlocks")] - pub data_blocks: usize, - #[serde(rename = "diskCount")] - pub disk_count: usize, - #[serde(rename = "setCount")] - pub set_count: usize, - #[serde(rename = "before")] - pub before: Infos, - #[serde(rename = "after")] - pub after: Infos, - #[serde(rename = "objectSize")] - pub object_size: usize, -} - #[derive(Debug, Serialize, Deserialize)] pub struct HealStartSuccess { #[serde(rename = "clientToken")] diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index 53b5551a..ed774ecb 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -2,19 +2,14 @@ use super::{ background_heal_ops::HealTask, data_scanner::HEAL_DELETE_DANGLING, error::ERR_SKIP_FILE, - heal_commands::{ - HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA, - }, + heal_commands::{HealOpts, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker, HEAL_ITEM_BUCKET_METADATA}, }; use crate::store_api::StorageAPI; use crate::{ config::common::CONFIG_PREFIX, disk::RUSTFS_META_BUCKET, global::GLOBAL_BackgroundHealRoutine, - heal::{ - error::ERR_HEAL_STOP_SIGNALLED, - heal_commands::{HealDriveInfo, DRIVE_STATE_OK}, - }, + heal::{error::ERR_HEAL_STOP_SIGNALLED, heal_commands::DRIVE_STATE_OK}, }; use crate::{ disk::{endpoint::Endpoint, MetaCacheEntry}, @@ -32,6 +27,7 @@ use crate::{ use chrono::Utc; use futures::join; use lazy_static::lazy_static; +use madmin::heal_commands::{HealDriveInfo, HealItemType, HealResultItem}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 8aa575a3..0043ad04 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -18,7 +18,7 @@ pub mod metacache; pub mod metrics_realtime; pub mod notification_sys; pub mod peer; -mod peer_rest_client; +pub mod peer_rest_client; pub mod pools; mod quorum; pub mod set_disk; diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index c0735900..aaf6d4d1 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -25,14 +25,14 @@ pub fn get_global_notification_sys() -> Option<&'static NotificationSys> { } pub struct NotificationSys { - peer_clients: Vec>, + pub peer_clients: Vec>, #[allow(dead_code)] - all_peer_clients: Vec>, + pub all_peer_clients: Vec>, } impl NotificationSys { pub async fn new(eps: EndpointServerPools) -> Self { - let (peer_clients, all_peer_clients) = PeerRestClient::new_clients(eps).await; + let (peer_clients, all_peer_clients) = PeerRestClient::new_clients(&eps).await; Self { peer_clients, all_peer_clients, @@ -46,9 +46,7 @@ 
pub struct NotificationPeerErr { } impl NotificationSys { - pub fn rest_client_from_hash(&self, s:&str) ->Option{ - - + pub fn rest_client_from_hash(&self, s: &str) -> Option { None } pub async fn delete_policy(&self) -> Vec { diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index a5217ba9..e9d02c60 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::future::join_all; +use madmin::heal_commands::{HealDriveInfo, HealResultItem}; use protos::node_service_time_out_client; use protos::proto_gen::node_service::{ DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest, @@ -14,8 +15,7 @@ use crate::disk::error::is_all_buckets_not_found; use crate::disk::{DiskAPI, DiskStore}; use crate::global::GLOBAL_LOCAL_DISK_MAP; use crate::heal::heal_commands::{ - HealDriveInfo, HealOpts, HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, - HEAL_ITEM_BUCKET, + HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_BUCKET, }; use crate::heal::heal_ops::RUESTFS_RESERVED_BUCKET; use crate::quorum::{bucket_op_ignored_errs, reduce_write_quorum_errs}; diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index 76aaee69..f6792575 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -35,7 +35,7 @@ pub const PEER_RESTSIGNAL: &str = "signal"; pub const PEER_RESTSUB_SYS: &str = "sub-sys"; pub const PEER_RESTDRY_RUN: &str = "dry-run"; -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub struct PeerRestClient { pub host: XHost, pub grid_host: String, diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 4a6b430e..d938671d 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -30,8 +30,8 @@ use crate::{ data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT}, data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo}, heal_commands::{ - HealDriveInfo, HealOpts, HealResultItem, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, - DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN, + HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, + DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN, }, heal_ops::BG_HEALING_UUID, }, @@ -67,6 +67,7 @@ use lock::{ namespace_lock::{new_nslock, NsLockMap}, LockApi, }; +use madmin::heal_commands::{HealDriveInfo, HealResultItem}; use rand::{ thread_rng, {seq::SliceRandom, Rng}, @@ -2816,7 +2817,7 @@ impl SetDisks { }); // Calc usage let before = cache.info.last_update; - let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode).await { + let cache = match disk.clone().ns_scanner(&cache, tx, heal_scan_mode, None).await { Ok(cache) => cache, Err(_) => { if cache.info.last_update > before { diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 708805a9..50561057 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -5,6 +5,7 @@ use common::globals::GLOBAL_Local_Node_Name; use futures::future::join_all; use http::HeaderMap; use lock::{namespace_lock::NsLockMap, new_lock_api, LockApi}; +use madmin::heal_commands::{HealDriveInfo, HealResultItem}; use tokio::sync::RwLock; use uuid::Uuid; @@ -18,8 +19,7 @@ use crate::{ error::{Error, Result}, global::{is_dist_erasure, GLOBAL_LOCAL_DISK_SET_DRIVES}, heal::heal_commands::{ - HealDriveInfo, HealOpts, 
HealResultItem, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, - HEAL_ITEM_METADATA, + HealOpts, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_ITEM_METADATA, }, set_disk::SetDisks, store_api::{ diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index d886df61..86dda946 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -12,7 +12,7 @@ use crate::global::{ }; use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT}; use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; -use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA}; +use crate::heal::heal_commands::{HealOpts, HealScanMode, HEAL_ITEM_METADATA}; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::new_object_layer_fn; use crate::notification_sys::get_global_notification_sys; @@ -45,6 +45,7 @@ use futures::future::join_all; use glob::Pattern; use http::HeaderMap; use lazy_static::lazy_static; +use madmin::heal_commands::HealResultItem; use rand::Rng; use s3s::dto::{BucketVersioningStatus, ObjectLockConfiguration, ObjectLockEnabled, VersioningConfiguration}; use std::cmp::Ordering; diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 715d2181..48a860fe 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -2,12 +2,14 @@ use crate::heal::heal_ops::HealSequence; use crate::{ disk::DiskStore, error::{Error, Result}, - heal::heal_commands::{HealOpts, HealResultItem}, + heal::heal_commands::HealOpts, utils::path::decode_dir_object, xhttp, }; use futures::StreamExt; use http::HeaderMap; +use madmin::heal_commands::HealResultItem; +use madmin::info_commands::DiskMetrics; use rmp_serde::Serializer; use s3s::{dto::StreamingBlob, Body}; use serde::{Deserialize, Serialize}; diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs index 467d8cdf..65aac72b 100644 --- a/ecstore/src/utils/mod.rs +++ b/ecstore/src/utils/mod.rs @@ -6,6 +6,5 @@ pub mod hash; pub mod net; pub mod os; pub mod path; -pub mod time; pub mod wildcard; pub mod xml; diff --git a/ecstore/src/utils/time.rs b/ecstore/src/utils/time.rs deleted file mode 100644 index 791db2d4..00000000 --- a/ecstore/src/utils/time.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::time::Duration; - -use tracing::info; - -pub fn parse_duration(s: &str) -> Option { - if s.ends_with("ms") { - if let Ok(s) = s.trim_end_matches("ms").parse::() { - return Some(Duration::from_millis(s)); - } - } else if s.ends_with("s") { - if let Ok(s) = s.trim_end_matches('s').parse::() { - return Some(Duration::from_secs(s)); - } - } else if s.ends_with("m") { - if let Ok(s) = s.trim_end_matches('m').parse::() { - return Some(Duration::from_secs(s * 60)); - } - } else if s.ends_with("h") { - if let Ok(s) = s.trim_end_matches('h').parse::() { - return Some(Duration::from_secs(s * 60 * 60)); - } - } - info!("can not parse duration, s: {}", s); - None -} - -#[cfg(test)] -mod test { - use std::time::Duration; - - use super::parse_duration; - - #[test] - fn test_parse_dur() { - let s = String::from("3s"); - let dur = parse_duration(&s); - println!("{:?}", dur); - assert_eq!(Some(Duration::from_secs(3)), dur); - - let s = String::from("3ms"); - let dur = parse_duration(&s); - println!("{:?}", dur); - assert_eq!(Some(Duration::from_millis(3)), dur); - - let s = String::from("3m"); - let dur = parse_duration(&s); - println!("{:?}", dur); - assert_eq!(Some(Duration::from_secs(3 * 60)), dur); - - let s = String::from("3h"); - let 
dur = parse_duration(&s); - println!("{:?}", dur); - assert_eq!(Some(Duration::from_secs(3 * 60 * 60)), dur); - } -} diff --git a/madmin/Cargo.toml b/madmin/Cargo.toml index 9893a41d..06c1a835 100644 --- a/madmin/Cargo.toml +++ b/madmin/Cargo.toml @@ -12,6 +12,9 @@ workspace = true [dependencies] chrono.workspace = true common.workspace = true +humantime.workspace = true +hyper.workspace = true psutil = "3.3.0" serde.workspace = true time.workspace =true +tracing.workspace = true diff --git a/madmin/src/heal_commands.rs b/madmin/src/heal_commands.rs new file mode 100644 index 00000000..eec724ac --- /dev/null +++ b/madmin/src/heal_commands.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +pub type HealItemType = String; + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct HealDriveInfo { + pub uuid: String, + pub endpoint: String, + pub state: String, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Infos { + #[serde(rename = "drives")] + pub drives: Vec, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct HealResultItem { + #[serde(rename = "resultId")] + pub result_index: usize, + #[serde(rename = "type")] + pub heal_item_type: HealItemType, + #[serde(rename = "bucket")] + pub bucket: String, + #[serde(rename = "object")] + pub object: String, + #[serde(rename = "versionId")] + pub version_id: String, + #[serde(rename = "detail")] + pub detail: String, + #[serde(rename = "parityBlocks")] + pub parity_blocks: usize, + #[serde(rename = "dataBlocks")] + pub data_blocks: usize, + #[serde(rename = "diskCount")] + pub disk_count: usize, + #[serde(rename = "setCount")] + pub set_count: usize, + #[serde(rename = "before")] + pub before: Infos, + #[serde(rename = "after")] + pub after: Infos, + #[serde(rename = "objectSize")] + pub object_size: usize, +} diff --git a/madmin/src/lib.rs b/madmin/src/lib.rs index fd416c42..4f8f29ed 100644 --- a/madmin/src/lib.rs +++ b/madmin/src/lib.rs @@ -1,6 +1,10 @@ +pub mod heal_commands; pub mod health; pub mod info_commands; pub mod metrics; pub mod net; +pub mod service_commands; +pub mod trace; +pub mod utils; pub use info_commands::*; diff --git a/madmin/src/service_commands.rs b/madmin/src/service_commands.rs new file mode 100644 index 00000000..9db8b57d --- /dev/null +++ b/madmin/src/service_commands.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, time::Duration}; + +use hyper::Uri; + +use crate::{trace::TraceType, utils::parse_duration}; + +#[derive(Debug, Default)] +pub struct ServiceTraceOpts { + s3: bool, + internal: bool, + storage: bool, + os: bool, + scanner: bool, + decommission: bool, + healing: bool, + batch_replication: bool, + batch_key_rotation: bool, + batch_expire: bool, + batch_all: bool, + rebalance: bool, + replication_resync: bool, + bootstrap: bool, + ftp: bool, + ilm: bool, + only_errors: bool, + threshold: Duration, +} + +impl ServiceTraceOpts { + fn trace_types(&self) -> TraceType { + let mut tt = TraceType::default(); + tt.set_if(self.s3, &TraceType::S3); + tt.set_if(self.internal, &TraceType::INTERNAL); + tt.set_if(self.storage, &TraceType::STORAGE); + tt.set_if(self.os, &TraceType::OS); + tt.set_if(self.scanner, &TraceType::SCANNER); + tt.set_if(self.decommission, &TraceType::DECOMMISSION); + tt.set_if(self.healing, &TraceType::HEALING); + + if self.batch_all { + tt.set_if(true, &TraceType::BATCH_REPLICATION); + tt.set_if(true, &TraceType::BATCH_KEY_ROTATION); + tt.set_if(true, &TraceType::BATCH_EXPIRE); + } else { +
tt.set_if(self.batch_replication, &TraceType::BATCH_REPLICATION); + tt.set_if(self.batch_key_rotation, &TraceType::BATCH_KEY_ROTATION); + tt.set_if(self.batch_expire, &TraceType::BATCH_EXPIRE); + } + + tt.set_if(self.rebalance, &TraceType::REBALANCE); + tt.set_if(self.replication_resync, &TraceType::REPLICATION_RESYNC); + tt.set_if(self.bootstrap, &TraceType::BOOTSTRAP); + tt.set_if(self.ftp, &TraceType::FTP); + tt.set_if(self.ilm, &TraceType::ILM); + + tt + } + + pub fn parse_params(&mut self, uri: &Uri) -> Result<(), String> { + let query_pairs: HashMap<_, _> = uri + .query() + .unwrap_or("") + .split('&') + .filter_map(|pair| { + let mut split = pair.split('='); + let key = split.next()?.to_string(); + let value = split.next().map(|v| v.to_string()).unwrap_or_else(|| "false".to_string()); + Some((key, value)) + }) + .collect(); + + self.s3 = query_pairs.get("s3").map_or(false, |v| v == "true"); + self.os = query_pairs.get("os").map_or(false, |v| v == "true"); + self.scanner = query_pairs.get("scanner").map_or(false, |v| v == "true"); + self.decommission = query_pairs.get("decommission").map_or(false, |v| v == "true"); + self.healing = query_pairs.get("healing").map_or(false, |v| v == "true"); + self.batch_replication = query_pairs.get("batch-replication").map_or(false, |v| v == "true"); + self.batch_key_rotation = query_pairs.get("batch-keyrotation").map_or(false, |v| v == "true"); + self.batch_expire = query_pairs.get("batch-expire").map_or(false, |v| v == "true"); + if query_pairs.get("all").map_or(false, |v| v == "true") { + self.s3 = true; + self.internal = true; + self.storage = true; + self.os = true; + } + + self.rebalance = query_pairs.get("rebalance").map_or(false, |v| v == "true"); + self.storage = query_pairs.get("storage").map_or(false, |v| v == "true"); + self.internal = query_pairs.get("internal").map_or(false, |v| v == "true"); + self.only_errors = query_pairs.get("err").map_or(false, |v| v == "true"); + self.replication_resync = query_pairs.get("replication-resync").map_or(false, |v| v == "true"); + self.bootstrap = query_pairs.get("bootstrap").map_or(false, |v| v == "true"); + self.ftp = query_pairs.get("ftp").map_or(false, |v| v == "true"); + self.ilm = query_pairs.get("ilm").map_or(false, |v| v == "true"); + + if let Some(threshold) = query_pairs.get("threshold") { + let duration = parse_duration(threshold)?; + self.threshold = duration; + } + + Ok(()) + } +} diff --git a/madmin/src/trace.rs b/madmin/src/trace.rs new file mode 100644 index 00000000..702362f8 --- /dev/null +++ b/madmin/src/trace.rs @@ -0,0 +1,172 @@ +use std::{collections::HashMap, time::Duration}; + +use chrono::{DateTime, Utc}; + +use serde::{Deserialize, Serialize}; + +use crate::heal_commands::HealResultItem; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TraceType(u64); + +impl TraceType { + // Define some constants + pub const OS: TraceType = TraceType(1 << 0); + pub const STORAGE: TraceType = TraceType(1 << 1); + pub const S3: TraceType = TraceType(1 << 2); + pub const INTERNAL: TraceType = TraceType(1 << 3); + pub const SCANNER: TraceType = TraceType(1 << 4); + pub const DECOMMISSION: TraceType = TraceType(1 << 5); + pub const HEALING: TraceType = TraceType(1 << 6); + pub const BATCH_REPLICATION: TraceType = TraceType(1 << 7); + pub const BATCH_KEY_ROTATION: TraceType = TraceType(1 << 8); + pub const BATCH_EXPIRE: TraceType = TraceType(1 << 9); + pub const REBALANCE: TraceType = TraceType(1 << 10); + pub const REPLICATION_RESYNC: TraceType = TraceType(1 << 11); + pub const BOOTSTRAP: 
TraceType = TraceType(1 << 12); + pub const FTP: TraceType = TraceType(1 << 13); + pub const ILM: TraceType = TraceType(1 << 14); + + // MetricsAll must be last. + pub const ALL: TraceType = TraceType((1 << 15) - 1); + + pub fn new(t: u64) -> Self { + Self(t) + } +} + +impl Default for TraceType { + fn default() -> Self { + Self(0) + } +} + +impl TraceType { + pub fn contains(&self, x: &TraceType) -> bool { + (self.0 & x.0) == x.0 + } + + pub fn overlaps(&self, x: &TraceType) -> bool { + (self.0 & x.0) != 0 + } + + pub fn single_type(&self) -> bool { + todo!() + } + + pub fn merge(&mut self, other: &TraceType) { + self.0 = self.0 | other.0 + } + + pub fn set_if(&mut self, b: bool, other: &TraceType) { + if b { + self.0 = self.0 | other.0 + } + } + + pub fn mask(&self) -> u64 { + self.0 + } +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceInfo { + #[serde(rename = "type")] + trace_type: u64, + #[serde(rename = "nodename")] + node_name: String, + #[serde(rename = "funcname")] + func_name: String, + #[serde(rename = "time")] + time: DateTime, + #[serde(rename = "path")] + path: String, + #[serde(rename = "dur")] + duration: Duration, + #[serde(rename = "bytes", skip_serializing_if = "Option::is_none")] + bytes: Option, + #[serde(rename = "msg", skip_serializing_if = "Option::is_none")] + message: Option, + #[serde(rename = "error", skip_serializing_if = "Option::is_none")] + error: Option, + #[serde(rename = "custom", skip_serializing_if = "Option::is_none")] + custom: Option>, + #[serde(rename = "http", skip_serializing_if = "Option::is_none")] + http: Option, + #[serde(rename = "healResult", skip_serializing_if = "Option::is_none")] + heal_result: Option, +} + +impl TraceInfo { + pub fn mask(&self) -> u64 { + TraceType::new(self.trace_type).mask() + } +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceInfoLegacy { + trace_info: TraceInfo, + #[serde(rename = "request")] + req_info: Option, + #[serde(rename = "response")] + resp_info: Option, + #[serde(rename = "stats")] + call_stats: Option, + #[serde(rename = "storageStats")] + storage_stats: Option, + #[serde(rename = "osStats")] + os_stats: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct StorageStats { + path: String, + duration: Duration, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct OSStats { + path: String, + duration: Duration, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceHTTPStats { + req_info: TraceRequestInfo, + resp_info: TraceResponseInfo, + call_stats: TraceCallStats, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceCallStats { + input_bytes: i32, + output_bytes: i32, + latency: Duration, + time_to_first_byte: Duration, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceRequestInfo { + time: DateTime, + proto: String, + method: String, + #[serde(skip_serializing_if = "Option::is_none")] + path: Option, + #[serde(skip_serializing_if = "Option::is_none")] + raw_query: Option, + #[serde(skip_serializing_if = "Option::is_none")] + headers: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + body: Option>, + client: String, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct TraceResponseInfo { + time: DateTime, + #[serde(skip_serializing_if = "Option::is_none")] + headers: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + body: Option>, + #[serde(skip_serializing_if = 
"Option::is_none")] + status_code: Option, +} diff --git a/madmin/src/utils.rs b/madmin/src/utils.rs new file mode 100644 index 00000000..a25b4f66 --- /dev/null +++ b/madmin/src/utils.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +pub fn parse_duration(s: &str) -> Result { + // Implement your own duration parsing logic here + // For example, you could use the humantime crate or a custom parser + humantime::parse_duration(s).map_err(|e| e.to_string()) +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use super::parse_duration; + + #[test] + fn test_parse_dur() { + let s = String::from("3s"); + let dur = parse_duration(&s); + println!("{:?}", dur); + assert_eq!(Ok(Duration::from_secs(3)), dur); + + let s = String::from("3ms"); + let dur = parse_duration(&s); + println!("{:?}", dur); + assert_eq!(Ok(Duration::from_millis(3)), dur); + + let s = String::from("3m"); + let dur = parse_duration(&s); + println!("{:?}", dur); + assert_eq!(Ok(Duration::from_secs(3 * 60)), dur); + + let s = String::from("3h"); + let dur = parse_duration(&s); + println!("{:?}", dur); + assert_eq!(Ok(Duration::from_secs(3 * 60 * 60)), dur); + } +} diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 14e8fd1c..297fff8f 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -15,13 +15,13 @@ use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::StorageAPI; use ecstore::utils::path::path_join; -use ecstore::utils::time::parse_duration; use ecstore::utils::xml; use ecstore::GLOBAL_Endpoints; use futures::{Stream, StreamExt}; use http::Uri; use hyper::StatusCode; use madmin::metrics::RealtimeMetrics; +use madmin::utils::parse_duration; use matchit::Params; use s3s::stream::{ByteStream, DynByteStream}; use s3s::{ @@ -45,6 +45,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; pub mod service_account; +pub mod trace; #[derive(Deserialize, Debug, Default)] #[serde(rename_all = "PascalCase", default)] @@ -370,8 +371,8 @@ impl Operation for MetricsHandler { info!("mp: {:?}", mp); let tick = match parse_duration(&mp.tick) { - Some(i) => i, - None => std_Duration::from_secs(1), + Ok(i) => i, + Err(_) => std_Duration::from_secs(1), }; let mut n = mp.n; diff --git a/rustfs/src/admin/handlers/trace.rs b/rustfs/src/admin/handlers/trace.rs new file mode 100644 index 00000000..cf3c1710 --- /dev/null +++ b/rustfs/src/admin/handlers/trace.rs @@ -0,0 +1,37 @@ +use ecstore::{peer_rest_client::PeerRestClient, GLOBAL_Endpoints}; +use http::StatusCode; +use hyper::Uri; +use madmin::service_commands::ServiceTraceOpts; +use matchit::Params; +use s3s::{s3_error, Body, S3Request, S3Response, S3Result}; +use tokio::sync::mpsc; +use tracing::warn; + +use crate::admin::router::Operation; + +fn extract_trace_options(uri: &Uri) -> S3Result { + let mut st_opts = ServiceTraceOpts::default(); + st_opts + .parse_params(uri) + .map_err(|_| s3_error!(InvalidRequest, "invalid params"))?; + + Ok(st_opts) +} + +pub struct Trace {} + +#[async_trait::async_trait] +impl Operation for Trace { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle Trace"); + + let trace_opts = extract_trace_options(&req.uri)?; + + // let (tx, rx) = mpsc::channel(10000); + let perrs = match GLOBAL_Endpoints.get() { + Some(ep) => PeerRestClient::new_clients(ep).await, + None => (Vec::new(), Vec::new()), + }; + return Err(s3_error!(NotImplemented)); + } +} diff --git a/rustfs/src/grpc.rs 
b/rustfs/src/grpc.rs index 16c7c588..c529ac8c 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -1349,7 +1349,7 @@ impl Node for NodeService { } } }); - let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize).await; + let data_usage_cache = disk.ns_scanner(&cache, updates_tx, request.scan_mode as usize, None).await; let _ = task.await; match data_usage_cache { Ok(data_usage_cache) => {
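
Note (not part of the diff): the scanner throttling added in `data_scanner.rs` boils down to the arithmetic in `DynamicSleeper::sleep`. The sketch below restates that rule under the defaults installed for `SCANNER_SLEEPER` (factor 2.0, max 1s, min 100µs); the helper name `want_sleep` is illustrative only and does not appear in the change.

```rust
use std::time::Duration;

// Mirror of DynamicSleeper::sleep's decision: scale the time a folder took to scan
// by `factor`, skip the pause entirely when the result is below `min_sleep`, and
// cap it at `max_sleep`.
fn want_sleep(base: Duration) -> Option<Duration> {
    let (factor, max_sleep, min_sleep) = (2.0_f64, Duration::from_secs(1), Duration::from_micros(100));
    let scaled = base.mul_f64(factor);
    if scaled < min_sleep {
        return None; // too small to be worth a pause
    }
    Some(scaled.min(max_sleep))
}
```

With these defaults, a folder that took 300ms to scan is followed by a 600ms pause, a folder that took 2s is capped at a 1s pause, and anything finishing in under 50µs produces no pause at all.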