From eb12c9b00534d33fe6487ebfc7cddec12be5e852 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 3 Dec 2024 17:38:52 +0800 Subject: [PATCH] test StorageInfoHandler --- Cargo.lock | 2 + crypto/src/jwt/tests.rs | 9 +- e2e_test/Cargo.toml | 3 +- e2e_test/src/reliant/node_interact_test.rs | 4 +- ecstore/src/admin_server_info.rs | 9 +- ecstore/src/config/common.rs | 4 +- ecstore/src/disk/local.rs | 2 +- ecstore/src/endpoints.rs | 52 ++++++-- ecstore/src/global.rs | 8 ++ ecstore/src/heal/heal_commands.rs | 37 +----- ecstore/src/heal/heal_ops.rs | 5 +- ecstore/src/notification_sys.rs | 66 +++++++++- ecstore/src/peer_rest_client.rs | 104 +++++++++------- ecstore/src/pools.rs | 6 +- ecstore/src/set_disk.rs | 29 +++-- ecstore/src/sets.rs | 14 +-- ecstore/src/store.rs | 28 +++-- ecstore/src/store_api.rs | 86 +------------ ecstore/src/utils/net.rs | 30 +++-- madmin/Cargo.toml | 1 + madmin/src/info_commands.rs | 136 ++++++++++++++++++++- madmin/src/lib.rs | 2 + rustfs/src/admin/handlers.rs | 30 +++-- rustfs/src/main.rs | 10 +- 24 files changed, 420 insertions(+), 257 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18efd2c8..624ea269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,6 +764,7 @@ dependencies = [ "flatbuffers", "lazy_static", "lock", + "madmin", "protos", "rmp-serde", "serde", @@ -1677,6 +1678,7 @@ dependencies = [ "common", "psutil", "serde", + "time", ] [[package]] diff --git a/crypto/src/jwt/tests.rs b/crypto/src/jwt/tests.rs index abfe0020..627e8802 100644 --- a/crypto/src/jwt/tests.rs +++ b/crypto/src/jwt/tests.rs @@ -10,7 +10,10 @@ fn test() { "bbb": "bbb" }); - let jwt_token = encode(b"aaaa", &claims).unwrap(); - let new_claims = decode(&jwt_token, b"aaaa").unwrap(); - assert_eq!(new_claims.claims, claims); + let jwt_token = encode(b"aaaa", &claims).unwrap_or_default(); + let new_claims = match decode(&jwt_token, b"aaaa") { + Ok(res) => Some(res.claims), + Err(_errr) => None, + }; + assert_eq!(new_claims, Some(claims)); } diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 771f97e8..6700f01b 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -20,4 +20,5 @@ serde_json.workspace = true tonic = { version = "0.12.3", features = ["gzip"] } tokio = { workspace = true } tower.workspace = true -url.workspace = true \ No newline at end of file +url.workspace = true +madmin.workspace =true \ No newline at end of file diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs index db456ad2..16b333a3 100644 --- a/e2e_test/src/reliant/node_interact_test.rs +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use ecstore::{disk::VolumeInfo, store_api::StorageInfo}; +use ecstore::disk::VolumeInfo; use protos::{ models::{PingBody, PingBodyBuilder}, node_service_time_out_client, @@ -118,7 +118,7 @@ async fn storage_info() -> Result<(), Box> { let info = response.storage_info; let mut buf = Deserializer::new(Cursor::new(info)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); println!("{:?}", storage_info); Ok(()) } diff --git a/ecstore/src/admin_server_info.rs b/ecstore/src/admin_server_info.rs index bee9d996..6ecadff2 100644 --- a/ecstore/src/admin_server_info.rs +++ b/ecstore/src/admin_server_info.rs @@ -15,12 +15,7 @@ use protos::{ use serde::{Deserialize, Serialize}; use tonic::Request; -use crate::{ - disk::endpoint::Endpoint, - global::GLOBAL_Endpoints, - new_object_layer_fn, - store_api::{StorageAPI, StorageDisk}, -}; +use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI}; pub const ITEM_OFFLINE: &str = "offline"; pub const ITEM_INITIALIZING: &str = "initializing"; @@ -44,7 +39,7 @@ pub struct ServerProperties { pub version: String, pub commit_id: String, pub network: HashMap, - pub disks: Vec, + pub disks: Vec, pub pool_number: i32, pub pool_numbers: Vec, pub mem_stats: MemStats, diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index 6a47e489..f25f557c 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -105,9 +105,9 @@ pub async fn read_config_without_migrate(api: Arc) -> Result res, Err(err) => { if is_not_found(&err) { - warn!("config not found init start"); + warn!("config not found, start to init"); let cfg = new_and_save_server_config(api).await?; - warn!("config not found init done"); + warn!("config init done"); return Ok(cfg); } else { error!("read config err {:?}", &err); diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index c85db05f..b53a147b 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1291,7 +1291,7 @@ impl DiskAPI for LocalDisk { Ok(res) => res, Err(e) => { if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) { - error!("list_dir err {:?}", &e); + info!("list_dir err {:?}", &e); } if opts.report_notfound && is_err_file_not_found(&e) { diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 73dd58d7..334c74d6 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -1,9 +1,11 @@ +use tracing::warn; + use crate::{ disk::endpoint::{Endpoint, EndpointType}, disks_layout::DisksLayout, error::{Error, Result}, global::global_rustfs_port, - utils::net, + utils::net::{self, XHost}, }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -530,20 +532,30 @@ impl EndpointServerPools { nodes } - pub fn hosts_sorted(&self) -> Vec<()> { + pub fn hosts_sorted(&self) -> Vec> { let (mut peers, local) = self.peers(); + let mut ret = vec![None; peers.len()]; + peers.sort(); - for peer in peers.iter() { + for (i, peer) in peers.iter().enumerate() { if &local == peer { continue; } - // FIXME:TODO + let host = match XHost::try_from(peer.clone()) { + Ok(res) => res, + Err(err) => { + warn!("Xhost parse failed {:?}", err); + continue; + } + }; + + ret[i] = Some(host); } - unimplemented!() + ret } pub fn peers(&self) -> (Vec, String) { let mut local = None; @@ -554,12 +566,8 @@ impl EndpointServerPools { continue; } let host = endpoint.host_port(); - if endpoint.is_local { - if endpoint.url.port() == Some(global_rustfs_port()) { - if local.is_none() { - local = Some(host.clone()); - } - } + if endpoint.is_local && endpoint.url.port() == Some(global_rustfs_port()) && local.is_none() { + local = Some(host.clone()); } set.insert(host); @@ -570,6 +578,28 @@ impl EndpointServerPools { (hosts, local.unwrap_or_default()) } + + pub fn find_grid_hosts_from_peer(&self, host: &XHost) -> Option { + for ep in self.0.iter() { + for endpoint in ep.endpoints.0.iter() { + if endpoint.is_local { + continue; + } + let xhost = match XHost::try_from(endpoint.host_port()) { + Ok(res) => res, + Err(_) => { + continue; + } + }; + + if xhost.to_string() == host.to_string() { + return Some(endpoint.grid_host()); + } + } + } + + None + } } #[cfg(test)] diff --git a/ecstore/src/global.rs b/ecstore/src/global.rs index 63e07edb..81f1a0d3 100644 --- a/ecstore/src/global.rs +++ b/ecstore/src/global.rs @@ -65,6 +65,14 @@ pub fn set_global_endpoints(eps: Vec) { .expect("GLOBAL_Endpoints set faild") } +pub fn get_global_endpoints() -> EndpointServerPools { + if let Some(eps) = GLOBAL_Endpoints.get() { + eps.clone() + } else { + EndpointServerPools::default() + } +} + pub fn new_object_layer_fn() -> Option> { GLOBAL_OBJECT_API.get().map(|ec| ec.clone()) } diff --git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs index 2943f1e5..0ce85910 100644 --- a/ecstore/src/heal/heal_commands.rs +++ b/ecstore/src/heal/heal_commands.rs @@ -17,7 +17,7 @@ use crate::{ global::GLOBAL_BackgroundHealState, heal::heal_ops::HEALING_TRACKER_FILENAME, new_object_layer_fn, - store_api::{BucketInfo, StorageAPI, StorageDisk}, + store_api::{BucketInfo, StorageAPI}, utils::fs::read_file, }; @@ -131,35 +131,6 @@ impl Default for HealStartSuccess { pub type HealStopSuccess = HealStartSuccess; -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct HealingDisk { - pub id: String, - pub heal_id: String, - pub pool_index: Option, - pub set_index: Option, - pub disk_index: Option, - pub endpoint: String, - pub path: String, - pub started: Option, - pub last_update: Option, - pub retry_attempts: u64, - pub objects_total_count: u64, - pub objects_total_size: u64, - pub items_healed: u64, - pub items_failed: u64, - pub item_skipped: u64, - pub bytes_done: u64, - pub bytes_failed: u64, - pub bytes_skipped: u64, - pub objects_healed: u64, - pub objects_failed: u64, - pub bucket: String, - pub object: String, - pub queue_buckets: Vec, - pub healed_buckets: Vec, - pub finished: bool, -} - #[derive(Debug, Default, Deserialize, Serialize)] pub struct HealingTracker { #[serde(skip_serializing, skip_deserializing)] @@ -375,10 +346,10 @@ impl HealingTracker { }); } - pub async fn to_healing_disk(&self) -> HealingDisk { + pub async fn to_healing_disk(&self) -> madmin::HealingDisk { let _ = self.mu.read().await; - HealingDisk { + madmin::HealingDisk { id: self.id.clone(), heal_id: self.heal_id.clone(), pool_index: self.pool_index, @@ -516,7 +487,7 @@ pub struct SetStatus { pub heal_status: String, pub heal_priority: String, pub total_objects: usize, - pub disks: Vec, + pub disks: Vec, } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index 73d7f82f..53b5551a 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -3,8 +3,7 @@ use super::{ data_scanner::HEAL_DELETE_DANGLING, error::ERR_SKIP_FILE, heal_commands::{ - HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker, - HEAL_ITEM_BUCKET_METADATA, + HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA, }, }; use crate::store_api::StorageAPI; @@ -664,7 +663,7 @@ impl AllHealState { self.heal_status.write().await.insert(tracker.id.clone(), tracker.clone()); } - pub async fn get_local_healing_disks(&self) -> HashMap { + pub async fn get_local_healing_disks(&self) -> HashMap { let _ = self.mu.read().await; let mut dst = HashMap::new(); diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index 4ea46a1d..359e4c3f 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -1,8 +1,11 @@ -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use crate::endpoints::EndpointServerPools; use crate::error::{Error, Result}; +use crate::global::get_global_endpoints; use crate::peer_rest_client::PeerRestClient; +use crate::StorageAPI; +use futures::future::join_all; use lazy_static::lazy_static; lazy_static! { @@ -16,9 +19,13 @@ pub async fn new_global_notification_sys(eps: EndpointServerPools) -> Result<()> Ok(()) } +pub fn get_global_notification_sys() -> Option<&'static NotificationSys> { + GLOBAL_NotificationSys.get() +} + pub struct NotificationSys { - pub peer_clients: Vec, - pub all_peer_clients: Vec, + peer_clients: Vec>, + all_peer_clients: Vec>, } impl NotificationSys { @@ -50,4 +57,57 @@ impl NotificationSys { pub async fn delete_user(&self) -> Vec { unimplemented!() } + + pub async fn storage_info(&self, api: &S) -> madmin::StorageInfo { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.local_storage_info().await { + Ok(info) => Some(info), + Err(_) => Some(madmin::StorageInfo { + disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()), + ..Default::default() + }), + } + } else { + None + } + }); + } + + let mut replies = join_all(futures).await; + + replies.push(Some(api.local_storage_info().await)); + + let mut disks = Vec::new(); + for info in replies.into_iter().flatten() { + disks.extend(info.disks); + } + + let backend = api.backend_info().await; + madmin::StorageInfo { disks, backend } + } +} + +fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec { + let mut offline_disks = Vec::new(); + + for pool in endpoints.as_ref() { + for ep in pool.endpoints.as_ref() { + if (offline_host.is_empty() && ep.is_local) || offline_host == ep.host_port() { + offline_disks.push(madmin::Disk { + endpoint: ep.to_string(), + state: madmin::ItemState::Offline.to_string().to_owned(), + pool_index: ep.pool_idx, + set_index: ep.set_idx, + disk_index: ep.disk_idx, + ..Default::default() + }); + } + } + } + + offline_disks } diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index d60b537d..76aaee69 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -6,7 +6,7 @@ use crate::{ global::is_dist_erasure, heal::heal_commands::BgHealState, metrics_realtime::{CollectMetricsOpts, MetricType}, - store_api::StorageInfo, + utils::net::XHost, }; use common::error::{Error, Result}; use madmin::{ @@ -29,35 +29,53 @@ use protos::{ use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize as _}; use tonic::Request; +use tracing::warn; pub const PEER_RESTSIGNAL: &str = "signal"; pub const PEER_RESTSUB_SYS: &str = "sub-sys"; pub const PEER_RESTDRY_RUN: &str = "dry-run"; +#[derive(Debug, Clone)] pub struct PeerRestClient { - addr: String, + pub host: XHost, + pub grid_host: String, } impl PeerRestClient { - pub fn new(url: url::Url) -> Self { - Self { - addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()), - } + pub fn new(host: XHost, grid_host: String) -> Self { + Self { host, grid_host } } - pub async fn new_clients(_eps: EndpointServerPools) -> (Vec, Vec) { + pub async fn new_clients(eps: EndpointServerPools) -> (Vec>, Vec>) { if !is_dist_erasure().await { return (Vec::new(), Vec::new()); } - // FIXME:TODO + let eps = eps.clone(); + let hosts = eps.hosts_sorted(); + let mut remote = vec![None; hosts.len()]; + let mut all = Vec::with_capacity(hosts.len()); + for (i, hs_host) in hosts.iter().enumerate() { + if let Some(host) = hs_host { + if let Some(grid_host) = eps.find_grid_hosts_from_peer(host) { + let client = PeerRestClient::new(host.clone(), grid_host); - todo!() + all[i] = Some(client.clone()); + remote.push(Some(client)); + } + } + } + + if all.len() != remote.len() + 1 { + warn!("Expected number of all hosts ({}) to be remote +1 ({})", all.len(), remote.len()); + } + + (remote, all) } } impl PeerRestClient { - pub async fn local_storage_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + pub async fn local_storage_info(&self) -> Result { + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LocalStorageInfoRequest { metrics: true }); @@ -72,13 +90,13 @@ impl PeerRestClient { let data = response.storage_info; let mut buf = Deserializer::new(Cursor::new(data)); - let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); + let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap(); Ok(storage_info) } pub async fn server_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ServerInfoRequest { metrics: true }); @@ -99,7 +117,7 @@ impl PeerRestClient { } pub async fn get_cpus(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetCpusRequest {}); @@ -120,7 +138,7 @@ impl PeerRestClient { } pub async fn get_net_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetNetInfoRequest {}); @@ -141,7 +159,7 @@ impl PeerRestClient { } pub async fn get_partitions(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetPartitionsRequest {}); @@ -162,7 +180,7 @@ impl PeerRestClient { } pub async fn get_os_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetOsInfoRequest {}); @@ -183,7 +201,7 @@ impl PeerRestClient { } pub async fn get_se_linux_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSeLinuxInfoRequest {}); @@ -204,7 +222,7 @@ impl PeerRestClient { } pub async fn get_sys_config(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysConfigRequest {}); @@ -225,7 +243,7 @@ impl PeerRestClient { } pub async fn get_sys_errors(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetSysErrorsRequest {}); @@ -246,7 +264,7 @@ impl PeerRestClient { } pub async fn get_mem_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetMemInfoRequest {}); @@ -267,7 +285,7 @@ impl PeerRestClient { } pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut buf_t = Vec::new(); @@ -295,7 +313,7 @@ impl PeerRestClient { } pub async fn get_proc_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(GetProcInfoRequest {}); @@ -316,7 +334,7 @@ impl PeerRestClient { } pub async fn start_profiling(&self, profiler: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StartProfilingRequest { @@ -350,7 +368,7 @@ impl PeerRestClient { } pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadBucketMetadataRequest { @@ -368,7 +386,7 @@ impl PeerRestClient { } pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteBucketMetadataRequest { @@ -386,7 +404,7 @@ impl PeerRestClient { } pub async fn delete_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeletePolicyRequest { @@ -404,7 +422,7 @@ impl PeerRestClient { } pub async fn load_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyRequest { @@ -422,7 +440,7 @@ impl PeerRestClient { } pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadPolicyMappingRequest { @@ -442,7 +460,7 @@ impl PeerRestClient { } pub async fn delete_user(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteUserRequest { @@ -460,7 +478,7 @@ impl PeerRestClient { } pub async fn delete_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(DeleteServiceAccountRequest { @@ -478,7 +496,7 @@ impl PeerRestClient { } pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadUserRequest { @@ -497,7 +515,7 @@ impl PeerRestClient { } pub async fn load_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadServiceAccountRequest { @@ -515,7 +533,7 @@ impl PeerRestClient { } pub async fn load_group(&self, group: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadGroupRequest { @@ -533,7 +551,7 @@ impl PeerRestClient { } pub async fn reload_site_replication_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadSiteReplicationConfigRequest {}); @@ -549,7 +567,7 @@ impl PeerRestClient { } pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let mut vars = HashMap::new(); @@ -571,7 +589,7 @@ impl PeerRestClient { } pub async fn background_heal_status(&self) -> Result { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(BackgroundHealStatusRequest {}); @@ -592,21 +610,21 @@ impl PeerRestClient { } pub async fn get_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn update_metacache_listing(&self) -> Result<()> { - let mut _client = node_service_time_out_client(&self.addr) + let mut _client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; todo!() } pub async fn reload_pool_meta(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(ReloadPoolMetaRequest {}); @@ -623,7 +641,7 @@ impl PeerRestClient { } pub async fn stop_rebalance(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(StopRebalanceRequest {}); @@ -640,7 +658,7 @@ impl PeerRestClient { } pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadRebalanceMetaRequest { start_rebalance }); @@ -657,7 +675,7 @@ impl PeerRestClient { } pub async fn load_transition_tier_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client(&self.grid_host) .await .map_err(|err| Error::msg(err.to_string()))?; let request = Request::new(LoadTransitionTierConfigRequest {}); diff --git a/ecstore/src/pools.rs b/ecstore/src/pools.rs index 8712308e..d355141a 100644 --- a/ecstore/src/pools.rs +++ b/ecstore/src/pools.rs @@ -5,7 +5,7 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; use crate::error::{Error, Result}; use crate::heal::heal_commands::HealOpts; use crate::new_object_layer_fn; -use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI, StorageDisk, StorageInfo}; +use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI}; use crate::store_err::{is_err_bucket_exists, StorageError}; use crate::utils::path::{path_join, SLASH_SEPARATOR}; use crate::{sets::Sets, store::ECStore}; @@ -670,7 +670,7 @@ impl ECStore { } } -fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { @@ -683,7 +683,7 @@ fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize capacity } -fn get_total_usable_capacity_free(disks: &[StorageDisk], info: &StorageInfo) -> usize { +fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize { let mut capacity = 0; for disk in disks.iter() { if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize { diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 08df4853..a320c97e 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -38,10 +38,9 @@ use crate::{ }, quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError}, store_api::{ - BackendByte, BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, - GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, - MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, - RawFileInfo, StorageAPI, StorageDisk, StorageInfo, DEFAULT_BITROT_ALGO, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec, + ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo, + ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, RawFileInfo, StorageAPI, DEFAULT_BITROT_ALGO, }, store_err::{to_object_err, StorageError}, store_init::{load_format_erasure, ErasureError}, @@ -3590,15 +3589,15 @@ impl ObjectIO for SetDisks { #[async_trait::async_trait] impl StorageAPI for SetDisks { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; get_storage_info(&disks, &self.set_endpoints).await } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; let mut local_disks: Vec>> = Vec::new(); @@ -4941,13 +4940,13 @@ pub fn should_heal_object_on_disk( (false, err.clone()) } -async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { +async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec { let mut ret = Vec::new(); for (i, pool) in disks.iter().enumerate() { if let Some(disk) = pool { match disk.disk_info(&DiskInfoOptions::default()).await { - Ok(res) => ret.push(StorageDisk { + Ok(res) => ret.push(madmin::Disk { endpoint: eps[i].to_string(), local: eps[i].is_local, pool_index: eps[i].pool_idx, @@ -4978,7 +4977,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec ret.push(StorageDisk { + Err(err) => ret.push(madmin::Disk { state: err.to_string(), endpoint: eps[i].to_string(), local: eps[i].is_local, @@ -4989,7 +4988,7 @@ async fn get_disks_info(disks: &[Option], eps: &[Endpoint]) -> Vec], eps: &[Endpoint]) -> Vec>, eps: &Vec) -> StorageInfo { +async fn get_storage_info(disks: &Vec>, eps: &Vec) -> madmin::StorageInfo { let mut disks = get_disks_info(disks, eps).await; disks.sort_by(|a, b| a.total_space.cmp(&b.total_space)); - StorageInfo { + madmin::StorageInfo { disks, - backend: BackendInfo { - backend_type: BackendByte::Erasure, + backend: madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, ..Default::default() }, } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 50537374..000fe458 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -23,9 +23,9 @@ use crate::{ }, set_disk::SetDisks, store_api::{ - BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, + BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, - ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, StorageInfo, + ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, }, store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file}, utils::hash, @@ -294,10 +294,10 @@ impl ObjectIO for Sets { #[async_trait::async_trait] impl StorageAPI for Sets { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { unimplemented!() } - async fn storage_info(&self) -> StorageInfo { + async fn storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -312,12 +312,12 @@ impl StorageAPI for Sets { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.disk_set.len()); for set in self.disk_set.iter() { @@ -331,7 +331,7 @@ impl StorageAPI for Sets { for res in results.into_iter() { disks.extend_from_slice(&res.disks); } - StorageInfo { + madmin::StorageInfo { disks, ..Default::default() } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 2dcc5dc6..148837f5 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -15,10 +15,9 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo}; use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA}; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::new_object_layer_fn; +use crate::notification_sys::get_global_notification_sys; use crate::pools::PoolMeta; -use crate::store_api::{ - BackendByte, BackendDisks, BackendInfo, ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO, StorageInfo, -}; +use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO}; use crate::store_err::{ is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found, to_object_err, StorageError, @@ -1120,7 +1119,7 @@ lazy_static! { #[async_trait::async_trait] impl StorageAPI for ECStore { - async fn backend_info(&self) -> BackendInfo { + async fn backend_info(&self) -> madmin::BackendInfo { let (standard_sc_parity, rr_sc_parity) = { if let Some(sc) = GLOBAL_StorageClass.get() { let sc_parity = sc @@ -1151,10 +1150,10 @@ impl StorageAPI for ECStore { drives_per_set.push(*set_count); } - BackendInfo { - backend_type: BackendByte::Erasure, - online_disks: BackendDisks::new(), - offline_disks: BackendDisks::new(), + madmin::BackendInfo { + backend_type: madmin::BackendByte::Erasure, + online_disks: madmin::BackendDisks::new(), + offline_disks: madmin::BackendDisks::new(), standard_sc_data, standard_sc_parity, rr_sc_data, @@ -1164,11 +1163,14 @@ impl StorageAPI for ECStore { ..Default::default() } } - async fn storage_info(&self) -> StorageInfo { - // FIXME: globalNotificationSys.StorageInfo - unimplemented!() + async fn storage_info(&self) -> madmin::StorageInfo { + let Some(notification_sy) = get_global_notification_sys() else { + return madmin::StorageInfo::default(); + }; + + notification_sy.storage_info(self).await } - async fn local_storage_info(&self) -> StorageInfo { + async fn local_storage_info(&self) -> madmin::StorageInfo { let mut futures = Vec::with_capacity(self.pools.len()); for pool in self.pools.iter() { @@ -1184,7 +1186,7 @@ impl StorageAPI for ECStore { } let backend = self.backend_info().await; - StorageInfo { backend, disks } + madmin::StorageInfo { backend, disks } } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index acd8d1dc..8b34fde3 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,4 +1,3 @@ -use crate::heal::heal_commands::HealingDisk; use crate::heal::heal_ops::HealSequence; use crate::{ disk::DiskStore, @@ -9,7 +8,6 @@ use crate::{ }; use futures::StreamExt; use http::HeaderMap; -use madmin::info_commands::DiskMetrics; use rmp_serde::Serializer; use s3s::{dto::StreamingBlob, Body}; use serde::{Deserialize, Serialize}; @@ -804,84 +802,6 @@ pub struct DeletedObject { // pub replication_state: ReplicationState, } -#[derive(Debug, Default, Serialize, Deserialize)] -pub enum BackendByte { - #[default] - Unknown, - FS, - Erasure, -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct StorageDisk { - pub endpoint: String, - pub root_disk: bool, - pub drive_path: String, - pub healing: bool, - pub scanning: bool, - pub state: String, - pub uuid: String, - pub major: u32, - pub minor: u32, - pub model: Option, - pub total_space: u64, - pub used_space: u64, - pub available_space: u64, - pub read_throughput: f64, - pub write_throughput: f64, - pub read_latency: f64, - pub write_latency: f64, - pub utilization: f64, - pub metrics: Option, - pub heal_info: Option, - pub used_inodes: u64, - pub free_inodes: u64, - pub local: bool, - pub pool_index: i32, - pub set_index: i32, - pub disk_index: i32, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct StorageInfo { - pub disks: Vec, - pub backend: BackendInfo, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct BackendDisks(HashMap); - -impl BackendDisks { - pub fn new() -> Self { - Self(HashMap::new()) - } - pub fn sum(&self) -> usize { - self.0.values().sum() - } -} - -#[derive(Debug, Default, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase", default)] -pub struct BackendInfo { - pub backend_type: BackendByte, - pub online_disks: BackendDisks, - pub offline_disks: BackendDisks, - #[serde(rename = "StandardSCData")] - pub standard_sc_data: Vec, - #[serde(rename = "StandardSCParities")] - pub standard_sc_parities: Vec, - #[serde(rename = "StandardSCParity")] - pub standard_sc_parity: Option, - #[serde(rename = "RRSCData")] - pub rr_sc_data: Vec, - #[serde(rename = "RRSCParities")] - pub rr_sc_parities: Vec, - #[serde(rename = "RRSCParity")] - pub rr_sc_parity: Option, - pub total_sets: Vec, - pub drives_per_set: Vec, -} - pub struct ListObjectVersionsInfo { pub is_truncated: bool, pub next_marker: String, @@ -911,9 +831,9 @@ pub trait StorageAPI: ObjectIO { // Shutdown TODO: // NSScanner TODO: - async fn backend_info(&self) -> BackendInfo; - async fn storage_info(&self) -> StorageInfo; - async fn local_storage_info(&self) -> StorageInfo; + async fn backend_info(&self) -> madmin::BackendInfo; + async fn storage_info(&self) -> madmin::StorageInfo; + async fn local_storage_info(&self) -> madmin::StorageInfo; async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 52d6d015..c2de72e1 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -2,6 +2,7 @@ use crate::error::{Error, Result}; use lazy_static::lazy_static; use std::{ collections::HashSet, + fmt::Display, net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs}, }; @@ -105,27 +106,38 @@ pub(crate) fn must_get_local_ips() -> Result> { } } +#[derive(Debug, Clone)] pub struct XHost { pub name: String, pub port: u16, pub is_port_set: bool, } -impl ToString for XHost { - fn to_string(&self) -> String { +impl Display for XHost { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { if !self.is_port_set { - self.name.clone() + write!(f, "{}", self.name) + } else if self.name.contains(':') { + write!(f, "[{}]:{}", self.name, self.port) } else { - join_host_port(&self.name, self.port) + write!(f, "{}:{}", self.name, self.port) } } } -fn join_host_port(host: &str, port: u16) -> String { - if host.contains(':') { - format!("[{}]:{}", host, port) - } else { - format!("{}:{}", host, port) +impl TryFrom for XHost { + type Error = std::io::Error; + + fn try_from(value: String) -> std::result::Result { + if let Some(addr) = value.to_socket_addrs()?.next() { + Ok(Self { + name: addr.ip().to_string(), + port: addr.port(), + is_port_set: addr.port() > 0, + }) + } else { + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid")) + } } } diff --git a/madmin/Cargo.toml b/madmin/Cargo.toml index 77235f0f..83b54093 100644 --- a/madmin/Cargo.toml +++ b/madmin/Cargo.toml @@ -11,3 +11,4 @@ chrono.workspace = true common.workspace = true psutil = "3.3.0" serde.workspace = true +time.workspace =true diff --git a/madmin/src/info_commands.rs b/madmin/src/info_commands.rs index 17f75e18..ec8f0f27 100644 --- a/madmin/src/info_commands.rs +++ b/madmin/src/info_commands.rs @@ -1,9 +1,36 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::SystemTime}; use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use crate::metrics::TimedAction; +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum ItemState { + Offline, + Initializing, + Online, +} + +impl ItemState { + pub fn to_string(&self) -> &str { + match self { + ItemState::Offline => "offline", + ItemState::Initializing => "initializing", + ItemState::Online => "online", + } + } + + pub fn from_string(s: &str) -> Option { + match s { + "offline" => Some(ItemState::Offline), + "initializing" => Some(ItemState::Initializing), + "online" => Some(ItemState::Online), + _ => None, + } + } +} + #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct DiskMetrics { pub last_minute: HashMap, @@ -14,3 +41,110 @@ pub struct DiskMetrics { pub total_writes: u64, pub total_deletes: u64, } + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct Disk { + pub endpoint: String, + pub root_disk: bool, + pub drive_path: String, + pub healing: bool, + pub scanning: bool, + pub state: String, + pub uuid: String, + pub major: u32, + pub minor: u32, + pub model: Option, + pub total_space: u64, + pub used_space: u64, + pub available_space: u64, + pub read_throughput: f64, + pub write_throughput: f64, + pub read_latency: f64, + pub write_latency: f64, + pub utilization: f64, + pub metrics: Option, + pub heal_info: Option, + pub used_inodes: u64, + pub free_inodes: u64, + pub local: bool, + pub pool_index: i32, + pub set_index: i32, + pub disk_index: i32, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct HealingDisk { + pub id: String, + pub heal_id: String, + pub pool_index: Option, + pub set_index: Option, + pub disk_index: Option, + pub endpoint: String, + pub path: String, + pub started: Option, + pub last_update: Option, + pub retry_attempts: u64, + pub objects_total_count: u64, + pub objects_total_size: u64, + pub items_healed: u64, + pub items_failed: u64, + pub item_skipped: u64, + pub bytes_done: u64, + pub bytes_failed: u64, + pub bytes_skipped: u64, + pub objects_healed: u64, + pub objects_failed: u64, + pub bucket: String, + pub object: String, + pub queue_buckets: Vec, + pub healed_buckets: Vec, + pub finished: bool, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub enum BackendByte { + #[default] + Unknown, + FS, + Erasure, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct StorageInfo { + pub disks: Vec, + pub backend: BackendInfo, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct BackendDisks(HashMap); + +impl BackendDisks { + pub fn new() -> Self { + Self(HashMap::new()) + } + pub fn sum(&self) -> usize { + self.0.values().sum() + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase", default)] +pub struct BackendInfo { + pub backend_type: BackendByte, + pub online_disks: BackendDisks, + pub offline_disks: BackendDisks, + #[serde(rename = "StandardSCData")] + pub standard_sc_data: Vec, + #[serde(rename = "StandardSCParities")] + pub standard_sc_parities: Vec, + #[serde(rename = "StandardSCParity")] + pub standard_sc_parity: Option, + #[serde(rename = "RRSCData")] + pub rr_sc_data: Vec, + #[serde(rename = "RRSCParities")] + pub rr_sc_parities: Vec, + #[serde(rename = "RRSCParity")] + pub rr_sc_parity: Option, + pub total_sets: Vec, + pub drives_per_set: Vec, +} diff --git a/madmin/src/lib.rs b/madmin/src/lib.rs index 908eb69d..fd416c42 100644 --- a/madmin/src/lib.rs +++ b/madmin/src/lib.rs @@ -2,3 +2,5 @@ pub mod health; pub mod info_commands; pub mod metrics; pub mod net; + +pub use info_commands::*; diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 1419beae..fea05ef4 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -10,6 +10,7 @@ use ecstore::global::GLOBAL_ALlHealState; use ecstore::heal::heal_commands::HealOpts; use ecstore::heal::heal_ops::new_heal_sequence; use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType}; +use ecstore::new_object_layer_fn; use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::StorageAPI; @@ -17,7 +18,6 @@ use ecstore::utils::path::path_join; use ecstore::utils::time::parse_duration; use ecstore::utils::xml; use ecstore::GLOBAL_Endpoints; -use ecstore::{new_object_layer_fn, store_api::BackendInfo}; use futures::{Stream, StreamExt}; use http::Uri; use hyper::StatusCode; @@ -37,7 +37,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration as std_Duration; -use std::u64; use time::{Duration, OffsetDateTime}; use tokio::sync::mpsc::{self}; use tokio::time::interval; @@ -128,7 +127,7 @@ impl Operation for AssumeRoleHandle { #[serde(rename_all = "PascalCase", default)] pub struct AccountInfo { pub account_name: String, - pub server: BackendInfo, + pub server: madmin::BackendInfo, pub policy: BucketPolicy, } @@ -224,7 +223,18 @@ impl Operation for StorageInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle StorageInfoHandler"); - return Err(s3_error!(NotImplemented)); + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + // TODO:getAggregatedBackgroundHealState + + let info = store.storage_info().await; + + let output = serde_json::to_string(&info) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(output)))) } } @@ -492,15 +502,11 @@ fn extract_heal_init_params(body: &Bytes, uri: &Uri, params: Params<'_, '_>) -> hip.client_token = value.to_string(); } } - if key == "forceStart" { - if parts.next().is_some() { - hip.force_start = true; - } + if key == "forceStart" && parts.next().is_some() { + hip.force_start = true; } - if key == "forceStop" { - if parts.next().is_some() { - hip.force_stop = true; - } + if key == "forceStop" && parts.next().is_some() { + hip.force_stop = true; } } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e2ba46dc..e0225424 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -8,7 +8,6 @@ use common::{ error::{Error, Result}, globals::set_global_addr, }; -use ecstore::global::set_global_rustfs_port; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::utils::net::{self, get_available_port}; use ecstore::{ @@ -18,6 +17,7 @@ use ecstore::{ store::{init_local_disks, ECStore}, update_erasure_type, }; +use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -212,10 +212,10 @@ async fn run(opt: config::Opt) -> Result<()> { })?; warn!(" init store success!"); - // new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { - // error!("new_global_notification_sys faild {:?}", &err); - // Error::from_string(err.to_string()) - // })?; + new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { + error!("new_global_notification_sys faild {:?}", &err); + Error::from_string(err.to_string()) + })?; // init scanner init_data_scanner().await;