From c98e8496212fcfc3bac15ba76861e9d3858f4cec Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 3 Dec 2024 13:37:22 +0800 Subject: [PATCH 1/6] fix clippy --- ecstore/src/bitrot.rs | 13 +++++++------ ecstore/src/file_meta.rs | 27 +++++++++++++++++---------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index 738fed60..e26d039b 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -169,6 +169,7 @@ pub async fn new_bitrot_writer( pub type BitrotReader = Box; +#[allow(clippy::too_many_arguments)] pub fn new_bitrot_reader( disk: DiskStore, data: &[u8], @@ -691,16 +692,16 @@ mod test { let ep = Endpoint::try_from(temp_dir.as_str())?; let opt = DiskOption::default(); let disk = new_disk(&ep, &opt).await?; - let _ = disk.make_volume(volume).await?; + disk.make_volume(volume).await?; let mut writer = new_bitrot_writer(disk.clone(), "", volume, file_path, 35, algo.clone(), 10).await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaaaaaaa").await?; - let _ = writer.write(b"aaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaaaaaaa").await?; + writer.write(b"aaaaa").await?; let sum = bitrot_writer_sum(&writer); - let _ = writer.close().await?; + writer.close().await?; let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10); let read_len = 10; diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index 3a774289..d506e3ef 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -824,7 +824,7 @@ impl TryFrom for FileMetaVersion { } } -#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Ord, Hash)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)] pub struct FileMetaVersionHeader { pub version_id: Option, pub mod_time: Option, @@ -974,26 +974,33 @@ impl FileMetaVersionHeader { impl PartialOrd for FileMetaVersionHeader { fn partial_cmp(&self, other: &Self) -> Option { - match self.mod_time.partial_cmp(&other.mod_time) { - Some(core::cmp::Ordering::Equal) => {} + Some(self.cmp(other)) + } +} + +impl Ord for FileMetaVersionHeader { + fn cmp(&self, other: &Self) -> Ordering { + match self.mod_time.cmp(&other.mod_time) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.version_type.partial_cmp(&other.version_type) { - Some(core::cmp::Ordering::Equal) => {} + match self.version_type.cmp(&other.version_type) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.signature.partial_cmp(&other.signature) { - Some(core::cmp::Ordering::Equal) => {} + match self.signature.cmp(&other.signature) { + core::cmp::Ordering::Equal => {} ord => return ord, } - match self.version_id.partial_cmp(&other.version_id) { - Some(core::cmp::Ordering::Equal) => {} + match self.version_id.cmp(&other.version_id) { + core::cmp::Ordering::Equal => {} ord => return ord, } - self.flags.partial_cmp(&other.flags) + self.flags.cmp(&other.flags) } } + impl From for FileMetaVersionHeader { fn from(value: FileMetaVersion) -> Self { let flags = { From eb12c9b00534d33fe6487ebfc7cddec12be5e852 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 3 Dec 2024 17:38:52 +0800 Subject: [PATCH 2/6] test StorageInfoHandler --- Cargo.lock | 2 + crypto/src/jwt/tests.rs | 9 +- e2e_test/Cargo.toml | 3 +- e2e_test/src/reliant/node_interact_test.rs | 4 +- ecstore/src/admin_server_info.rs | 9 +- ecstore/src/config/common.rs | 4 
+- ecstore/src/disk/local.rs | 2 +- ecstore/src/endpoints.rs | 52 ++++++-- ecstore/src/global.rs | 8 ++ ecstore/src/heal/heal_commands.rs | 37 +----- ecstore/src/heal/heal_ops.rs | 5 +- ecstore/src/notification_sys.rs | 66 +++++++++- ecstore/src/peer_rest_client.rs | 104 +++++++++------- ecstore/src/pools.rs | 6 +- ecstore/src/set_disk.rs | 29 +++-- ecstore/src/sets.rs | 14 +-- ecstore/src/store.rs | 28 +++-- ecstore/src/store_api.rs | 86 +------------ ecstore/src/utils/net.rs | 30 +++-- madmin/Cargo.toml | 1 + madmin/src/info_commands.rs | 136 ++++++++++++++++++++- madmin/src/lib.rs | 2 + rustfs/src/admin/handlers.rs | 30 +++-- rustfs/src/main.rs | 10 +- 24 files changed, 420 insertions(+), 257 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18efd2c8..624ea269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,6 +764,7 @@ dependencies = [ "flatbuffers", "lazy_static", "lock", + "madmin", "protos", "rmp-serde", "serde", @@ -1677,6 +1678,7 @@ dependencies = [ "common", "psutil", "serde", + "time", ] [[package]] diff --git a/crypto/src/jwt/tests.rs b/crypto/src/jwt/tests.rs index abfe0020..627e8802 100644 --- a/crypto/src/jwt/tests.rs +++ b/crypto/src/jwt/tests.rs @@ -10,7 +10,10 @@ fn test() { "bbb": "bbb" }); - let jwt_token = encode(b"aaaa", &claims).unwrap(); - let new_claims = decode(&jwt_token, b"aaaa").unwrap(); - assert_eq!(new_claims.claims, claims); + let jwt_token = encode(b"aaaa", &claims).unwrap_or_default(); + let new_claims = match decode(&jwt_token, b"aaaa") { + Ok(res) => Some(res.claims), + Err(_errr) => None, + }; + assert_eq!(new_claims, Some(claims)); } diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 771f97e8..6700f01b 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -20,4 +20,5 @@ serde_json.workspace = true tonic = { version = "0.12.3", features = ["gzip"] } tokio = { workspace = true } tower.workspace = true -url.workspace = true \ No newline at end of file +url.workspace = true +madmin.workspace =true \ No newline at end of file diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs index db456ad2..16b333a3 100644 --- a/e2e_test/src/reliant/node_interact_test.rs +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use ecstore::{disk::VolumeInfo, store_api::StorageInfo}; +use ecstore::disk::VolumeInfo; use protos::{ models::{PingBody, PingBodyBuilder}, node_service_time_out_client, @@ -118,7 +118,7 @@ async fn storage_info() -> Result<(), Box> { let info = response.storage_info; let mut buf = Deserializer::new(Cursor::new(info)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); println!("{:?}", storage_info); Ok(()) } diff --git a/ecstore/src/admin_server_info.rs b/ecstore/src/admin_server_info.rs index bee9d996..6ecadff2 100644 --- a/ecstore/src/admin_server_info.rs +++ b/ecstore/src/admin_server_info.rs @@ -15,12 +15,7 @@ use protos::{ use serde::{Deserialize, Serialize}; use tonic::Request; -use crate::{ - disk::endpoint::Endpoint, - global::GLOBAL_Endpoints, - new_object_layer_fn, - store_api::{StorageAPI, StorageDisk}, -}; +use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI}; pub const ITEM_OFFLINE: &str = "offline"; pub const ITEM_INITIALIZING: &str = "initializing"; @@ -44,7 +39,7 @@ pub struct ServerProperties { pub version: String, pub commit_id: String, pub network: HashMap, - pub 
disks: Vec, + pub disks: Vec, pub pool_number: i32, pub pool_numbers: Vec, pub mem_stats: MemStats, diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index 6a47e489..f25f557c 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -105,9 +105,9 @@ pub async fn read_config_without_migrate(api: Arc) -> Result res, Err(err) => { if is_not_found(&err) { - warn!("config not found init start"); + warn!("config not found, start to init"); let cfg = new_and_save_server_config(api).await?; - warn!("config not found init done"); + warn!("config init done"); return Ok(cfg); } else { error!("read config err {:?}", &err); diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index c85db05f..b53a147b 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1291,7 +1291,7 @@ impl DiskAPI for LocalDisk { Ok(res) => res, Err(e) => { if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) { - error!("list_dir err {:?}", &e); + info!("list_dir err {:?}", &e); } if opts.report_notfound && is_err_file_not_found(&e) { diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 73dd58d7..334c74d6 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -1,9 +1,11 @@ +use tracing::warn; + use crate::{ disk::endpoint::{Endpoint, EndpointType}, disks_layout::DisksLayout, error::{Error, Result}, global::global_rustfs_port, - utils::net, + utils::net::{self, XHost}, }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -530,20 +532,30 @@ impl EndpointServerPools { nodes } - pub fn hosts_sorted(&self) -> Vec<()> { + pub fn hosts_sorted(&self) -> Vec> { let (mut peers, local) = self.peers(); + let mut ret = vec![None; peers.len()]; + peers.sort(); - for peer in peers.iter() { + for (i, peer) in peers.iter().enumerate() { if &local == peer { continue; } - // FIXME:TODO + let host = match XHost::try_from(peer.clone()) { + Ok(res) => res, + Err(err) => { + warn!("Xhost parse failed {:?}", err); + continue; + } + }; + + ret[i] = Some(host); } - unimplemented!() + ret } pub fn peers(&self) -> (Vec, String) { let mut local = None; @@ -554,12 +566,8 @@ impl EndpointServerPools { continue; } let host = endpoint.host_port(); - if endpoint.is_local { - if endpoint.url.port() == Some(global_rustfs_port()) { - if local.is_none() { - local = Some(host.clone()); - } - } + if endpoint.is_local && endpoint.url.port() == Some(global_rustfs_port()) && local.is_none() { + local = Some(host.clone()); } set.insert(host); @@ -570,6 +578,28 @@ impl EndpointServerPools { (hosts, local.unwrap_or_default()) } + + pub fn find_grid_hosts_from_peer(&self, host: &XHost) -> Option { + for ep in self.0.iter() { + for endpoint in ep.endpoints.0.iter() { + if endpoint.is_local { + continue; + } + let xhost = match XHost::try_from(endpoint.host_port()) { + Ok(res) => res, + Err(_) => { + continue; + } + }; + + if xhost.to_string() == host.to_string() { + return Some(endpoint.grid_host()); + } + } + } + + None + } } #[cfg(test)] diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index 63e07edb..81f1a0d3 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -65,6 +65,14 @@ pub fn set_global_endpoints(eps: Vec) { .expect("GLOBAL_Endpoints set faild") } +pub fn get_global_endpoints() -> EndpointServerPools { + if let Some(eps) = GLOBAL_Endpoints.get() { + eps.clone() + } else { + EndpointServerPools::default() + } +} + pub fn new_object_layer_fn() -> Option> { GLOBAL_OBJECT_API.get().map(|ec| ec.clone()) } diff 
--git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs index 2943f1e5..0ce85910 100644 --- a/ecstore/src/heal/heal_commands.rs +++ b/ecstore/src/heal/heal_commands.rs @@ -17,7 +17,7 @@ use crate::{ global::GLOBAL_BackgroundHealState, heal::heal_ops::HEALING_TRACKER_FILENAME, new_object_layer_fn, - store_api::{BucketInfo, StorageAPI, StorageDisk}, + store_api::{BucketInfo, StorageAPI}, utils::fs::read_file, }; @@ -131,35 +131,6 @@ impl Default for HealStartSuccess { pub type HealStopSuccess = HealStartSuccess; -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct HealingDisk { - pub id: String, - pub heal_id: String, - pub pool_index: Option, - pub set_index: Option, - pub disk_index: Option, - pub endpoint: String, - pub path: String, - pub started: Option, - pub last_update: Option, - pub retry_attempts: u64, - pub objects_total_count: u64, - pub objects_total_size: u64, - pub items_healed: u64, - pub items_failed: u64, - pub item_skipped: u64, - pub bytes_done: u64, - pub bytes_failed: u64, - pub bytes_skipped: u64, - pub objects_healed: u64, - pub objects_failed: u64, - pub bucket: String, - pub object: String, - pub queue_buckets: Vec, - pub healed_buckets: Vec, - pub finished: bool, -} - #[derive(Debug, Default, Deserialize, Serialize)] pub struct HealingTracker { #[serde(skip_serializing, skip_deserializing)] @@ -375,10 +346,10 @@ impl HealingTracker { }); } - pub async fn to_healing_disk(&self) -> HealingDisk { + pub async fn to_healing_disk(&self) -> madmin::HealingDisk { let _ = self.mu.read().await; - HealingDisk { + madmin::HealingDisk { id: self.id.clone(), heal_id: self.heal_id.clone(), pool_index: self.pool_index, @@ -516,7 +487,7 @@ pub struct SetStatus { pub heal_status: String, pub heal_priority: String, pub total_objects: usize, - pub disks: Vec, + pub disks: Vec, } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index 73d7f82f..53b5551a 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -3,8 +3,7 @@ use super::{ data_scanner::HEAL_DELETE_DANGLING, error::ERR_SKIP_FILE, heal_commands::{ - HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker, - HEAL_ITEM_BUCKET_METADATA, + HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA, }, }; use crate::store_api::StorageAPI; @@ -664,7 +663,7 @@ impl AllHealState { self.heal_status.write().await.insert(tracker.id.clone(), tracker.clone()); } - pub async fn get_local_healing_disks(&self) -> HashMap { + pub async fn get_local_healing_disks(&self) -> HashMap { let _ = self.mu.read().await; let mut dst = HashMap::new(); diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 4ea46a1d..359e4c3f 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -1,8 +1,11 @@ -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use crate::endpoints::EndpointServerPools; use crate::error::{Error, Result}; +use crate::global::get_global_endpoints; use crate::peer_rest_client::PeerRestClient; +use crate::StorageAPI; +use futures::future::join_all; use lazy_static::lazy_static; lazy_static! 
{ @@ -16,9 +19,13 @@ pub async fn new_global_notification_sys(eps: EndpointServerPools) -> Result<()> Ok(()) } +pub fn get_global_notification_sys() -> Option<&'static NotificationSys> { + GLOBAL_NotificationSys.get() +} + pub struct NotificationSys { - pub peer_clients: Vec, - pub all_peer_clients: Vec, + peer_clients: Vec>, + all_peer_clients: Vec>, } impl NotificationSys { @@ -50,4 +57,57 @@ impl NotificationSys { pub async fn delete_user(&self) -> Vec { unimplemented!() } + + pub async fn storage_info(&self, api: &S) -> madmin::StorageInfo { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.local_storage_info().await { + Ok(info) => Some(info), + Err(_) => Some(madmin::StorageInfo { + disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()), + ..Default::default() + }), + } + } else { + None + } + }); + } + + let mut replies = join_all(futures).await; + + replies.push(Some(api.local_storage_info().await)); + + let mut disks = Vec::new(); + for info in replies.into_iter().flatten() { + disks.extend(info.disks); + } + + let backend = api.backend_info().await; + madmin::StorageInfo { disks, backend } + } +} + +fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { + let mut offline_disks = Vec::new(); + + for pool in endpoints.as_ref() { + for ep in pool.endpoints.as_ref() { + if (offline_host.is_empty() && ep.is_local) || offline_host == ep.host_port() { + offline_disks.push(madmin::Disk { + endpoint: ep.to_string(), + state: madmin::ItemState::Offline.to_string().to_owned(), + pool_index: ep.pool_idx, + set_index: ep.set_idx, + disk_index: ep.disk_idx, + ..Default::default() + }); + } + } + } + + offline_disks } diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index d60b537d..76aaee69 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -6,7 +6,7 @@ use crate::{ global::is_dist_erasure, heal::heal_commands::BgHealState, metrics_realtime::{CollectMetricsOpts, MetricType}, - store_api::StorageInfo, + utils::net::XHost, }; use common::error::{Error, Result}; use madmin::{ @@ -29,35 +29,53 @@ use protos::{ use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize as _}; use tonic::Request; +use tracing::warn; pub const PEER_RESTSIGNAL: &str = "signal"; pub const PEER_RESTSUB_SYS: &str = "sub-sys"; pub const PEER_RESTDRY_RUN: &str = "dry-run"; +#[derive(Debug, Clone)] pub struct PeerRestClient { - addr: String, + pub host: XHost, + pub grid_host: String, } impl PeerRestClient { - pub fn new(url: url::Url) -> Self { - Self { - addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()), - } + pub fn new(host: XHost, grid_host: String) -> Self { + Self { host, grid_host } } - pub async fn new_clients(_eps: EndpointServerPools) -> (Vec, Vec) { + pub async fn new_clients(eps: EndpointServerPools) -> (Vec>, Vec>) { if !is_dist_erasure().await { return (Vec::new(), Vec::new()); } - // FIXME:TODO + let eps = eps.clone(); + let hosts = eps.hosts_sorted(); + let mut remote = vec![None; hosts.len()]; + let mut all = Vec::with_capacity(hosts.len()); + for (i, hs_host) in hosts.iter().enumerate() { + if let Some(host) = hs_host { + if let Some(grid_host) = eps.find_grid_hosts_from_peer(host) { + let client = PeerRestClient::new(host.clone(), grid_host); - todo!() + all[i] = Some(client.clone()); + 
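+                        // a clone goes into `all` here so the original handle can move into `remote` below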
remote.push(Some(client)); + } + } + } + + if all.len() != remote.len() + 1 { + warn!("Expected number of all hosts ({}) to be remote +1 ({})", all.len(), remote.len()); + } + + (remote, all) } } impl PeerRestClient { - pub async fn local_storage_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + pub async fn local_storage_info(&self) -> Result { + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LocalStorageInfoRequest { metrics: true }); @@ -72,13 +90,13 @@ impl PeerRestClient { let data = response.storage_info; let mut buf = Deserializer::new(Cursor::new(data)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); Ok(storage_info) } pub async fn server_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ServerInfoRequest { metrics: true }); @@ -99,7 +117,7 @@ impl PeerRestClient { } pub async fn get_cpus(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetCpusRequest {}); @@ -120,7 +138,7 @@ impl PeerRestClient { } pub async fn get_net_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetNetInfoRequest {}); @@ -141,7 +159,7 @@ impl PeerRestClient { } pub async fn get_partitions(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetPartitionsRequest {}); @@ -162,7 +180,7 @@ impl PeerRestClient { } pub async fn get_os_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetOsInfoRequest {}); @@ -183,7 +201,7 @@ impl PeerRestClient { } pub async fn get_se_linux_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSeLinuxInfoRequest {}); @@ -204,7 +222,7 @@ impl PeerRestClient { } pub async fn get_sys_config(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysConfigRequest {}); @@ -225,7 +243,7 @@ impl PeerRestClient { } pub async fn get_sys_errors(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysErrorsRequest {}); @@ -246,7 +264,7 @@ impl PeerRestClient { } pub async fn get_mem_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + 
let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetMemInfoRequest {}); @@ -267,7 +285,7 @@ impl PeerRestClient { } pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut buf_t = Vec::new(); @@ -295,7 +313,7 @@ impl PeerRestClient { } pub async fn get_proc_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetProcInfoRequest {}); @@ -316,7 +334,7 @@ impl PeerRestClient { } pub async fn start_profiling(&self, profiler: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StartProfilingRequest { @@ -350,7 +368,7 @@ impl PeerRestClient { } pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadBucketMetadataRequest { @@ -368,7 +386,7 @@ impl PeerRestClient { } pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteBucketMetadataRequest { @@ -386,7 +404,7 @@ impl PeerRestClient { } pub async fn delete_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeletePolicyRequest { @@ -404,7 +422,7 @@ impl PeerRestClient { } pub async fn load_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyRequest { @@ -422,7 +440,7 @@ impl PeerRestClient { } pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyMappingRequest { @@ -442,7 +460,7 @@ impl PeerRestClient { } pub async fn delete_user(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteUserRequest { @@ -460,7 +478,7 @@ impl PeerRestClient { } pub async fn delete_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let 
request = Request::new(DeleteServiceAccountRequest { @@ -478,7 +496,7 @@ impl PeerRestClient { } pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadUserRequest { @@ -497,7 +515,7 @@ impl PeerRestClient { } pub async fn load_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadServiceAccountRequest { @@ -515,7 +533,7 @@ impl PeerRestClient { } pub async fn load_group(&self, group: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadGroupRequest { @@ -533,7 +551,7 @@ impl PeerRestClient { } pub async fn reload_site_replication_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadSiteReplicationConfigRequest {}); @@ -549,7 +567,7 @@ impl PeerRestClient { } pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut vars = HashMap::new(); @@ -571,7 +589,7 @@ impl PeerRestClient { } pub async fn background_heal_status(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(BackgroundHealStatusRequest {}); @@ -592,21 +610,21 @@ impl PeerRestClient { } pub async fn get_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn update_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn reload_pool_meta(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadPoolMetaRequest {}); @@ -623,7 +641,7 @@ impl PeerRestClient { } pub async fn stop_rebalance(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StopRebalanceRequest {}); @@ -640,7 +658,7 @@ impl PeerRestClient { } pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await 
.map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadRebalanceMetaRequest { start_rebalance }); @@ -657,7 +675,7 @@ impl PeerRestClient { } pub async fn load_transition_tier_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadTransitionTierConfigRequest {}); diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index 8712308e..d355141a 100644 --- a/ecstore/src/pools.rs +++ b/ecstore/src/pools.rs @@ -5,7 +5,7 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; use crate::error::{Error, Result}; use crate::heal::heal_commands::HealOpts; use crate::new_object_layer_fn; -use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI, StorageDisk, StorageInfo}; +use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI}; use crate::store_err::{is_err_bucket_exists, StorageError}; use crate::utils::path::{path_join, SLASH_SEPARATOR}; use crate::{sets::Sets, store::ECStore}; @@ -670,7 +670,7 @@ impl ECStore { } } -fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { @@ -683,7 +683,7 @@ fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize capacity } -fn get_total_usable_capacity_free(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 08df4853..a320c97e 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -38,10 +38,9 @@ use crate::{ }, quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError}, store_api::{ - BackendByte, BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, - GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, - MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, - RawFileInfo, StorageAPI, StorageDisk, StorageInfo, DEFAULT_BITROT_ALGO, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec, + ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo, + ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, RawFileInfo, StorageAPI, DEFAULT_BITROT_ALGO, }, store_err::{to_object_err, StorageError}, store_init::{load_format_erasure, ErasureError}, @@ -3590,15 +3589,15 @@ impl ObjectIO for SetDisks { #[async_trait::async_trait] impl StorageAPI for SetDisks { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; get_storage_info(&disks, &self.set_endpoints).await } - async fn 
local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; let mut local_disks: Vec>> = Vec::new(); @@ -4941,13 +4940,13 @@ pub fn should_heal_object_on_disk( (false, err.clone()) } -async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { +async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { let mut ret = Vec::new(); for (i, pool) in disks.iter().enumerate() { if let Some(disk) = pool { match disk.disk_info(&DiskInfoOptions::default()).await { - Ok(res) => ret.push(StorageDisk { + Ok(res) => ret.push(madmin::Disk { endpoint: eps[i].to_string(), local: eps[i].is_local, pool_index: eps[i].pool_idx, @@ -4978,7 +4977,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec ret.push(StorageDisk { + Err(err) => ret.push(madmin::Disk { state: err.to_string(), endpoint: eps[i].to_string(), local: eps[i].is_local, @@ -4989,7 +4988,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec], eps: &[Endpoint]) -> Vec>, eps: &Vec) -> StorageInfo { +async fn get_storage_info(disks: &Vec>, eps: &Vec) -> madmin::StorageInfo { let mut disks = get_disks_info(disks, eps).await; disks.sort_by(|a, b| a.total_space.cmp(&b.total_space)); - StorageInfo { + madmin::StorageInfo { disks, - backend: BackendInfo { - backend_type: BackendByte::Erasure, + backend: madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, ..Default::default() }, } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 50537374..000fe458 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -23,9 +23,9 @@ use crate::{ }, set_disk::SetDisks, store_api::{ - BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, - ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, StorageInfo, + ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, }, store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file}, utils::hash, @@ -294,10 +294,10 @@ impl ObjectIO for Sets { #[async_trait::async_trait] impl StorageAPI for Sets { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -312,12 +312,12 @@ impl StorageAPI for Sets { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -331,7 +331,7 @@ impl StorageAPI for Sets { for res in results.into_iter() { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 2dcc5dc6..148837f5 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -15,10 +15,9 @@ use 
crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA}; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::new_object_layer_fn; +use crate::notification_sys::get_global_notification_sys; use crate::pools::PoolMeta; -use crate::store_api::{ - BackendByte, BackendDisks, BackendInfo, ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO, StorageInfo, -}; +use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO}; use crate::store_err::{ is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found, to_object_err, StorageError, @@ -1120,7 +1119,7 @@ lazy_static! { #[async_trait::async_trait] impl StorageAPI for ECStore { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { let (standard_sc_parity, rr_sc_parity) = { if let Some(sc) = GLOBAL_StorageClass.get() { let sc_parity = sc @@ -1151,10 +1150,10 @@ impl StorageAPI for ECStore { drives_per_set.push(*set_count); } - BackendInfo { - backend_type: BackendByte::Erasure, - online_disks: BackendDisks::new(), - offline_disks: BackendDisks::new(), + madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, + online_disks: madmin::BackendDisks::new(), + offline_disks: madmin::BackendDisks::new(), standard_sc_data, standard_sc_parity, rr_sc_data, @@ -1164,11 +1163,14 @@ impl StorageAPI for ECStore { ..Default::default() } } - async fn storage_info(&self) -> StorageInfo { - // FIXME: globalNotificationSys.StorageInfo - unimplemented!() + async fn storage_info(&self) -> madmin::StorageInfo { + let Some(notification_sy) = get_global_notification_sys() else { + return madmin::StorageInfo::default(); + }; + + notification_sy.storage_info(self).await } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.pools.len()); for pool in self.pools.iter() { @@ -1184,7 +1186,7 @@ impl StorageAPI for ECStore { } let backend = self.backend_info().await; - StorageInfo { backend, disks } + madmin::StorageInfo { backend, disks } } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index acd8d1dc..8b34fde3 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,4 +1,3 @@ -use crate::heal::heal_commands::HealingDisk; use crate::heal::heal_ops::HealSequence; use crate::{ disk::DiskStore, @@ -9,7 +8,6 @@ use crate::{ }; use futures::StreamExt; use http::HeaderMap; -use madmin::info_commands::DiskMetrics; use rmp_serde::Serializer; use s3s::{dto::StreamingBlob, Body}; use serde::{Deserialize, Serialize}; @@ -804,84 +802,6 @@ pub struct DeletedObject { // pub replication_state: ReplicationState, } -#[derive(Debug, Default, Serialize, Deserialize)] -pub enum BackendByte { - #[default] - Unknown, - FS, - Erasure, -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct StorageDisk { - pub endpoint: String, - pub root_disk: bool, - pub drive_path: String, - pub healing: bool, - pub scanning: bool, - pub state: String, - pub uuid: String, - pub major: u32, - pub minor: u32, - pub model: Option, - pub total_space: u64, - pub used_space: u64, - pub available_space: u64, - pub read_throughput: f64, - pub write_throughput: f64, - pub read_latency: f64, - pub write_latency: f64, - pub 
utilization: f64, - pub metrics: Option, - pub heal_info: Option, - pub used_inodes: u64, - pub free_inodes: u64, - pub local: bool, - pub pool_index: i32, - pub set_index: i32, - pub disk_index: i32, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct StorageInfo { - pub disks: Vec, - pub backend: BackendInfo, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct BackendDisks(HashMap); - -impl BackendDisks { - pub fn new() -> Self { - Self(HashMap::new()) - } - pub fn sum(&self) -> usize { - self.0.values().sum() - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase", default)] -pub struct BackendInfo { - pub backend_type: BackendByte, - pub online_disks: BackendDisks, - pub offline_disks: BackendDisks, - #[serde(rename = "StandardSCData")] - pub standard_sc_data: Vec, - #[serde(rename = "StandardSCParities")] - pub standard_sc_parities: Vec, - #[serde(rename = "StandardSCParity")] - pub standard_sc_parity: Option, - #[serde(rename = "RRSCData")] - pub rr_sc_data: Vec, - #[serde(rename = "RRSCParities")] - pub rr_sc_parities: Vec, - #[serde(rename = "RRSCParity")] - pub rr_sc_parity: Option, - pub total_sets: Vec, - pub drives_per_set: Vec, -} - pub struct ListObjectVersionsInfo { pub is_truncated: bool, pub next_marker: String, @@ -911,9 +831,9 @@ pub trait StorageAPI: ObjectIO { // Shutdown TODO: // NSScanner TODO: - async fn backend_info(&self) -> BackendInfo; - async fn storage_info(&self) -> StorageInfo; - async fn local_storage_info(&self) -> StorageInfo; + async fn backend_info(&self) -> madmin::BackendInfo; + async fn storage_info(&self) -> madmin::StorageInfo; + async fn local_storage_info(&self) -> madmin::StorageInfo; async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 52d6d015..c2de72e1 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -2,6 +2,7 @@ use crate::error::{Error, Result}; use lazy_static::lazy_static; use std::{ collections::HashSet, + fmt::Display, net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs}, }; @@ -105,27 +106,38 @@ pub(crate) fn must_get_local_ips() -> Result> { } } +#[derive(Debug, Clone)] pub struct XHost { pub name: String, pub port: u16, pub is_port_set: bool, } -impl ToString for XHost { - fn to_string(&self) -> String { +impl Display for XHost { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { if !self.is_port_set { - self.name.clone() + write!(f, "{}", self.name) + } else if self.name.contains(':') { + write!(f, "[{}]:{}", self.name, self.port) } else { - join_host_port(&self.name, self.port) + write!(f, "{}:{}", self.name, self.port) } } } -fn join_host_port(host: &str, port: u16) -> String { - if host.contains(':') { - format!("[{}]:{}", host, port) - } else { - format!("{}:{}", host, port) +impl TryFrom for XHost { + type Error = std::io::Error; + + fn try_from(value: String) -> std::result::Result { + if let Some(addr) = value.to_socket_addrs()?.next() { + Ok(Self { + name: addr.ip().to_string(), + port: addr.port(), + is_port_set: addr.port() > 0, + }) + } else { + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid")) + } } } diff --git a/madmin/Cargo.toml b/madmin/Cargo.toml index 77235f0f..83b54093 100644 --- a/madmin/Cargo.toml +++ b/madmin/Cargo.toml @@ -11,3 +11,4 @@ chrono.workspace = true common.workspace = true psutil = 
"3.3.0" serde.workspace = true +time.workspace =true diff --git a/madmin/src/info_commands.rs b/madmin/src/info_commands.rs index 17f75e18..ec8f0f27 100644 --- a/madmin/src/info_commands.rs +++ b/madmin/src/info_commands.rs @@ -1,9 +1,36 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::SystemTime}; use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use crate::metrics::TimedAction; +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum ItemState { + Offline, + Initializing, + Online, +} + +impl ItemState { + pub fn to_string(&self) -> &str { + match self { + ItemState::Offline => "offline", + ItemState::Initializing => "initializing", + ItemState::Online => "online", + } + } + + pub fn from_string(s: &str) -> Option { + match s { + "offline" => Some(ItemState::Offline), + "initializing" => Some(ItemState::Initializing), + "online" => Some(ItemState::Online), + _ => None, + } + } +} + #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct DiskMetrics { pub last_minute: HashMap, @@ -14,3 +41,110 @@ pub struct DiskMetrics { pub total_writes: u64, pub total_deletes: u64, } + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct Disk { + pub endpoint: String, + pub root_disk: bool, + pub drive_path: String, + pub healing: bool, + pub scanning: bool, + pub state: String, + pub uuid: String, + pub major: u32, + pub minor: u32, + pub model: Option, + pub total_space: u64, + pub used_space: u64, + pub available_space: u64, + pub read_throughput: f64, + pub write_throughput: f64, + pub read_latency: f64, + pub write_latency: f64, + pub utilization: f64, + pub metrics: Option, + pub heal_info: Option, + pub used_inodes: u64, + pub free_inodes: u64, + pub local: bool, + pub pool_index: i32, + pub set_index: i32, + pub disk_index: i32, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct HealingDisk { + pub id: String, + pub heal_id: String, + pub pool_index: Option, + pub set_index: Option, + pub disk_index: Option, + pub endpoint: String, + pub path: String, + pub started: Option, + pub last_update: Option, + pub retry_attempts: u64, + pub objects_total_count: u64, + pub objects_total_size: u64, + pub items_healed: u64, + pub items_failed: u64, + pub item_skipped: u64, + pub bytes_done: u64, + pub bytes_failed: u64, + pub bytes_skipped: u64, + pub objects_healed: u64, + pub objects_failed: u64, + pub bucket: String, + pub object: String, + pub queue_buckets: Vec, + pub healed_buckets: Vec, + pub finished: bool, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub enum BackendByte { + #[default] + Unknown, + FS, + Erasure, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StorageInfo { + pub disks: Vec, + pub backend: BackendInfo, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct BackendDisks(HashMap); + +impl BackendDisks { + pub fn new() -> Self { + Self(HashMap::new()) + } + pub fn sum(&self) -> usize { + self.0.values().sum() + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase", default)] +pub struct BackendInfo { + pub backend_type: BackendByte, + pub online_disks: BackendDisks, + pub offline_disks: BackendDisks, + #[serde(rename = "StandardSCData")] + pub standard_sc_data: Vec, + #[serde(rename = "StandardSCParities")] + pub standard_sc_parities: Vec, + #[serde(rename = "StandardSCParity")] + pub standard_sc_parity: Option, + #[serde(rename = "RRSCData")] + pub rr_sc_data: Vec, + #[serde(rename = "RRSCParities")] 
+ pub rr_sc_parities: Vec, + #[serde(rename = "RRSCParity")] + pub rr_sc_parity: Option, + pub total_sets: Vec, + pub drives_per_set: Vec, +} diff --git a/madmin/src/lib.rs b/madmin/src/lib.rs index 908eb69d..fd416c42 100644 --- a/madmin/src/lib.rs +++ b/madmin/src/lib.rs @@ -2,3 +2,5 @@ pub mod health; pub mod info_commands; pub mod metrics; pub mod net; + +pub use info_commands::*; diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 1419beae..fea05ef4 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -10,6 +10,7 @@ use ecstore::global::GLOBAL_ALlHealState; use ecstore::heal::heal_commands::HealOpts; use ecstore::heal::heal_ops::new_heal_sequence; use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType}; +use ecstore::new_object_layer_fn; use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::StorageAPI; @@ -17,7 +18,6 @@ use ecstore::utils::path::path_join; use ecstore::utils::time::parse_duration; use ecstore::utils::xml; use ecstore::GLOBAL_Endpoints; -use ecstore::{new_object_layer_fn, store_api::BackendInfo}; use futures::{Stream, StreamExt}; use http::Uri; use hyper::StatusCode; @@ -37,7 +37,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration as std_Duration; -use std::u64; use time::{Duration, OffsetDateTime}; use tokio::sync::mpsc::{self}; use tokio::time::interval; @@ -128,7 +127,7 @@ impl Operation for AssumeRoleHandle { #[serde(rename_all = "PascalCase", default)] pub struct AccountInfo { pub account_name: String, - pub server: BackendInfo, + pub server: madmin::BackendInfo, pub policy: BucketPolicy, } @@ -224,7 +223,18 @@ impl Operation for StorageInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle StorageInfoHandler"); - return Err(s3_error!(NotImplemented)); + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + // TODO:getAggregatedBackgroundHealState + + let info = store.storage_info().await; + + let output = serde_json::to_string(&info) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(output)))) } } @@ -492,15 +502,11 @@ fn extract_heal_init_params(body: &Bytes, uri: &Uri, params: Params<'_, '_>) -> hip.client_token = value.to_string(); } } - if key == "forceStart" { - if parts.next().is_some() { - hip.force_start = true; - } + if key == "forceStart" && parts.next().is_some() { + hip.force_start = true; } - if key == "forceStop" { - if parts.next().is_some() { - hip.force_stop = true; - } + if key == "forceStop" && parts.next().is_some() { + hip.force_stop = true; } } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e2ba46dc..e0225424 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -8,7 +8,6 @@ use common::{ error::{Error, Result}, globals::set_global_addr, }; -use ecstore::global::set_global_rustfs_port; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::utils::net::{self, get_available_port}; use ecstore::{ @@ -18,6 +17,7 @@ use ecstore::{ store::{init_local_disks, ECStore}, update_erasure_type, }; +use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -212,10 +212,10 @@ async fn 
run(opt: config::Opt) -> Result<()> {
     })?;
     warn!(" init store success!");
 
-    // new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
-    //     error!("new_global_notification_sys faild {:?}", &err);
-    //     Error::from_string(err.to_string())
-    // })?;
+    new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
+        error!("new_global_notification_sys failed {:?}", &err);
+        Error::from_string(err.to_string())
+    })?;
 
     // init scanner
     init_data_scanner().await;

From 3de66a37d1714e77268da27ce1ddf2878c00b170 Mon Sep 17 00:00:00 2001
From: Nugine
Date: Tue, 3 Dec 2024 12:01:53 +0800
Subject: [PATCH 3/6] ci: allow clippy warnings

---
 .github/workflows/rust.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index b2f2eb77..2e657034 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -62,7 +62,8 @@ jobs:
         run: cargo fmt --all --check
 
       - name: cargo clippy
-        run: cargo clippy -- -D warnings
+        run: cargo clippy
+        # run: cargo clippy -- -D warnings
 
       - name: cargo test
         run: cargo test --all --exclude e2e_test

From a7a2ae278100a2304eab187c9d63da63755ce772 Mon Sep 17 00:00:00 2001
From: mujunxiang <1948535941@qq.com>
Date: Tue, 3 Dec 2024 15:31:43 +0800
Subject: [PATCH 4/6] scanner(1)

Signed-off-by: mujunxiang <1948535941@qq.com>
---
 ecstore/src/disk/local.rs               |  1 +
 ecstore/src/disk/mod.rs                 |  1 +
 ecstore/src/heal/data_scanner.rs        | 18 +++++++--
 ecstore/src/heal/data_scanner_metric.rs |  2 +
 ecstore/src/heal/data_usage_cache.rs    | 10 ++---
 ecstore/src/metrics_realtime.rs         | 50 ++++++++++++++++++++++++-
 ecstore/src/set_disk.rs                 |  2 +
 rustfs/src/admin/handlers.rs            |  2 +-
 8 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs
index b53a147b..4ad6af74 100644
--- a/ecstore/src/disk/local.rs
+++ b/ecstore/src/disk/local.rs
@@ -2092,6 +2092,7 @@ impl DiskAPI for LocalDisk {
         )
         .await?;
         data_usage_info.info.last_update = Some(SystemTime::now());
+        info!("ns_scanner completed: {data_usage_info:?}");
         Ok(data_usage_info)
     }
 
diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs
index 079d299b..ab57ce45 100644
--- a/ecstore/src/disk/mod.rs
+++ b/ecstore/src/disk/mod.rs
@@ -351,6 +351,7 @@ impl DiskAPI for Disk {
         updates: Sender,
         scan_mode: HealScanMode,
     ) -> Result {
+        info!("ns_scanner");
         match self {
             Disk::Local(local_disk) => local_disk.ns_scanner(cache, updates, scan_mode).await,
             Disk::Remote(remote_disk) => remote_disk.ns_scanner(cache, updates, scan_mode).await,
         }
     }

diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs
index 8562d32d..16ff8cc0 100644
--- a/ecstore/src/heal/data_scanner.rs
+++ b/ecstore/src/heal/data_scanner.rs
@@ -86,23 +86,28 @@ lazy_static!
{ } pub async fn init_data_scanner() { - let mut r = rand::thread_rng(); - let random = r.gen_range(0.0..1.0); tokio::spawn(async move { loop { run_data_scanner().await; + let random = { + let mut r = rand::thread_rng(); + r.gen_range(0.0..1.0) + }; let duration = Duration::from_secs_f64(random * (SCANNER_CYCLE.load(std::sync::atomic::Ordering::SeqCst) as f64)); let sleep_duration = if duration < Duration::new(1, 0) { Duration::new(1, 0) } else { duration }; + + info!("data scanner will sleeping {sleep_duration:?}"); sleep(sleep_duration).await; } }); } async fn run_data_scanner() { + info!("run_data_scanner"); let Some(store) = new_object_layer_fn() else { error!("errServerNotInitialized"); return; @@ -163,8 +168,10 @@ async fn run_data_scanner() { }); let mut res = HashMap::new(); res.insert("cycle".to_string(), cycle_info.current.to_string()); + info!("start ns_scanner"); match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { Ok(_) => { + info!("ns_scanner completed"); cycle_info.next += 1; cycle_info.current = 0; cycle_info.cycle_completed.push(Utc::now()); @@ -176,9 +183,14 @@ async fn run_data_scanner() { globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; let mut tmp = Vec::new(); tmp.write_u64::(cycle_info.next).unwrap(); - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await; + if let Ok(data) = cycle_info.marshal_msg(&tmp) { + let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &data).await; + } else { + let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await; + } } Err(err) => { + info!("ns_scanner failed: {:?}", err); res.insert("error".to_string(), err.to_string()); } } diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs index b0f364bd..77b97c1d 100644 --- a/ecstore/src/heal/data_scanner_metric.rs +++ b/ecstore/src/heal/data_scanner_metric.rs @@ -14,6 +14,7 @@ use std::{ time::SystemTime, }; use tokio::sync::RwLock; +use tracing::info; use super::data_scanner::{CurrentScannerCycle, UpdateCurrentPathFn}; @@ -148,6 +149,7 @@ impl ScannerMetrics { } pub async fn set_cycle(&mut self, c: Option) { + info!("ScannerMetrics set_cycle {c:?}"); *self.cycle_info.write().await = c; } diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs index 09be08b9..fbc6003b 100644 --- a/ecstore/src/heal/data_usage_cache.rs +++ b/ecstore/src/heal/data_usage_cache.rs @@ -131,7 +131,7 @@ const OBJECTS_VERSION_COUNT_INTERVALS: [ObjectHistogramInterval; DATA_USAGE_VERS ]; // sizeHistogram is a size histogram. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct SizeHistogram(Vec); impl Default for SizeHistogram { @@ -168,7 +168,7 @@ impl SizeHistogram { } // versionsHistogram is a histogram of number of versions in an object. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct VersionsHistogram(Vec); impl Default for VersionsHistogram { @@ -238,7 +238,7 @@ impl ReplicationAllStats { } } -#[derive(Clone, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct DataUsageEntry { pub children: DataUsageHashMap, // These fields do no include any children. 
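 // (child prefixes are keyed by hash in `children`; each carries its own entry in the cache map)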
@@ -340,7 +340,7 @@ pub struct DataUsageEntryInfo { pub entry: DataUsageEntry, } -#[derive(Clone, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct DataUsageCacheInfo { pub name: String, pub next_cycle: u32, @@ -367,7 +367,7 @@ pub struct DataUsageCacheInfo { // } // } -#[derive(Clone, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct DataUsageCache { pub info: DataUsageCacheInfo, pub cache: HashMap, diff --git a/ecstore/src/metrics_realtime.rs b/ecstore/src/metrics_realtime.rs index a219723b..97b39697 100644 --- a/ecstore/src/metrics_realtime.rs +++ b/ecstore/src/metrics_realtime.rs @@ -4,6 +4,7 @@ use chrono::Utc; use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}; use madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; use serde::{Deserialize, Serialize}; +use tracing::info; use crate::{ admin_server_info::get_local_server_property, @@ -55,8 +56,10 @@ impl MetricType { } pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) -> RealtimeMetrics { + info!("collect_local_metrics"); let mut real_time_metrics = RealtimeMetrics::default(); if types.0 == MetricType::NONE.0 { + info!("types is None, return"); return real_time_metrics; } @@ -75,11 +78,13 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) } if types.contains(&MetricType::DISK) { + info!("start get disk metrics"); let mut aggr = DiskMetric { collected_at: Utc::now(), ..Default::default() }; for (name, disk) in collect_local_disks_metrics(&opts.disks).await.into_iter() { + info!("got disk metric, name: {name}, metric: {disk:?}"); real_time_metrics.by_disk.insert(name, disk.clone()); aggr.merge(&disk); } @@ -87,10 +92,31 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) } if types.contains(&MetricType::SCANNER) { + info!("start get scanner metrics"); let metrics = globalScannerMetrics.read().await.report().await; real_time_metrics.aggregated.scanner = Some(metrics); } - RealtimeMetrics::default() + + if types.contains(&MetricType::OS) {} + + if types.contains(&MetricType::BATCH_JOBS) {} + + if types.contains(&MetricType::SITE_RESYNC) {} + + if types.contains(&MetricType::NET) {} + + if types.contains(&MetricType::MEM) {} + + if types.contains(&MetricType::CPU) {} + + if types.contains(&MetricType::RPC) {} + + real_time_metrics + .by_host + .insert(by_host_name.clone(), real_time_metrics.aggregated.clone()); + real_time_metrics.hosts.push(by_host_name); + + real_time_metrics } async fn collect_local_disks_metrics(disks: &HashSet) -> HashMap { @@ -167,3 +193,25 @@ async fn collect_local_disks_metrics(disks: &HashSet) -> HashMap, heal_scan_mode: HealScanMode, ) -> Result<()> { + info!("ns_scanner"); if buckets.is_empty() { return Ok(()); } @@ -2845,6 +2846,7 @@ impl SetDisks { } let _ = join_all(futures).await; let _ = task.await; + info!("ns_scanner completed"); Ok(()) } diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index fea05ef4..a91b6f7a 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -441,7 +441,7 @@ impl Operation for MetricsHandler { // todo write resp match serde_json::to_vec(&m) { Ok(re) => { - info!("got metrics, send it to client, m: {m:?}, re: {re:?}"); + info!("got metrics, send it to client, m: {m:?}"); let _ = tx.send(Ok(Bytes::from(re))).await; } Err(e) => { From 9533e312b1e7d64363554a2615a690a3cd437e8f Mon Sep 17 00:00:00 2001 From: 
mujunxiang <1948535941@qq.com> Date: Tue, 3 Dec 2024 17:05:23 +0800 Subject: [PATCH 5/6] scanner(2) Signed-off-by: mujunxiang <1948535941@qq.com> --- common/lock/src/lrwmutex.rs | 5 +- ecstore/src/heal/data_scanner.rs | 80 +++++++++++++------------ ecstore/src/heal/data_scanner_metric.rs | 1 + ecstore/src/set_disk.rs | 10 +++- 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/common/lock/src/lrwmutex.rs b/common/lock/src/lrwmutex.rs index fa65f1a7..9bc3415e 100644 --- a/common/lock/src/lrwmutex.rs +++ b/common/lock/src/lrwmutex.rs @@ -2,6 +2,7 @@ use std::time::{Duration, Instant}; use rand::Rng; use tokio::{sync::RwLock, time::sleep}; +use tracing::info; #[derive(Debug, Default)] pub struct LRWMutex { @@ -87,14 +88,14 @@ impl LRWMutex { pub async fn un_lock(&self) { let is_write = true; if !self.unlock(is_write).await { - panic!("Trying to un_lock() while no Lock() is active") + info!("Trying to un_lock() while no Lock() is active") } } pub async fn un_r_lock(&self) { let is_write = false; if !self.unlock(is_write).await { - panic!("Trying to un_r_lock() while no Lock() is active") + info!("Trying to un_r_lock() while no Lock() is active") } } diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 16ff8cc0..721a09f8 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -12,7 +12,6 @@ use std::{ time::{Duration, SystemTime}, }; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use chrono::{DateTime, Utc}; use lazy_static::lazy_static; use rand::Rng; @@ -113,33 +112,15 @@ async fn run_data_scanner() { return; }; - let mut cycle_info = CurrentScannerCycle::default(); - - let mut buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) + let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) .await .map_or(Vec::new(), |buf| buf); - match buf.len().cmp(&8) { - std::cmp::Ordering::Less => {} - std::cmp::Ordering::Equal => { - cycle_info.next = match Cursor::new(buf).read_u64::<LittleEndian>() { - Ok(buf) => buf, - Err(_) => { - error!("can not decode DATA_USAGE_BLOOM_NAME_PATH"); - return; - } - }; - } - std::cmp::Ordering::Greater => { - cycle_info.next = match Cursor::new(buf[..8].to_vec()).read_u64::<LittleEndian>() { - Ok(buf) => buf, - Err(_) => { - error!("can not decode DATA_USAGE_BLOOM_NAME_PATH"); - return; - } - }; - let _ = cycle_info.unmarshal_msg(&buf.split_off(8)); - } - } + + let mut buf_t = Deserializer::new(Cursor::new(buf)); + let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) { + Ok(info) => info, + Err(_) => CurrentScannerCycle::default(), + }; loop { let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); @@ -176,18 +157,12 @@ async fn run_data_scanner() { cycle_info.current = 0; cycle_info.cycle_completed.push(Utc::now()); if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize { - cycle_info.cycle_completed = cycle_info.cycle_completed - [cycle_info.cycle_completed.len() - DATA_USAGE_UPDATE_DIR_CYCLES as usize..] 
- .to_vec(); + let _ = cycle_info.cycle_completed.remove(0); } globalScannerMetrics.write().await.set_cycle(Some(cycle_info.clone())).await; - let mut tmp = Vec::new(); - tmp.write_u64::<LittleEndian>(cycle_info.next).unwrap(); - if let Ok(data) = cycle_info.marshal_msg(&tmp) { - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &data).await; - } else { - let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &tmp).await; - } + let mut wr = Vec::new(); + cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); + let _ = save_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH, &wr).await; } Err(err) => { + info!("ns_scanner failed: {:?}", err); res.insert("error".to_string(), err.to_string()); @@ -261,7 +236,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot HEAL_NORMAL_SCAN } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct CurrentScannerCycle { pub current: u64, pub next: u64, @@ -1081,3 +1056,34 @@ pub async fn scan_data_folder( } // pub fn eval_action_from_lifecycle(lc: &BucketLifecycleConfiguration, lr: &ObjectLockConfiguration, rcfg: &ReplicationConfiguration, obj: &ObjectInfo) + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use chrono::Utc; + use rmp_serde::{Deserializer, Serializer}; + use serde::{Deserialize, Serialize}; + + use super::CurrentScannerCycle; + + #[test] + fn test_current_cycle() { + let cycle_info = CurrentScannerCycle { + current: 0, + next: 1, + started: Utc::now(), + cycle_completed: vec![Utc::now(), Utc::now()], + }; + + println!("{cycle_info:?}"); + + let mut wr = Vec::new(); + cycle_info.serialize(&mut Serializer::new(&mut wr)).unwrap(); + + let mut buf_t = Deserializer::new(Cursor::new(wr)); + let c: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap(); + + println!("{c:?}"); + } +} diff --git a/ecstore/src/heal/data_scanner_metric.rs b/ecstore/src/heal/data_scanner_metric.rs index 77b97c1d..d96ad89c 100644 --- a/ecstore/src/heal/data_scanner_metric.rs +++ b/ecstore/src/heal/data_scanner_metric.rs @@ -218,6 +218,7 @@ impl ScannerMetrics { pub async fn report(&self) -> M_ScannerMetrics { let mut m = M_ScannerMetrics::default(); if let Some(cycle) = self.get_cycle().await { + info!("cycle: {cycle:?}"); m.current_cycle = cycle.current; m.cycles_completed_at = cycle.cycle_completed; m.current_started = cycle.started; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 31c23def..152f2063 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -2768,9 +2768,9 @@ impl SetDisks { let buckets_results_tx_clone = buckets_results_tx.clone(); futures.push(async move { loop { - match bucket_rx_clone.write().await.recv().await { - None => return, - Some(bucket_info) => { + match bucket_rx_clone.write().await.try_recv() { + Err(_) => return, + Ok(bucket_info) => { let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME); let mut cache = match DataUsageCache::load(self, &cache_name.to_string_lossy()).await { Ok(cache) => cache, @@ -2841,10 +2841,14 @@ impl SetDisks { let _ = cache.save(&cache_name.to_string_lossy()).await; } } + info!("continue scanner"); } }); } + info!("ns_scanner start"); let _ = join_all(futures).await; + drop(buckets_results_tx); + info!("bucket scanners finished, waiting for results task"); let _ = task.await; info!("ns_scanner completed"); Ok(()) From e39775bb34743cb76f82211c5691ee5b6a776150 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 4 Dec 2024 10:55:09 +0800 Subject: [PATCH 6/6] notification_sys base --- common/workers/src/workers.rs | 2 +- ecstore/src/bucket/policy/resource.rs | 1 + 
ecstore/src/disk/error.rs | 5 ++--- ecstore/src/disk/mod.rs | 24 +++++++---------------- ecstore/src/erasure.rs | 4 ++-- ecstore/src/file_meta.rs | 2 +- ecstore/src/global.rs | 4 ++-- ecstore/src/heal/data_scanner.rs | 5 +---- ecstore/src/notification_sys.rs | 17 +++++++++++++++- ecstore/src/peer.rs | 2 +- ecstore/src/pools.rs | 28 ++++++++++++++++++--------- ecstore/src/store_api.rs | 7 ++++--- ecstore/src/store_err.rs | 6 +++--- ecstore/src/utils/os/unix.rs | 2 +- madmin/src/metrics.rs | 6 +++--- rustfs/src/config/mod.rs | 4 ---- rustfs/src/storage/ecfs.rs | 2 -- 17 files changed, 64 insertions(+), 57 deletions(-) diff --git a/common/workers/src/workers.rs b/common/workers/src/workers.rs index 5c03447f..85be5efb 100644 --- a/common/workers/src/workers.rs +++ b/common/workers/src/workers.rs @@ -81,7 +81,7 @@ mod tests { sleep(Duration::from_secs(1)).await; workers.wait().await; if workers.available().await != workers.limit { - assert!(false); + unreachable!(); } } } diff --git a/ecstore/src/bucket/policy/resource.rs b/ecstore/src/bucket/policy/resource.rs index 409cd9cb..b9c520f5 100644 --- a/ecstore/src/bucket/policy/resource.rs +++ b/ecstore/src/bucket/policy/resource.rs @@ -169,6 +169,7 @@ impl<'de> Deserialize<'de> for Resource { { struct Visitor; + #[allow(clippy::needless_lifetimes)] impl<'de> serde::de::Visitor<'de> for Visitor { type Value = Resource; diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index 0902e5d0..36e49f46 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -139,13 +139,12 @@ impl DiskError { } pub fn count_errs(&self, errs: &[Option]) -> usize { - return errs - .iter() + errs.iter() .filter(|&err| match err { None => false, Some(e) => self.is(e), }) - .count(); + .count() } pub fn quorum_unformatted_disks(errs: &[Option]) -> bool { diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index ab57ce45..dd1fc6c0 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -994,6 +994,7 @@ impl BufferWriter { pub fn new(inner: Vec<u8>) -> Self { Self { inner } } + #[allow(clippy::should_implement_trait)] pub fn as_ref(&self) -> &[u8] { self.inner.as_ref() } @@ -1038,6 +1039,10 @@ impl Writer for LocalFileWriter { } } +type NodeClient = NodeServiceClient< + InterceptedService<Channel, Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>, +>; + #[derive(Debug)] pub struct RemoteFileWriter { pub root: PathBuf, @@ -1049,15 +1054,7 @@ pub struct RemoteFileWriter { } impl RemoteFileWriter { - pub async fn new( - root: PathBuf, - volume: String, - path: String, - is_append: bool, - mut client: NodeServiceClient< - InterceptedService<Channel, Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>, - >, - ) -> Result<Self> { + pub async fn new(root: PathBuf, volume: String, path: String, is_append: bool, mut client: NodeClient) -> Result<Self> { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); @@ -1247,14 +1244,7 @@ pub struct RemoteFileReader { } impl RemoteFileReader { - pub async fn new( - root: PathBuf, - volume: String, - path: String, - mut client: NodeServiceClient< - InterceptedService<Channel, Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>, - >, - ) -> Result<Self> { + pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeClient) -> Result<Self> { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 9691a763..bd559e49 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -387,7 +387,7 @@ impl Erasure { // work out the size of each shard pub fn 
shard_size(&self, data_size: usize) -> usize { - (data_size + self.data_shards - 1) / self.data_shards + data_size.div_ceil(self.data_shards) } // returns final erasure size from original size. pub fn shard_file_size(&self, total_size: usize) -> usize { @@ -397,7 +397,7 @@ impl Erasure { let num_shards = total_size / self.block_size; let last_block_size = total_size % self.block_size; - let last_shard_size = (last_block_size + self.data_shards - 1) / self.data_shards; + let last_shard_size = last_block_size.div_ceil(self.data_shards); num_shards * self.shard_size(self.block_size) + last_shard_size // // because EC pads the data on write, the last shard should end up the same length as well diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index d506e3ef..3afae055 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -250,7 +250,7 @@ impl FileMeta { // TODO: use old buf let meta_buf = ver.marshal_msg()?; - let pre_mod_time = self.versions[idx].header.mod_time.clone(); + let pre_mod_time = self.versions[idx].header.mod_time; self.versions[idx].header = ver.header(); self.versions[idx].meta = meta_buf; diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index 81f1a0d3..f5a817c4 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -39,7 +39,7 @@ lazy_static! { pub fn global_rustfs_port() -> u16 { if let Some(p) = GLOBAL_RUSTFS_PORT.get() { - p.clone() + *p } else { DEFAULT_PORT } @@ -74,7 +74,7 @@ pub fn get_global_endpoints() -> EndpointServerPools { } pub fn new_object_layer_fn() -> Option<Arc<ECStore>> { - GLOBAL_OBJECT_API.get().map(|ec| ec.clone()) + GLOBAL_OBJECT_API.get().cloned() } pub async fn set_object_layer(o: Arc<ECStore>) { diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 721a09f8..f3d1745d 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -117,10 +117,7 @@ async fn run_data_scanner() { .map_or(Vec::new(), |buf| buf); let mut buf_t = Deserializer::new(Cursor::new(buf)); - let mut cycle_info: CurrentScannerCycle = match Deserialize::deserialize(&mut buf_t) { - Ok(info) => info, - Err(_) => CurrentScannerCycle::default(), - }; + let mut cycle_info: CurrentScannerCycle = Deserialize::deserialize(&mut buf_t).unwrap_or_default(); loop { let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 359e4c3f..a36d1462 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use crate::endpoints::EndpointServerPools; use crate::error::{Error, Result}; @@ -7,6 +7,7 @@ use crate::peer_rest_client::PeerRestClient; use crate::StorageAPI; use futures::future::join_all; use lazy_static::lazy_static; +use tracing::error; lazy_static! 
{ pub static ref GLOBAL_NotificationSys: OnceLock<NotificationSys> = OnceLock::new(); @@ -89,6 +90,20 @@ impl NotificationSys { let backend = api.backend_info().await; madmin::StorageInfo { disks, backend } } + + pub async fn reload_pool_meta(&self) { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + for client in self.peer_clients.iter().flatten() { + futures.push(client.reload_pool_meta()); + } + + let results = join_all(futures).await; + for result in results { + if let Err(err) = result { + error!("notification reload_pool_meta err {:?}", err); + } + } + } } fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 61737fc0..a5217ba9 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -110,7 +110,7 @@ impl S3PeerSys { let mut futures = Vec::new(); let heal_bucket_results = Arc::new(RwLock::new(vec![HealResultItem::default(); self.clients.len()])); for (idx, client) in self.clients.iter().enumerate() { - let opts_clone = opts.clone(); + let opts_clone = opts; let heal_bucket_results_clone = heal_bucket_results.clone(); futures.push(async move { match client.heal_bucket(bucket, &opts_clone).await { diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index d355141a..d6a43f83 100644 --- a/ecstore/src/pools.rs +++ b/ecstore/src/pools.rs @@ -5,6 +5,7 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; use crate::error::{Error, Result}; use crate::heal::heal_commands::HealOpts; use crate::new_object_layer_fn; +use crate::notification_sys::get_global_notification_sys; use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI}; use crate::store_err::{is_err_bucket_exists, StorageError}; use crate::utils::path::{path_join, SLASH_SEPARATOR}; @@ -12,6 +13,7 @@ use crate::{sets::Sets, store::ECStore}; use byteorder::{ByteOrder, LittleEndian, WriteBytesExt}; use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; +use std::fmt::Display; use std::io::{Cursor, Write}; use std::path::PathBuf; use std::sync::Arc; @@ -270,7 +272,7 @@ impl PoolMeta { } } -fn path2_bucket_object(name: &String) -> (String, String) { +fn path2_bucket_object(name: &str) -> (String, String) { path2_bucket_object_with_base_path("", name) } @@ -374,11 +376,13 @@ pub struct DecomBucketInfo { pub name: String, pub prefix: String, } -impl ToString for DecomBucketInfo { - fn to_string(&self) -> String { - path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())]) - .to_string_lossy() - .to_string() +impl Display for DecomBucketInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + path_join(&[PathBuf::from(self.name.clone()), PathBuf::from(self.prefix.clone())]).to_string_lossy() + ) } } @@ -531,7 +535,9 @@ impl ECStore { let mut pool_meta = self.pool_meta.write().await; if pool_meta.decommission_failed(idx) { pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } } Ok(()) @@ -545,7 +551,9 @@ impl ECStore { let mut pool_meta = self.pool_meta.write().await; if pool_meta.decommission_complete(idx) { pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } } Ok(()) @@ -641,7 +649,9 @@ impl ECStore { 
pool_meta.save(self.pools.clone()).await?; - // FIXME: globalNotificationSys.ReloadPoolMeta(ctx) + if let Some(notification_sys) = get_global_notification_sys() { + notification_sys.reload_pool_meta().await; + } Ok(()) } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 8b34fde3..715d2181 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -331,7 +331,7 @@ impl ErasureInfo { // work out the size of each shard pub fn shard_size(&self, data_size: usize) -> usize { - (data_size + self.data_blocks - 1) / self.data_blocks + data_size.div_ceil(self.data_blocks) } // returns final erasure size from original size. @@ -342,7 +342,7 @@ impl ErasureInfo { let num_shards = total_size / self.block_size; let last_block_size = total_size % self.block_size; - let last_shard_size = (last_block_size + self.data_blocks - 1) / self.data_blocks; + let last_shard_size = last_block_size.div_ceil(self.data_blocks); num_shards * self.shard_size(self.block_size) + last_shard_size // // because EC pads the data on write, the last shard should end up the same length as well @@ -826,6 +826,7 @@ pub trait ObjectIO: Send + Sync + 'static { } #[async_trait::async_trait] +#[allow(clippy::too_many_arguments)] pub trait StorageAPI: ObjectIO { // NewNSLock TODO: // Shutdown TODO: @@ -873,7 +874,7 @@ pub trait StorageAPI: ObjectIO { objects: Vec, opts: ObjectOptions, ) -> Result<(Vec, Vec>)>; - #[warn(clippy::too_many_arguments)] + // TransitionObject TODO: // RestoreTransitionedObject TODO: diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs index 5ee591dd..e0dabb87 100644 --- a/ecstore/src/store_err.rs +++ b/ecstore/src/store_err.rs @@ -208,7 +208,7 @@ pub fn is_err_object_not_found(err: &Error) -> bool { fn test_storage_error() { let e1 = Error::new(StorageError::BucketExists("ss".into())); let e2 = Error::new(StorageError::ObjectNotFound("ss".into(), "sdf".to_owned())); - assert_eq!(is_err_bucket_exists(&e1), true); - assert_eq!(is_err_object_not_found(&e1), false); - assert_eq!(is_err_object_not_found(&e2), true); + assert!(is_err_bucket_exists(&e1)); + assert!(!is_err_object_not_found(&e1)); + assert!(is_err_object_not_found(&e2)); } diff --git a/ecstore/src/utils/os/unix.rs b/ecstore/src/utils/os/unix.rs index 17ccd06f..bee85715 100644 --- a/ecstore/src/utils/os/unix.rs +++ b/ecstore/src/utils/os/unix.rs @@ -77,6 +77,6 @@ pub fn same_disk(disk1: &str, disk2: &str) -> Result<bool> { Ok(stat1.st_dev == stat2.st_dev) } -pub fn get_drive_stats(major: u32, minor: u32) -> Result<IOStats> { +pub fn get_drive_stats(_major: u32, _minor: u32) -> Result<IOStats> { Ok(IOStats::default()) } diff --git a/madmin/src/metrics.rs b/madmin/src/metrics.rs index fee5b365..f8c6948c 100644 --- a/madmin/src/metrics.rs +++ b/madmin/src/metrics.rs @@ -168,7 +168,7 @@ impl ScannerMetrics { self.last_minute.ilm.entry(k.clone()).or_default().merge(v); } - self.active_paths.extend(other.active_paths.clone().into_iter()); + self.active_paths.extend(other.active_paths.clone()); self.active_paths.sort(); } @@ -606,14 +606,14 @@ pub struct RealtimeMetrics { impl RealtimeMetrics { pub fn merge(&mut self, other: Self) { if !other.errors.is_empty() { - self.errors.extend(other.errors.into_iter()); + self.errors.extend(other.errors); } for (k, v) in other.by_host.into_iter() { *self.by_host.entry(k).or_default() = v; } - self.hosts.extend(other.hosts.into_iter()); + self.hosts.extend(other.hosts); self.aggregated.merge(&other.aggregated); self.hosts.sort(); diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index f038b3b5..c87957cd 100644 --- a/rustfs/src/config/mod.rs +++ 
b/rustfs/src/config/mod.rs @@ -4,10 +4,6 @@ use ecstore::global::DEFAULT_PORT; shadow_rs::shadow!(build); -/// Default port that a rustfs server listens on. -/// -/// Used if no port is specified. - pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index cd1da10e..b3a40ac8 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -591,8 +591,6 @@ impl S3 for FS { .await .map_err(to_s3_error)?; - error!("ObjectOptions {:?}", opts); - let obj_info = store .put_object(&bucket, &key, &mut reader, &opts) .await
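The thread running through the scanner patches above is how the cycle state gets persisted: an earlier patch in the series still prefixes a little-endian u64 before the MessagePack body, patch 5 drops the prefix and serializes the whole CurrentScannerCycle with rmp_serde, and patch 6 collapses the load into unwrap_or_default(). Below is a self-contained round-trip sketch of that final pattern; CycleState is a stand-in struct, not the crate's actual type, and only the shape of the pattern is being illustrated:

```rust
use std::io::Cursor;

use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};

// Stand-in for CurrentScannerCycle; Default gives the fresh-cycle fallback.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct CycleState {
    current: u64,
    next: u64,
}

fn main() {
    let state = CycleState { current: 3, next: 4 };

    // Serialize the whole struct to MessagePack bytes, the form handed to save_config.
    let mut wr = Vec::new();
    state.serialize(&mut Serializer::new(&mut wr)).unwrap();

    // Deserialize, falling back to Default on an empty or corrupt buffer —
    // the unwrap_or_default() behavior patch 6 settles on.
    let mut de = Deserializer::new(Cursor::new(wr));
    let restored: CycleState = Deserialize::deserialize(&mut de).unwrap_or_default();

    assert_eq!(state, restored);
}
```

Persisting one self-describing blob removes the failure mode the dual save_config branches earlier in the series papered over: a u64 prefix and a MessagePack body that could drift out of sync.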