mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-17 01:30:33 +00:00)

Commit: test StorageInfoHandler

Cargo.lock (generated): 2 additions

@@ -764,6 +764,7 @@ dependencies = [
  "flatbuffers",
  "lazy_static",
  "lock",
+ "madmin",
  "protos",
  "rmp-serde",
  "serde",

@@ -1677,6 +1678,7 @@ dependencies = [
  "common",
+ "psutil",
  "serde",
  "time",
 ]

 [[package]]

@@ -10,7 +10,10 @@ fn test() {
         "bbb": "bbb"
     });

-    let jwt_token = encode(b"aaaa", &claims).unwrap();
-    let new_claims = decode(&jwt_token, b"aaaa").unwrap();
-    assert_eq!(new_claims.claims, claims);
+    let jwt_token = encode(b"aaaa", &claims).unwrap_or_default();
+    let new_claims = match decode(&jwt_token, b"aaaa") {
+        Ok(res) => Some(res.claims),
+        Err(_errr) => None,
+    };
+    assert_eq!(new_claims, Some(claims));
 }

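The rewrite above maps the `Result` from `decode` into an `Option` by hand instead of panicking. The same shape can be written with `Result::ok`; a minimal sketch, assuming the crate-local `encode`/`decode` helpers and claims type used by the test (names not verified against the repo):

```rust
// Sketch only: `encode`, `decode`, and `Claims` stand in for the test's
// crate-local JWT helpers; the Result -> Option conversion is the point.
fn roundtrip(claims: &Claims) -> Option<Claims> {
    let token = encode(b"aaaa", claims).ok()?;      // None instead of panicking
    decode(&token, b"aaaa").ok().map(|r| r.claims)  // ditto for the decode step
}
```
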
@@ -20,4 +20,5 @@ serde_json.workspace = true
 tonic = { version = "0.12.3", features = ["gzip"] }
 tokio = { workspace = true }
 tower.workspace = true
 url.workspace = true
+madmin.workspace =true
@@ -1,6 +1,6 @@
 #![cfg(test)]

-use ecstore::{disk::VolumeInfo, store_api::StorageInfo};
+use ecstore::disk::VolumeInfo;
 use protos::{
     models::{PingBody, PingBodyBuilder},
     node_service_time_out_client,

@@ -118,7 +118,7 @@ async fn storage_info() -> Result<(), Box<dyn Error>> {
     let info = response.storage_info;

     let mut buf = Deserializer::new(Cursor::new(info));
-    let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
+    let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
     println!("{:?}", storage_info);
     Ok(())
 }

@@ -15,12 +15,7 @@ use protos::{
 use serde::{Deserialize, Serialize};
 use tonic::Request;

-use crate::{
-    disk::endpoint::Endpoint,
-    global::GLOBAL_Endpoints,
-    new_object_layer_fn,
-    store_api::{StorageAPI, StorageDisk},
-};
+use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI};

 pub const ITEM_OFFLINE: &str = "offline";
 pub const ITEM_INITIALIZING: &str = "initializing";

@@ -44,7 +39,7 @@ pub struct ServerProperties {
     pub version: String,
     pub commit_id: String,
     pub network: HashMap<String, String>,
-    pub disks: Vec<StorageDisk>,
+    pub disks: Vec<madmin::Disk>,
     pub pool_number: i32,
     pub pool_numbers: Vec<i32>,
     pub mem_stats: MemStats,

@@ -105,9 +105,9 @@ pub async fn read_config_without_migrate<S: StorageAPI>(api: Arc<S>) -> Result<C
         Ok(res) => res,
         Err(err) => {
             if is_not_found(&err) {
-                warn!("config not found init start");
+                warn!("config not found, start to init");
                 let cfg = new_and_save_server_config(api).await?;
-                warn!("config not found init done");
+                warn!("config init done");
                 return Ok(cfg);
             } else {
                 error!("read config err {:?}", &err);

@@ -1291,7 +1291,7 @@ impl DiskAPI for LocalDisk {
         Ok(res) => res,
         Err(e) => {
             if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) {
-                error!("list_dir err {:?}", &e);
+                info!("list_dir err {:?}", &e);
             }

             if opts.report_notfound && is_err_file_not_found(&e) {

@@ -1,9 +1,11 @@
+use tracing::warn;
+
 use crate::{
     disk::endpoint::{Endpoint, EndpointType},
     disks_layout::DisksLayout,
     error::{Error, Result},
     global::global_rustfs_port,
-    utils::net,
+    utils::net::{self, XHost},
 };
 use std::{
     collections::{hash_map::Entry, HashMap, HashSet},
@@ -530,20 +532,30 @@ impl EndpointServerPools {

         nodes
     }
-    pub fn hosts_sorted(&self) -> Vec<()> {
+    pub fn hosts_sorted(&self) -> Vec<Option<XHost>> {
         let (mut peers, local) = self.peers();

+        let mut ret = vec![None; peers.len()];
+
         peers.sort();

-        for peer in peers.iter() {
+        for (i, peer) in peers.iter().enumerate() {
             if &local == peer {
                 continue;
             }

-            // FIXME:TODO
+            let host = match XHost::try_from(peer.clone()) {
+                Ok(res) => res,
+                Err(err) => {
+                    warn!("Xhost parse failed {:?}", err);
+                    continue;
+                }
+            };
+
+            ret[i] = Some(host);
         }

-        unimplemented!()
+        ret
     }
     pub fn peers(&self) -> (Vec<String>, String) {
         let mut local = None;
@@ -554,12 +566,8 @@ impl EndpointServerPools {
                 continue;
             }
             let host = endpoint.host_port();
-            if endpoint.is_local {
-                if endpoint.url.port() == Some(global_rustfs_port()) {
-                    if local.is_none() {
-                        local = Some(host.clone());
-                    }
-                }
+            if endpoint.is_local && endpoint.url.port() == Some(global_rustfs_port()) && local.is_none() {
+                local = Some(host.clone());
             }

             set.insert(host);
@@ -570,6 +578,28 @@ impl EndpointServerPools {

         (hosts, local.unwrap_or_default())
     }
+
+    pub fn find_grid_hosts_from_peer(&self, host: &XHost) -> Option<String> {
+        for ep in self.0.iter() {
+            for endpoint in ep.endpoints.0.iter() {
+                if endpoint.is_local {
+                    continue;
+                }
+                let xhost = match XHost::try_from(endpoint.host_port()) {
+                    Ok(res) => res,
+                    Err(_) => {
+                        continue;
+                    }
+                };
+
+                if xhost.to_string() == host.to_string() {
+                    return Some(endpoint.grid_host());
+                }
+            }
+        }
+
+        None
+    }
 }

 #[cfg(test)]

@@ -65,6 +65,14 @@ pub fn set_global_endpoints(eps: Vec<PoolEndpoints>) {
         .expect("GLOBAL_Endpoints set faild")
 }

+pub fn get_global_endpoints() -> EndpointServerPools {
+    if let Some(eps) = GLOBAL_Endpoints.get() {
+        eps.clone()
+    } else {
+        EndpointServerPools::default()
+    }
+}
+
 pub fn new_object_layer_fn() -> Option<Arc<ECStore>> {
     GLOBAL_OBJECT_API.get().map(|ec| ec.clone())
 }

@@ -17,7 +17,7 @@ use crate::{
     global::GLOBAL_BackgroundHealState,
     heal::heal_ops::HEALING_TRACKER_FILENAME,
     new_object_layer_fn,
-    store_api::{BucketInfo, StorageAPI, StorageDisk},
+    store_api::{BucketInfo, StorageAPI},
     utils::fs::read_file,
 };

@@ -131,35 +131,6 @@ impl Default for HealStartSuccess {

 pub type HealStopSuccess = HealStartSuccess;

-#[derive(Clone, Debug, Default, Serialize, Deserialize)]
-pub struct HealingDisk {
-    pub id: String,
-    pub heal_id: String,
-    pub pool_index: Option<usize>,
-    pub set_index: Option<usize>,
-    pub disk_index: Option<usize>,
-    pub endpoint: String,
-    pub path: String,
-    pub started: Option<OffsetDateTime>,
-    pub last_update: Option<SystemTime>,
-    pub retry_attempts: u64,
-    pub objects_total_count: u64,
-    pub objects_total_size: u64,
-    pub items_healed: u64,
-    pub items_failed: u64,
-    pub item_skipped: u64,
-    pub bytes_done: u64,
-    pub bytes_failed: u64,
-    pub bytes_skipped: u64,
-    pub objects_healed: u64,
-    pub objects_failed: u64,
-    pub bucket: String,
-    pub object: String,
-    pub queue_buckets: Vec<String>,
-    pub healed_buckets: Vec<String>,
-    pub finished: bool,
-}
-
 #[derive(Debug, Default, Deserialize, Serialize)]
 pub struct HealingTracker {
     #[serde(skip_serializing, skip_deserializing)]
@@ -375,10 +346,10 @@ impl HealingTracker {
         });
     }

-    pub async fn to_healing_disk(&self) -> HealingDisk {
+    pub async fn to_healing_disk(&self) -> madmin::HealingDisk {
         let _ = self.mu.read().await;

-        HealingDisk {
+        madmin::HealingDisk {
             id: self.id.clone(),
             heal_id: self.heal_id.clone(),
             pool_index: self.pool_index,

@@ -516,7 +487,7 @@ pub struct SetStatus {
     pub heal_status: String,
     pub heal_priority: String,
     pub total_objects: usize,
-    pub disks: Vec<StorageDisk>,
+    pub disks: Vec<madmin::Disk>,
 }

 #[derive(Debug, Default, Serialize, Deserialize)]

@@ -3,8 +3,7 @@ use super::{
     data_scanner::HEAL_DELETE_DANGLING,
     error::ERR_SKIP_FILE,
     heal_commands::{
-        HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingDisk, HealingTracker,
-        HEAL_ITEM_BUCKET_METADATA,
+        HealItemType, HealOpts, HealResultItem, HealScanMode, HealStopSuccess, HealingTracker, HEAL_ITEM_BUCKET_METADATA,
     },
 };
 use crate::store_api::StorageAPI;

@@ -664,7 +663,7 @@ impl AllHealState {
         self.heal_status.write().await.insert(tracker.id.clone(), tracker.clone());
     }

-    pub async fn get_local_healing_disks(&self) -> HashMap<String, HealingDisk> {
+    pub async fn get_local_healing_disks(&self) -> HashMap<String, madmin::HealingDisk> {
         let _ = self.mu.read().await;

         let mut dst = HashMap::new();

@@ -1,8 +1,11 @@
-use std::sync::OnceLock;
+use std::sync::{Arc, OnceLock};

 use crate::endpoints::EndpointServerPools;
 use crate::error::{Error, Result};
+use crate::global::get_global_endpoints;
 use crate::peer_rest_client::PeerRestClient;
+use crate::StorageAPI;
+use futures::future::join_all;
 use lazy_static::lazy_static;

 lazy_static! {

@@ -16,9 +19,13 @@ pub async fn new_global_notification_sys(eps: EndpointServerPools) -> Result<()>
     Ok(())
 }

+pub fn get_global_notification_sys() -> Option<&'static NotificationSys> {
+    GLOBAL_NotificationSys.get()
+}
+
 pub struct NotificationSys {
-    pub peer_clients: Vec<PeerRestClient>,
-    pub all_peer_clients: Vec<PeerRestClient>,
+    peer_clients: Vec<Option<PeerRestClient>>,
+    all_peer_clients: Vec<Option<PeerRestClient>>,
 }

 impl NotificationSys {
@@ -50,4 +57,57 @@ impl NotificationSys {
     pub async fn delete_user(&self) -> Vec<NotificationPeerErr> {
         unimplemented!()
     }
+
+    pub async fn storage_info<S: StorageAPI>(&self, api: &S) -> madmin::StorageInfo {
+        let mut futures = Vec::with_capacity(self.peer_clients.len());
+
+        for client in self.peer_clients.iter() {
+            futures.push(async move {
+                if let Some(client) = client {
+                    match client.local_storage_info().await {
+                        Ok(info) => Some(info),
+                        Err(_) => Some(madmin::StorageInfo {
+                            disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),
+                            ..Default::default()
+                        }),
+                    }
+                } else {
+                    None
+                }
+            });
+        }
+
+        let mut replies = join_all(futures).await;
+
+        replies.push(Some(api.local_storage_info().await));
+
+        let mut disks = Vec::new();
+        for info in replies.into_iter().flatten() {
+            disks.extend(info.disks);
+        }
+
+        let backend = api.backend_info().await;
+        madmin::StorageInfo { disks, backend }
+    }
 }
+
+fn get_offline_disks(offline_host: &str, endpoints: &EndpointServerPools) -> Vec<madmin::Disk> {
+    let mut offline_disks = Vec::new();
+
+    for pool in endpoints.as_ref() {
+        for ep in pool.endpoints.as_ref() {
+            if (offline_host.is_empty() && ep.is_local) || offline_host == ep.host_port() {
+                offline_disks.push(madmin::Disk {
+                    endpoint: ep.to_string(),
+                    state: madmin::ItemState::Offline.to_string().to_owned(),
+                    pool_index: ep.pool_idx,
+                    set_index: ep.set_idx,
+                    disk_index: ep.disk_idx,
+                    ..Default::default()
+                });
+            }
+        }
+    }
+
+    offline_disks
+}

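The new `storage_info` above fans out one future per peer, awaits them together with `join_all`, then folds the per-peer disk lists into a single `madmin::StorageInfo`. The same aggregation shape in a minimal, self-contained sketch (tokio + futures only; `peer_disks` is a hypothetical stand-in for `client.local_storage_info()`):

```rust
use futures::future::join_all;

// Hypothetical stand-in for the per-peer RPC in the diff above.
async fn peer_disks(peer: usize) -> Option<Vec<String>> {
    Some(vec![format!("disk-{peer}")])
}

#[tokio::main]
async fn main() {
    // One future per peer, awaited concurrently, then flattened:
    // the same pattern NotificationSys::storage_info uses for madmin::Disk.
    let futures: Vec<_> = (0..3).map(peer_disks).collect();
    let mut disks = Vec::new();
    for info in join_all(futures).await.into_iter().flatten() {
        disks.extend(info);
    }
    assert_eq!(disks.len(), 3);
}
```
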
@@ -6,7 +6,7 @@ use crate::{
     global::is_dist_erasure,
     heal::heal_commands::BgHealState,
     metrics_realtime::{CollectMetricsOpts, MetricType},
-    store_api::StorageInfo,
+    utils::net::XHost,
 };
 use common::error::{Error, Result};
 use madmin::{
@@ -29,35 +29,53 @@ use protos::{
 use rmp_serde::{Deserializer, Serializer};
 use serde::{Deserialize, Serialize as _};
 use tonic::Request;
+use tracing::warn;

 pub const PEER_RESTSIGNAL: &str = "signal";
 pub const PEER_RESTSUB_SYS: &str = "sub-sys";
 pub const PEER_RESTDRY_RUN: &str = "dry-run";

 #[derive(Debug, Clone)]
 pub struct PeerRestClient {
-    addr: String,
+    pub host: XHost,
+    pub grid_host: String,
 }

 impl PeerRestClient {
-    pub fn new(url: url::Url) -> Self {
-        Self {
-            addr: format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()),
-        }
-    }
-    pub async fn new_clients(_eps: EndpointServerPools) -> (Vec<Self>, Vec<Self>) {
+    pub fn new(host: XHost, grid_host: String) -> Self {
+        Self { host, grid_host }
+    }
+    pub async fn new_clients(eps: EndpointServerPools) -> (Vec<Option<Self>>, Vec<Option<Self>>) {
         if !is_dist_erasure().await {
             return (Vec::new(), Vec::new());
         }

-        // FIXME:TODO
-
-        todo!()
+        let eps = eps.clone();
+        let hosts = eps.hosts_sorted();
+        let mut remote = vec![None; hosts.len()];
+        let mut all = Vec::with_capacity(hosts.len());
+        for (i, hs_host) in hosts.iter().enumerate() {
+            if let Some(host) = hs_host {
+                if let Some(grid_host) = eps.find_grid_hosts_from_peer(host) {
+                    let client = PeerRestClient::new(host.clone(), grid_host);
+
+                    all[i] = Some(client.clone());
+                    remote.push(Some(client));
+                }
+            }
+        }
+
+        if all.len() != remote.len() + 1 {
+            warn!("Expected number of all hosts ({}) to be remote +1 ({})", all.len(), remote.len());
+        }
+
+        (remote, all)
     }
 }

 impl PeerRestClient {
-    pub async fn local_storage_info(&self) -> Result<StorageInfo> {
-        let mut client = node_service_time_out_client(&self.addr)
+    pub async fn local_storage_info(&self) -> Result<madmin::StorageInfo> {
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LocalStorageInfoRequest { metrics: true });
@@ -72,13 +90,13 @@ impl PeerRestClient {
         let data = response.storage_info;

         let mut buf = Deserializer::new(Cursor::new(data));
-        let storage_info: StorageInfo = Deserialize::deserialize(&mut buf).unwrap();
+        let storage_info: madmin::StorageInfo = Deserialize::deserialize(&mut buf).unwrap();

         Ok(storage_info)
     }

     pub async fn server_info(&self) -> Result<ServerProperties> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(ServerInfoRequest { metrics: true });
@@ -99,7 +117,7 @@ impl PeerRestClient {
     }

     pub async fn get_cpus(&self) -> Result<Cpus> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetCpusRequest {});

@@ -120,7 +138,7 @@ impl PeerRestClient {
     }

     pub async fn get_net_info(&self) -> Result<NetInfo> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetNetInfoRequest {});

@@ -141,7 +159,7 @@ impl PeerRestClient {
     }

     pub async fn get_partitions(&self) -> Result<Partitions> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetPartitionsRequest {});

@@ -162,7 +180,7 @@ impl PeerRestClient {
     }

     pub async fn get_os_info(&self) -> Result<OsInfo> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetOsInfoRequest {});

@@ -183,7 +201,7 @@ impl PeerRestClient {
     }

     pub async fn get_se_linux_info(&self) -> Result<SysService> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetSeLinuxInfoRequest {});

@@ -204,7 +222,7 @@ impl PeerRestClient {
     }

     pub async fn get_sys_config(&self) -> Result<SysConfig> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetSysConfigRequest {});

@@ -225,7 +243,7 @@ impl PeerRestClient {
     }

     pub async fn get_sys_errors(&self) -> Result<SysErrors> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetSysErrorsRequest {});

@@ -246,7 +264,7 @@ impl PeerRestClient {
     }

     pub async fn get_mem_info(&self) -> Result<MemInfo> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetMemInfoRequest {});

@@ -267,7 +285,7 @@ impl PeerRestClient {
     }

     pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result<RealtimeMetrics> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let mut buf_t = Vec::new();

@@ -295,7 +313,7 @@ impl PeerRestClient {
     }

     pub async fn get_proc_info(&self) -> Result<ProcInfo> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(GetProcInfoRequest {});

@@ -316,7 +334,7 @@ impl PeerRestClient {
     }

     pub async fn start_profiling(&self, profiler: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(StartProfilingRequest {

@@ -350,7 +368,7 @@ impl PeerRestClient {
     }

     pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadBucketMetadataRequest {

@@ -368,7 +386,7 @@ impl PeerRestClient {
     }

     pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(DeleteBucketMetadataRequest {

@@ -386,7 +404,7 @@ impl PeerRestClient {
     }

     pub async fn delete_policy(&self, policy: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(DeletePolicyRequest {

@@ -404,7 +422,7 @@ impl PeerRestClient {
     }

     pub async fn load_policy(&self, policy: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadPolicyRequest {

@@ -422,7 +440,7 @@ impl PeerRestClient {
     }

     pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadPolicyMappingRequest {

@@ -442,7 +460,7 @@ impl PeerRestClient {
     }

     pub async fn delete_user(&self, access_key: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(DeleteUserRequest {

@@ -460,7 +478,7 @@ impl PeerRestClient {
     }

     pub async fn delete_service_account(&self, access_key: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(DeleteServiceAccountRequest {

@@ -478,7 +496,7 @@ impl PeerRestClient {
     }

     pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadUserRequest {

@@ -497,7 +515,7 @@ impl PeerRestClient {
     }

     pub async fn load_service_account(&self, access_key: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadServiceAccountRequest {

@@ -515,7 +533,7 @@ impl PeerRestClient {
     }

     pub async fn load_group(&self, group: &str) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadGroupRequest {

@@ -533,7 +551,7 @@ impl PeerRestClient {
     }

     pub async fn reload_site_replication_config(&self) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(ReloadSiteReplicationConfigRequest {});

@@ -549,7 +567,7 @@ impl PeerRestClient {
     }

     pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let mut vars = HashMap::new();

@@ -571,7 +589,7 @@ impl PeerRestClient {
     }

     pub async fn background_heal_status(&self) -> Result<BgHealState> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(BackgroundHealStatusRequest {});

@@ -592,21 +610,21 @@ impl PeerRestClient {
     }

     pub async fn get_metacache_listing(&self) -> Result<()> {
-        let mut _client = node_service_time_out_client(&self.addr)
+        let mut _client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         todo!()
     }

     pub async fn update_metacache_listing(&self) -> Result<()> {
-        let mut _client = node_service_time_out_client(&self.addr)
+        let mut _client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         todo!()
     }

     pub async fn reload_pool_meta(&self) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(ReloadPoolMetaRequest {});

@@ -623,7 +641,7 @@ impl PeerRestClient {
     }

     pub async fn stop_rebalance(&self) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(StopRebalanceRequest {});

@@ -640,7 +658,7 @@ impl PeerRestClient {
     }

     pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadRebalanceMetaRequest { start_rebalance });

@@ -657,7 +675,7 @@ impl PeerRestClient {
     }

     pub async fn load_transition_tier_config(&self) -> Result<()> {
-        let mut client = node_service_time_out_client(&self.addr)
+        let mut client = node_service_time_out_client(&self.grid_host)
             .await
             .map_err(|err| Error::msg(err.to_string()))?;
         let request = Request::new(LoadTransitionTierConfigRequest {});

@@ -5,7 +5,7 @@ use crate::disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
 use crate::error::{Error, Result};
 use crate::heal::heal_commands::HealOpts;
 use crate::new_object_layer_fn;
-use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI, StorageDisk, StorageInfo};
+use crate::store_api::{BucketOptions, MakeBucketOptions, StorageAPI};
 use crate::store_err::{is_err_bucket_exists, StorageError};
 use crate::utils::path::{path_join, SLASH_SEPARATOR};
 use crate::{sets::Sets, store::ECStore};
@@ -670,7 +670,7 @@ impl ECStore {
     }
 }

-fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize {
+fn get_total_usable_capacity(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize {
     let mut capacity = 0;
     for disk in disks.iter() {
         if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {

@@ -683,7 +683,7 @@ fn get_total_usable_capacity(disks: &[StorageDisk], info: &StorageInfo) -> usize
     capacity
 }

-fn get_total_usable_capacity_free(disks: &[StorageDisk], info: &StorageInfo) -> usize {
+fn get_total_usable_capacity_free(disks: &[madmin::Disk], info: &madmin::StorageInfo) -> usize {
     let mut capacity = 0;
     for disk in disks.iter() {
         if disk.pool_index < 0 || info.backend.standard_sc_data.len() <= disk.pool_index as usize {

@@ -38,10 +38,9 @@ use crate::{
     },
     quorum::{object_op_ignored_errs, reduce_read_quorum_errs, reduce_write_quorum_errs, QuorumError},
     store_api::{
-        BackendByte, BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo,
-        GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo,
-        MultipartUploadResult, ObjectIO, ObjectInfo, ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader,
-        RawFileInfo, StorageAPI, StorageDisk, StorageInfo, DEFAULT_BITROT_ALGO,
+        BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, FileInfo, GetObjectReader, HTTPRangeSpec,
+        ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo,
+        ObjectOptions, ObjectPartInfo, ObjectToDelete, PartInfo, PutObjReader, RawFileInfo, StorageAPI, DEFAULT_BITROT_ALGO,
     },
     store_err::{to_object_err, StorageError},
     store_init::{load_format_erasure, ErasureError},
@@ -3590,15 +3589,15 @@ impl ObjectIO for SetDisks {

 #[async_trait::async_trait]
 impl StorageAPI for SetDisks {
-    async fn backend_info(&self) -> BackendInfo {
+    async fn backend_info(&self) -> madmin::BackendInfo {
         unimplemented!()
     }
-    async fn storage_info(&self) -> StorageInfo {
+    async fn storage_info(&self) -> madmin::StorageInfo {
         let disks = self.get_disks_internal().await;

         get_storage_info(&disks, &self.set_endpoints).await
     }
-    async fn local_storage_info(&self) -> StorageInfo {
+    async fn local_storage_info(&self) -> madmin::StorageInfo {
         let disks = self.get_disks_internal().await;

         let mut local_disks: Vec<Option<Arc<crate::disk::Disk>>> = Vec::new();
@@ -4941,13 +4940,13 @@ pub fn should_heal_object_on_disk(
     (false, err.clone())
 }

-async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<StorageDisk> {
+async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<madmin::Disk> {
     let mut ret = Vec::new();

     for (i, pool) in disks.iter().enumerate() {
         if let Some(disk) = pool {
             match disk.disk_info(&DiskInfoOptions::default()).await {
-                Ok(res) => ret.push(StorageDisk {
+                Ok(res) => ret.push(madmin::Disk {
                     endpoint: eps[i].to_string(),
                     local: eps[i].is_local,
                     pool_index: eps[i].pool_idx,

@@ -4978,7 +4977,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St
                     free_inodes: res.free_inodes,
                     ..Default::default()
                 }),
-                Err(err) => ret.push(StorageDisk {
+                Err(err) => ret.push(madmin::Disk {
                     state: err.to_string(),
                     endpoint: eps[i].to_string(),
                     local: eps[i].is_local,

@@ -4989,7 +4988,7 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St
                 }),
             }
         } else {
-            ret.push(StorageDisk {
+            ret.push(madmin::Disk {
                 endpoint: eps[i].to_string(),
                 local: eps[i].is_local,
                 pool_index: eps[i].pool_idx,

@@ -5003,14 +5002,14 @@ async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<St

     ret
 }
-async fn get_storage_info(disks: &Vec<Option<DiskStore>>, eps: &Vec<Endpoint>) -> StorageInfo {
+async fn get_storage_info(disks: &Vec<Option<DiskStore>>, eps: &Vec<Endpoint>) -> madmin::StorageInfo {
     let mut disks = get_disks_info(disks, eps).await;
     disks.sort_by(|a, b| a.total_space.cmp(&b.total_space));

-    StorageInfo {
+    madmin::StorageInfo {
         disks,
-        backend: BackendInfo {
-            backend_type: BackendByte::Erasure,
+        backend: madmin::BackendInfo {
+            backend_type: madmin::BackendByte::Erasure,
             ..Default::default()
         },
     }

@@ -23,9 +23,9 @@ use crate::{
     },
     set_disk::SetDisks,
     store_api::{
-        BackendInfo, BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
+        BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
         ListMultipartsInfo, ListObjectVersionsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult,
-        ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, StorageInfo,
+        ObjectIO, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
     },
     store_init::{check_format_erasure_values, get_format_erasure_in_quorum, load_format_erasure_all, save_format_file},
     utils::hash,
@@ -294,10 +294,10 @@ impl ObjectIO for Sets {

 #[async_trait::async_trait]
 impl StorageAPI for Sets {
-    async fn backend_info(&self) -> BackendInfo {
+    async fn backend_info(&self) -> madmin::BackendInfo {
         unimplemented!()
     }
-    async fn storage_info(&self) -> StorageInfo {
+    async fn storage_info(&self) -> madmin::StorageInfo {
         let mut futures = Vec::with_capacity(self.disk_set.len());

         for set in self.disk_set.iter() {

@@ -312,12 +312,12 @@ impl StorageAPI for Sets {
             disks.extend_from_slice(&res.disks);
         }

-        StorageInfo {
+        madmin::StorageInfo {
             disks,
             ..Default::default()
         }
     }
-    async fn local_storage_info(&self) -> StorageInfo {
+    async fn local_storage_info(&self) -> madmin::StorageInfo {
         let mut futures = Vec::with_capacity(self.disk_set.len());

         for set in self.disk_set.iter() {

@@ -331,7 +331,7 @@ impl StorageAPI for Sets {
         for res in results.into_iter() {
             disks.extend_from_slice(&res.disks);
         }
-        StorageInfo {
+        madmin::StorageInfo {
             disks,
             ..Default::default()
         }

@@ -15,10 +15,9 @@ use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
 use crate::heal::heal_commands::{HealOpts, HealResultItem, HealScanMode, HEAL_ITEM_METADATA};
 use crate::heal::heal_ops::{HealEntryFn, HealSequence};
 use crate::new_object_layer_fn;
+use crate::notification_sys::get_global_notification_sys;
 use crate::pools::PoolMeta;
-use crate::store_api::{
-    BackendByte, BackendDisks, BackendInfo, ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO, StorageInfo,
-};
+use crate::store_api::{ListMultipartsInfo, ListObjectVersionsInfo, MultipartInfo, ObjectIO};
 use crate::store_err::{
     is_err_bucket_exists, is_err_invalid_upload_id, is_err_object_not_found, is_err_read_quorum, is_err_version_not_found,
     to_object_err, StorageError,
@@ -1120,7 +1119,7 @@ lazy_static! {

 #[async_trait::async_trait]
 impl StorageAPI for ECStore {
-    async fn backend_info(&self) -> BackendInfo {
+    async fn backend_info(&self) -> madmin::BackendInfo {
         let (standard_sc_parity, rr_sc_parity) = {
             if let Some(sc) = GLOBAL_StorageClass.get() {
                 let sc_parity = sc

@@ -1151,10 +1150,10 @@ impl StorageAPI for ECStore {
             drives_per_set.push(*set_count);
         }

-        BackendInfo {
-            backend_type: BackendByte::Erasure,
-            online_disks: BackendDisks::new(),
-            offline_disks: BackendDisks::new(),
+        madmin::BackendInfo {
+            backend_type: madmin::BackendByte::Erasure,
+            online_disks: madmin::BackendDisks::new(),
+            offline_disks: madmin::BackendDisks::new(),
             standard_sc_data,
             standard_sc_parity,
             rr_sc_data,

@@ -1164,11 +1163,14 @@ impl StorageAPI for ECStore {
             ..Default::default()
         }
     }
-    async fn storage_info(&self) -> StorageInfo {
-        // FIXME: globalNotificationSys.StorageInfo
-        unimplemented!()
+    async fn storage_info(&self) -> madmin::StorageInfo {
+        let Some(notification_sy) = get_global_notification_sys() else {
+            return madmin::StorageInfo::default();
+        };
+
+        notification_sy.storage_info(self).await
     }
-    async fn local_storage_info(&self) -> StorageInfo {
+    async fn local_storage_info(&self) -> madmin::StorageInfo {
         let mut futures = Vec::with_capacity(self.pools.len());

         for pool in self.pools.iter() {

@@ -1184,7 +1186,7 @@ impl StorageAPI for ECStore {
         }

         let backend = self.backend_info().await;
-        StorageInfo { backend, disks }
+        madmin::StorageInfo { backend, disks }
     }

     async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {

@@ -1,4 +1,3 @@
-use crate::heal::heal_commands::HealingDisk;
 use crate::heal::heal_ops::HealSequence;
 use crate::{
     disk::DiskStore,

@@ -9,7 +8,6 @@ use crate::{
 };
 use futures::StreamExt;
 use http::HeaderMap;
-use madmin::info_commands::DiskMetrics;
 use rmp_serde::Serializer;
 use s3s::{dto::StreamingBlob, Body};
 use serde::{Deserialize, Serialize};
@@ -804,84 +802,6 @@ pub struct DeletedObject {
     // pub replication_state: ReplicationState,
 }

-#[derive(Debug, Default, Serialize, Deserialize)]
-pub enum BackendByte {
-    #[default]
-    Unknown,
-    FS,
-    Erasure,
-}
-
-#[derive(Serialize, Deserialize, Debug, Default, Clone)]
-pub struct StorageDisk {
-    pub endpoint: String,
-    pub root_disk: bool,
-    pub drive_path: String,
-    pub healing: bool,
-    pub scanning: bool,
-    pub state: String,
-    pub uuid: String,
-    pub major: u32,
-    pub minor: u32,
-    pub model: Option<String>,
-    pub total_space: u64,
-    pub used_space: u64,
-    pub available_space: u64,
-    pub read_throughput: f64,
-    pub write_throughput: f64,
-    pub read_latency: f64,
-    pub write_latency: f64,
-    pub utilization: f64,
-    pub metrics: Option<DiskMetrics>,
-    pub heal_info: Option<HealingDisk>,
-    pub used_inodes: u64,
-    pub free_inodes: u64,
-    pub local: bool,
-    pub pool_index: i32,
-    pub set_index: i32,
-    pub disk_index: i32,
-}
-
-#[derive(Debug, Default, Serialize, Deserialize)]
-pub struct StorageInfo {
-    pub disks: Vec<StorageDisk>,
-    pub backend: BackendInfo,
-}
-
-#[derive(Debug, Default, Serialize, Deserialize)]
-pub struct BackendDisks(HashMap<String, usize>);
-
-impl BackendDisks {
-    pub fn new() -> Self {
-        Self(HashMap::new())
-    }
-    pub fn sum(&self) -> usize {
-        self.0.values().sum()
-    }
-}
-
-#[derive(Debug, Default, Serialize, Deserialize)]
-#[serde(rename_all = "PascalCase", default)]
-pub struct BackendInfo {
-    pub backend_type: BackendByte,
-    pub online_disks: BackendDisks,
-    pub offline_disks: BackendDisks,
-    #[serde(rename = "StandardSCData")]
-    pub standard_sc_data: Vec<usize>,
-    #[serde(rename = "StandardSCParities")]
-    pub standard_sc_parities: Vec<usize>,
-    #[serde(rename = "StandardSCParity")]
-    pub standard_sc_parity: Option<usize>,
-    #[serde(rename = "RRSCData")]
-    pub rr_sc_data: Vec<usize>,
-    #[serde(rename = "RRSCParities")]
-    pub rr_sc_parities: Vec<usize>,
-    #[serde(rename = "RRSCParity")]
-    pub rr_sc_parity: Option<usize>,
-    pub total_sets: Vec<usize>,
-    pub drives_per_set: Vec<usize>,
-}
-
 pub struct ListObjectVersionsInfo {
     pub is_truncated: bool,
     pub next_marker: String,

@@ -911,9 +831,9 @@ pub trait StorageAPI: ObjectIO {
     // Shutdown TODO:
     // NSScanner TODO:

-    async fn backend_info(&self) -> BackendInfo;
-    async fn storage_info(&self) -> StorageInfo;
-    async fn local_storage_info(&self) -> StorageInfo;
+    async fn backend_info(&self) -> madmin::BackendInfo;
+    async fn storage_info(&self) -> madmin::StorageInfo;
+    async fn local_storage_info(&self) -> madmin::StorageInfo;

     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
     async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;

@@ -2,6 +2,7 @@ use crate::error::{Error, Result};
 use lazy_static::lazy_static;
 use std::{
     collections::HashSet,
+    fmt::Display,
     net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
 };

@@ -105,27 +106,38 @@ pub(crate) fn must_get_local_ips() -> Result<Vec<IpAddr>> {
     }
 }

 #[derive(Debug, Clone)]
 pub struct XHost {
     pub name: String,
     pub port: u16,
     pub is_port_set: bool,
 }

-impl ToString for XHost {
-    fn to_string(&self) -> String {
+impl Display for XHost {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         if !self.is_port_set {
-            self.name.clone()
+            write!(f, "{}", self.name)
+        } else if self.name.contains(':') {
+            write!(f, "[{}]:{}", self.name, self.port)
         } else {
-            join_host_port(&self.name, self.port)
+            write!(f, "{}:{}", self.name, self.port)
         }
     }
 }

-fn join_host_port(host: &str, port: u16) -> String {
-    if host.contains(':') {
-        format!("[{}]:{}", host, port)
-    } else {
-        format!("{}:{}", host, port)
+impl TryFrom<String> for XHost {
+    type Error = std::io::Error;
+
+    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
+        if let Some(addr) = value.to_socket_addrs()?.next() {
+            Ok(Self {
+                name: addr.ip().to_string(),
+                port: addr.port(),
+                is_port_set: addr.port() > 0,
+            })
+        } else {
+            Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "value invalid"))
+        }
     }
 }

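Replacing the hand-rolled `ToString` with a `Display` impl gives `to_string()` for free via the blanket impl, and the new `TryFrom<String>` composes with it. A small usage sketch against the definitions in the hunk above, including the IPv6 bracketing rule:

```rust
// Assumes the XHost struct and its Display / TryFrom impls from the diff
// above are in scope; fields are public per the struct definition shown.
fn demo() {
    let v4 = XHost { name: "127.0.0.1".into(), port: 9000, is_port_set: true };
    assert_eq!(v4.to_string(), "127.0.0.1:9000"); // Display supplies to_string()

    let v6 = XHost { name: "::1".into(), port: 9000, is_port_set: true };
    assert_eq!(v6.to_string(), "[::1]:9000"); // names containing ':' get brackets

    // TryFrom resolves through to_socket_addrs(), so it wants a host:port form.
    let parsed = XHost::try_from("127.0.0.1:9000".to_string()).unwrap();
    assert!(parsed.is_port_set);
}
```
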
@@ -11,3 +11,4 @@ chrono.workspace = true
 common.workspace = true
+psutil = "3.3.0"
 serde.workspace = true
 time.workspace =true

@@ -1,9 +1,36 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, time::SystemTime};

 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;

 use crate::metrics::TimedAction;

+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum ItemState {
+    Offline,
+    Initializing,
+    Online,
+}
+
+impl ItemState {
+    pub fn to_string(&self) -> &str {
+        match self {
+            ItemState::Offline => "offline",
+            ItemState::Initializing => "initializing",
+            ItemState::Online => "online",
+        }
+    }
+
+    pub fn from_string(s: &str) -> Option<ItemState> {
+        match s {
+            "offline" => Some(ItemState::Offline),
+            "initializing" => Some(ItemState::Initializing),
+            "online" => Some(ItemState::Online),
+            _ => None,
+        }
+    }
+}
+
 #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DiskMetrics {
     pub last_minute: HashMap<String, TimedAction>,
@@ -14,3 +41,110 @@ pub struct DiskMetrics {
     pub total_writes: u64,
     pub total_deletes: u64,
 }
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+pub struct Disk {
+    pub endpoint: String,
+    pub root_disk: bool,
+    pub drive_path: String,
+    pub healing: bool,
+    pub scanning: bool,
+    pub state: String,
+    pub uuid: String,
+    pub major: u32,
+    pub minor: u32,
+    pub model: Option<String>,
+    pub total_space: u64,
+    pub used_space: u64,
+    pub available_space: u64,
+    pub read_throughput: f64,
+    pub write_throughput: f64,
+    pub read_latency: f64,
+    pub write_latency: f64,
+    pub utilization: f64,
+    pub metrics: Option<DiskMetrics>,
+    pub heal_info: Option<HealingDisk>,
+    pub used_inodes: u64,
+    pub free_inodes: u64,
+    pub local: bool,
+    pub pool_index: i32,
+    pub set_index: i32,
+    pub disk_index: i32,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct HealingDisk {
+    pub id: String,
+    pub heal_id: String,
+    pub pool_index: Option<usize>,
+    pub set_index: Option<usize>,
+    pub disk_index: Option<usize>,
+    pub endpoint: String,
+    pub path: String,
+    pub started: Option<OffsetDateTime>,
+    pub last_update: Option<SystemTime>,
+    pub retry_attempts: u64,
+    pub objects_total_count: u64,
+    pub objects_total_size: u64,
+    pub items_healed: u64,
+    pub items_failed: u64,
+    pub item_skipped: u64,
+    pub bytes_done: u64,
+    pub bytes_failed: u64,
+    pub bytes_skipped: u64,
+    pub objects_healed: u64,
+    pub objects_failed: u64,
+    pub bucket: String,
+    pub object: String,
+    pub queue_buckets: Vec<String>,
+    pub healed_buckets: Vec<String>,
+    pub finished: bool,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub enum BackendByte {
+    #[default]
+    Unknown,
+    FS,
+    Erasure,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct StorageInfo {
+    pub disks: Vec<Disk>,
+    pub backend: BackendInfo,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct BackendDisks(HashMap<String, usize>);
+
+impl BackendDisks {
+    pub fn new() -> Self {
+        Self(HashMap::new())
+    }
+    pub fn sum(&self) -> usize {
+        self.0.values().sum()
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase", default)]
+pub struct BackendInfo {
+    pub backend_type: BackendByte,
+    pub online_disks: BackendDisks,
+    pub offline_disks: BackendDisks,
+    #[serde(rename = "StandardSCData")]
+    pub standard_sc_data: Vec<usize>,
+    #[serde(rename = "StandardSCParities")]
+    pub standard_sc_parities: Vec<usize>,
+    #[serde(rename = "StandardSCParity")]
+    pub standard_sc_parity: Option<usize>,
+    #[serde(rename = "RRSCData")]
+    pub rr_sc_data: Vec<usize>,
+    #[serde(rename = "RRSCParities")]
+    pub rr_sc_parities: Vec<usize>,
+    #[serde(rename = "RRSCParity")]
+    pub rr_sc_parity: Option<usize>,
+    pub total_sets: Vec<usize>,
+    pub drives_per_set: Vec<usize>,
+}

@@ -2,3 +2,5 @@ pub mod health;
 pub mod info_commands;
 pub mod metrics;
 pub mod net;
+
+pub use info_commands::*;

@@ -10,6 +10,7 @@ use ecstore::global::GLOBAL_ALlHealState;
 use ecstore::heal::heal_commands::HealOpts;
 use ecstore::heal::heal_ops::new_heal_sequence;
 use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType};
+use ecstore::new_object_layer_fn;
 use ecstore::peer::is_reserved_or_invalid_bucket;
 use ecstore::store::is_valid_object_prefix;
 use ecstore::store_api::StorageAPI;

@@ -17,7 +18,6 @@ use ecstore::utils::path::path_join;
 use ecstore::utils::time::parse_duration;
 use ecstore::utils::xml;
 use ecstore::GLOBAL_Endpoints;
-use ecstore::{new_object_layer_fn, store_api::BackendInfo};
 use futures::{Stream, StreamExt};
 use http::Uri;
 use hyper::StatusCode;

@@ -37,7 +37,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Duration as std_Duration;
-use std::u64;
 use time::{Duration, OffsetDateTime};
 use tokio::sync::mpsc::{self};
 use tokio::time::interval;
@@ -128,7 +127,7 @@ impl Operation for AssumeRoleHandle {
 #[serde(rename_all = "PascalCase", default)]
 pub struct AccountInfo {
     pub account_name: String,
-    pub server: BackendInfo,
+    pub server: madmin::BackendInfo,
     pub policy: BucketPolicy,
 }

@@ -224,7 +223,18 @@ impl Operation for StorageInfoHandler {
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle StorageInfoHandler");

-        return Err(s3_error!(NotImplemented));
+        let Some(store) = new_object_layer_fn() else {
+            return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
+        };
+
+        // TODO: getAggregatedBackgroundHealState
+
+        let info = store.storage_info().await;
+
+        let output = serde_json::to_string(&info)
+            .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse accountInfo failed"))?;
+
+        Ok(S3Response::new((StatusCode::OK, Body::from(output))))
     }
 }

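With this change the handler serializes the aggregated `madmin::StorageInfo` to JSON and returns it with a 200. On the consuming side the same serde derives make parsing a one-liner; a minimal sketch, assuming only the `Serialize`/`Deserialize` derives shown elsewhere in this diff:

```rust
// Hypothetical client-side counterpart to the handler body above:
// the handler emits serde_json::to_string(&info), so this inverts it.
fn parse_storage_info(body: &str) -> serde_json::Result<madmin::StorageInfo> {
    serde_json::from_str(body)
}
```
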
@@ -492,15 +502,11 @@ fn extract_heal_init_params(body: &Bytes, uri: &Uri, params: Params<'_, '_>) ->
                 hip.client_token = value.to_string();
             }
         }
-        if key == "forceStart" {
-            if parts.next().is_some() {
-                hip.force_start = true;
-            }
+        if key == "forceStart" && parts.next().is_some() {
+            hip.force_start = true;
         }
-        if key == "forceStop" {
-            if parts.next().is_some() {
-                hip.force_stop = true;
-            }
+        if key == "forceStop" && parts.next().is_some() {
+            hip.force_stop = true;
         }
     }
 }

@@ -8,7 +8,6 @@ use common::{
     error::{Error, Result},
     globals::set_global_addr,
 };
-use ecstore::global::set_global_rustfs_port;
 use ecstore::heal::background_heal_ops::init_auto_heal;
 use ecstore::utils::net::{self, get_available_port};
 use ecstore::{

@@ -18,6 +17,7 @@ use ecstore::{
     store::{init_local_disks, ECStore},
     update_erasure_type,
 };
+use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
 use grpc::make_server;
 use hyper_util::{
     rt::{TokioExecutor, TokioIo},
@@ -212,10 +212,10 @@ async fn run(opt: config::Opt) -> Result<()> {
     })?;
     warn!(" init store success!");

-    // new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
-    //     error!("new_global_notification_sys faild {:?}", &err);
-    //     Error::from_string(err.to_string())
-    // })?;
+    new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
+        error!("new_global_notification_sys faild {:?}", &err);
+        Error::from_string(err.to_string())
+    })?;

     // init scanner
     init_data_scanner().await;