From d25d233cdcf69985a064dc1ff106a77ad111e2df Mon Sep 17 00:00:00 2001 From: weisd Date: Mon, 6 Jan 2025 15:04:36 +0800 Subject: [PATCH] fix #110 #112 add ServerInfoHandler need test --- ecstore/src/admin_server_info.rs | 270 +++++++++++++++++++++++---- ecstore/src/bucket/metadata.rs | 2 +- ecstore/src/bucket/metadata_sys.rs | 22 +-- ecstore/src/config/common.rs | 7 +- ecstore/src/config/error.rs | 2 +- ecstore/src/heal/data_usage.rs | 83 +++++++- ecstore/src/heal/data_usage_cache.rs | 68 ++++--- ecstore/src/notification_sys.rs | 28 ++- ecstore/src/peer_rest_client.rs | 5 +- ecstore/src/set_disk.rs | 5 +- iam/src/store/object.rs | 8 +- madmin/src/info_commands.rs | 176 ++++++++++++++++- rustfs/src/admin/handlers.rs | 24 ++- rustfs/src/admin/handlers/user.rs | 21 +++ rustfs/src/admin/mod.rs | 47 ++++- rustfs/src/admin/test.json | 232 +++++++++++++++++++++++ 16 files changed, 889 insertions(+), 111 deletions(-) create mode 100644 rustfs/src/admin/handlers/user.rs create mode 100644 rustfs/src/admin/test.json diff --git a/ecstore/src/admin_server_info.rs b/ecstore/src/admin_server_info.rs index 6ecadff2..0f3f144d 100644 --- a/ecstore/src/admin_server_info.rs +++ b/ecstore/src/admin_server_info.rs @@ -1,53 +1,64 @@ -use std::{ - collections::{HashMap, HashSet}, - time::{SystemTime, UNIX_EPOCH}, +use crate::{ + disk::endpoint::Endpoint, + global::GLOBAL_Endpoints, + heal::{ + data_usage::{load_data_usage_from_backend, DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT}, + data_usage_cache::DataUsageCache, + heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED}, + }, + new_object_layer_fn, + notification_sys::get_global_notification_sys, + store_api::StorageAPI, }; - use common::{ error::{Error, Result}, globals::GLOBAL_Local_Node_Name, }; +use madmin::{BackendDisks, Disk, ErasureSetInfo, InfoMessage, ServerProperties, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE}; use protos::{ models::{PingBody, PingBodyBuilder}, node_service_time_out_client, proto_gen::node_service::{PingRequest, PingResponse}, }; -use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, HashSet}, + time::{SystemTime, UNIX_EPOCH}, +}; +use time::OffsetDateTime; use tonic::Request; +use tracing::warn; -use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI}; +// pub const ITEM_OFFLINE: &str = "offline"; +// pub const ITEM_INITIALIZING: &str = "initializing"; +// pub const ITEM_ONLINE: &str = "online"; -pub const ITEM_OFFLINE: &str = "offline"; -pub const ITEM_INITIALIZING: &str = "initializing"; -pub const ITEM_ONLINE: &str = "online"; +// #[derive(Debug, Default, Serialize, Deserialize)] +// pub struct MemStats { +// alloc: u64, +// total_alloc: u64, +// mallocs: u64, +// frees: u64, +// heap_alloc: u64, +// } -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct MemStats { - alloc: u64, - total_alloc: u64, - mallocs: u64, - frees: u64, - heap_alloc: u64, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct ServerProperties { - pub state: String, - pub endpoint: String, - pub scheme: String, - pub uptime: u64, - pub version: String, - pub commit_id: String, - pub network: HashMap, - pub disks: Vec, - pub pool_number: i32, - pub pool_numbers: Vec, - pub mem_stats: MemStats, - pub max_procs: u64, - pub num_cpu: u64, - pub runtime_version: String, - pub rustfs_env_vars: HashMap, -} +// #[derive(Debug, Default, Serialize, Deserialize)] +// pub struct ServerProperties { +// pub state: String, +// pub endpoint: String, +// pub scheme: String, +// pub uptime: u64, +// pub version: String, +// pub commit_id: String, +// pub network: HashMap, +// pub disks: Vec, +// pub pool_number: i32, +// pub pool_numbers: Vec, +// pub mem_stats: MemStats, +// pub max_procs: u64, +// pub num_cpu: u64, +// pub runtime_version: String, +// pub rustfs_env_vars: HashMap, +// } async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> { let addr = format!( @@ -139,7 +150,7 @@ pub async fn get_local_server_property() -> ServerProperties { } props.pool_numbers.sort(); props.pool_number = if props.pool_numbers.len() == 1 { - props.pool_numbers[1] + props.pool_numbers[0] } else { i32::MAX }; @@ -160,3 +171,188 @@ pub async fn get_local_server_property() -> ServerProperties { props } + +pub async fn get_server_info(get_pools: bool) -> InfoMessage { + let nowt: OffsetDateTime = OffsetDateTime::now_utc(); + + warn!("get_server_info start {:?}", nowt); + + let local = get_local_server_property().await; + + let after1 = OffsetDateTime::now_utc(); + + warn!("get_local_server_property end {:?}", after1 - nowt); + + let mut servers = { + if let Some(sys) = get_global_notification_sys() { + sys.server_info().await + } else { + vec![] + } + }; + + let after2 = OffsetDateTime::now_utc(); + + warn!("server_info end {:?}", after2 - after1); + servers.push(local); + + let mut buckets = madmin::Buckets::default(); + let mut objects = madmin::Objects::default(); + let mut versions = madmin::Versions::default(); + let mut delete_markers = madmin::DeleteMarkers::default(); + let mut usage = madmin::Usage::default(); + let mut mode = madmin::ITEM_INITIALIZING; + let mut backend = madmin::ErasureBackend::default(); + let mut pools: HashMap> = HashMap::new(); + + if let Some(store) = new_object_layer_fn() { + mode = madmin::ITEM_ONLINE; + match load_data_usage_from_backend(store.clone()).await { + Ok(res) => { + buckets.count = res.buckets_count; + objects.count = res.objects_total_count; + versions.count = res.versions_total_count; + delete_markers.count = res.delete_markers_total_count; + usage.size = res.objects_total_size; + } + Err(err) => { + buckets.error = Some(err.to_string()); + objects.error = Some(err.to_string()); + versions.error = Some(err.to_string()); + delete_markers.error = Some(err.to_string()); + usage.error = Some(err.to_string()); + } + } + + let after3 = OffsetDateTime::now_utc(); + + warn!("load_data_usage_from_backend end {:?}", after3 - after2); + + let backen_info = store.clone().backend_info().await; + + let after4 = OffsetDateTime::now_utc(); + + warn!("backend_info end {:?}", after4 - after3); + + let mut all_disks: Vec = Vec::new(); + for server in servers.iter() { + all_disks.extend(server.disks.clone()); + } + let (online_disks, offline_disks) = get_online_offline_disks_stats(&all_disks); + + let after5 = OffsetDateTime::now_utc(); + + warn!("get_online_offline_disks_stats end {:?}", after5 - after4); + backend = madmin::ErasureBackend { + backend_type: madmin::BackendType::ErasureType, + online_disks: online_disks.sum(), + offline_disks: offline_disks.sum(), + standard_sc_parity: backen_info.standard_sc_parity, + rr_sc_parity: backen_info.rr_sc_parity, + total_sets: backen_info.total_sets, + drives_per_set: backen_info.drives_per_set, + }; + if get_pools { + pools = get_pools_info(&all_disks).await.unwrap_or_default(); + let after6 = OffsetDateTime::now_utc(); + + warn!("get_pools_info end {:?}", after6 - after5); + } + } + + let services = madmin::Services::default(); + + InfoMessage { + mode: Some(mode.to_string()), + domain: None, + region: None, + sqs_arn: None, + deployment_id: None, + buckets: Some(buckets), + objects: Some(objects), + versions: Some(versions), + delete_markers: Some(delete_markers), + usage: Some(usage), + backend: Some(backend), + services: Some(services), + servers: Some(servers), + pools: Some(pools), + } +} + +fn get_online_offline_disks_stats(disks_info: &[Disk]) -> (BackendDisks, BackendDisks) { + let mut online_disks: HashMap = HashMap::new(); + let mut offline_disks: HashMap = HashMap::new(); + + for disk in disks_info { + let ep = &disk.endpoint; + offline_disks.entry(ep.clone()).or_insert(0); + online_disks.entry(ep.clone()).or_insert(0); + } + + for disk in disks_info { + let ep = &disk.endpoint; + let state = &disk.state; + if *state != DRIVE_STATE_OK && *state != DRIVE_STATE_UNFORMATTED { + *offline_disks.get_mut(ep).unwrap() += 1; + continue; + } + *online_disks.get_mut(ep).unwrap() += 1; + } + + let mut root_disk_count = 0; + for di in disks_info { + if di.root_disk { + root_disk_count += 1; + } + } + + if disks_info.len() == (root_disk_count + offline_disks.values().sum::()) { + return (BackendDisks(online_disks), BackendDisks(offline_disks)); + } + + for disk in disks_info { + let ep = &disk.endpoint; + if disk.root_disk { + *offline_disks.get_mut(ep).unwrap() += 1; + *online_disks.get_mut(ep).unwrap() -= 1; + } + } + + (BackendDisks(online_disks), BackendDisks(offline_disks)) +} + +async fn get_pools_info(all_disks: &[Disk]) -> Result>> { + let Some(store) = new_object_layer_fn() else { + return Err(Error::msg("ServerNotInitialized")); + }; + + let mut pools_info: HashMap> = HashMap::new(); + for d in all_disks { + let pool_info = pools_info.entry(d.pool_index).or_default(); + let erasure_set = pool_info.entry(d.set_index).or_default(); + + if erasure_set.id == 0 { + erasure_set.id = d.set_index; + if let Ok(cache) = DataUsageCache::load( + &store.pools[d.pool_index as usize].disk_set[d.set_index as usize].clone(), + DATA_USAGE_CACHE_NAME, + ) + .await + { + let data_usage_info = cache.dui(DATA_USAGE_ROOT, &vec![]); + erasure_set.objects_count = data_usage_info.objects_total_count; + erasure_set.versions_count = data_usage_info.versions_total_count; + erasure_set.delete_markers_count = data_usage_info.delete_markers_total_count; + erasure_set.usage = data_usage_info.objects_total_size; + }; + } + + erasure_set.raw_capacity += d.total_space; + erasure_set.raw_usage += d.used_space; + if d.healing { + erasure_set.heal_disks = 1; + } + } + Ok(pools_info) +} diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index f1df7b17..44fe33be 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -358,7 +358,7 @@ pub async fn load_bucket_metadata_parse(api: Arc, bucket: &str, parse: let mut bm = match read_bucket_metadata(api.clone(), bucket).await { Ok(res) => res, Err(err) => { - if !config::error::is_not_found(&err) { + if !config::error::is_err_config_not_found(&err) { return Err(err); } diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 16286212..ef455bd9 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -253,7 +253,7 @@ impl BucketMetadataSys { let meta = match self.get_config_from_disk(bucket).await { Ok(res) => res, Err(err) => { - if !config::error::is_not_found(&err) { + if !config::error::is_err_config_not_found(&err) { return Err(err); } else { BucketMetadata::new(bucket) @@ -357,7 +357,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_versioning_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Ok((VersioningConfiguration::default(), OffsetDateTime::UNIX_EPOCH)); } else { return Err(err); @@ -377,7 +377,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_bucket_policy err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketPolicyNotFound)); } else { return Err(err); @@ -397,7 +397,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_tagging_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::TaggingNotFound)); } else { return Err(err); @@ -417,7 +417,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_object_lock_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound)); } else { return Err(err); @@ -437,7 +437,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_lifecycle_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketLifecycleNotFound)); } else { return Err(err); @@ -461,7 +461,7 @@ impl BucketMetadataSys { Ok((bm, _)) => bm.notification_config.clone(), Err(err) => { warn!("get_notification_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { None } else { return Err(err); @@ -477,7 +477,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_sse_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound)); } else { return Err(err); @@ -508,7 +508,7 @@ impl BucketMetadataSys { Ok((res, _)) => res, Err(err) => { warn!("get_quota_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound)); } else { return Err(err); @@ -528,7 +528,7 @@ impl BucketMetadataSys { Ok(res) => res, Err(err) => { warn!("get_replication_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound)); } else { return Err(err); @@ -552,7 +552,7 @@ impl BucketMetadataSys { Ok(res) => res, Err(err) => { warn!("get_replication_config err {:?}", &err); - if config::error::is_not_found(&err) { + if config::error::is_err_config_not_found(&err) { return Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound)); } else { return Err(err); diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index e52f827e..b209840c 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -1,9 +1,8 @@ use std::collections::HashSet; use std::sync::Arc; -use super::error::ConfigError; +use super::error::{is_err_config_not_found, ConfigError}; use super::{storageclass, Config, GLOBAL_StorageClass, KVS}; -use crate::config::error::is_not_found; use crate::disk::RUSTFS_META_BUCKET; use crate::error::{Error, Result}; use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}; @@ -102,7 +101,7 @@ pub async fn read_config_without_migrate(api: Arc) -> Result res, Err(err) => { - if is_not_found(&err) { + if is_err_config_not_found(&err) { warn!("config not found, start to init"); let cfg = new_and_save_server_config(api).await?; warn!("config init done"); @@ -124,7 +123,7 @@ async fn read_server_config(api: Arc, data: &[u8]) -> Result res, Err(err) => { - if is_not_found(&err) { + if is_err_config_not_found(&err) { warn!("config not found init start"); let cfg = new_and_save_server_config(api).await?; warn!("config not found init done"); diff --git a/ecstore/src/config/error.rs b/ecstore/src/config/error.rs index 30efb20e..f734adb9 100644 --- a/ecstore/src/config/error.rs +++ b/ecstore/src/config/error.rs @@ -31,7 +31,7 @@ impl ConfigError { } } -pub fn is_not_found(err: &Error) -> bool { +pub fn is_err_config_not_found(err: &Error) -> bool { if let Some(e) = err.downcast_ref::() { ConfigError::is_not_found(e) } else if let Some(e) = err.downcast_ref::() { diff --git a/ecstore/src/heal/data_usage.rs b/ecstore/src/heal/data_usage.rs index 17b8ec5d..5460de3c 100644 --- a/ecstore/src/heal/data_usage.rs +++ b/ecstore/src/heal/data_usage.rs @@ -1,14 +1,20 @@ -use std::{collections::HashMap, time::SystemTime}; - use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc, time::SystemTime}; use tokio::sync::mpsc::Receiver; -use tracing::error; +use tracing::{error, warn}; use crate::{ - config::common::save_config, + bucket::metadata_sys::get_replication_config, + config::{ + common::{read_config, save_config}, + error::is_err_config_not_found, + }, disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET}, + error::Result, new_object_layer_fn, + store::ECStore, + store_err::to_object_err, utils::path::SLASH_SEPARATOR, }; @@ -133,3 +139,72 @@ pub async fn store_data_usage_in_backend(mut rx: Receiver) { } } } + +// TODO: cancel ctx +pub async fn load_data_usage_from_backend(store: Arc) -> Result { + let buf = match read_config(store, &DATA_USAGE_OBJ_NAME_PATH).await { + Ok(data) => data, + Err(e) => { + error!("Failed to read data usage info from backend: {}", e); + if is_err_config_not_found(&e) { + return Ok(DataUsageInfo::default()); + } + + return Err(to_object_err(e, vec![RUSTFS_META_BUCKET, &DATA_USAGE_OBJ_NAME_PATH])); + } + }; + + let mut data_usage_info: DataUsageInfo = serde_json::from_slice(&buf)?; + + warn!("Loaded data usage info from backend {:?}", &data_usage_info); + + if data_usage_info.buckets_usage.is_empty() { + data_usage_info.buckets_usage = data_usage_info + .bucket_sizes + .iter() + .map(|(bucket, &size)| { + ( + bucket.clone(), + BucketUsageInfo { + size, + ..Default::default() + }, + ) + }) + .collect(); + } + + if data_usage_info.bucket_sizes.is_empty() { + data_usage_info.bucket_sizes = data_usage_info + .buckets_usage + .iter() + .map(|(bucket, bui)| (bucket.clone(), bui.size)) + .collect(); + } + + for (bucket, bui) in &data_usage_info.buckets_usage { + if bui.replicated_size_v1 > 0 + || bui.replication_failed_count_v1 > 0 + || bui.replication_failed_size_v1 > 0 + || bui.replication_pending_count_v1 > 0 + { + if let Ok((cfg, _)) = get_replication_config(bucket).await { + if !cfg.role.is_empty() { + data_usage_info.replication_info.insert( + cfg.role.clone(), + BucketTargetUsageInfo { + replication_failed_size: bui.replication_failed_size_v1, + replication_failed_count: bui.replication_failed_count_v1, + replicated_size: bui.replicated_size_v1, + replication_pending_count: bui.replication_pending_count_v1, + replication_pending_size: bui.replication_pending_size_v1, + ..Default::default() + }, + ); + } + } + } + } + + Ok(data_usage_info) +} diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs index ead58802..c21a3544 100644 --- a/ecstore/src/heal/data_usage_cache.rs +++ b/ecstore/src/heal/data_usage_cache.rs @@ -18,6 +18,7 @@ use std::path::Path; use std::time::{Duration, SystemTime}; use tokio::sync::mpsc::Sender; use tokio::time::sleep; +use tracing::warn; use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS}; use super::data_usage::{BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, DATA_USAGE_ROOT}; @@ -379,6 +380,7 @@ impl DataUsageCache { let mut retries = 0; while retries < 5 { let path = Path::new(BUCKET_META_PREFIX).join(name); + warn!("Loading data usage cache from backend: {}", path.display()); match store .get_object_reader( RUSTFS_META_BUCKET, @@ -398,37 +400,42 @@ impl DataUsageCache { } break; } - Err(err) => match err.downcast_ref::() { - Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => { - match store - .get_object_reader( - RUSTFS_META_BUCKET, - name, - None, - HeaderMap::new(), - &ObjectOptions { - no_lock: true, - ..Default::default() - }, - ) - .await - { - Ok(mut reader) => { - if let Ok(info) = Self::unmarshal(&reader.read_all().await?) { - d = info - } - break; - } - Err(_) => match err.downcast_ref::() { - Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => { + Err(err) => { + warn!("Failed to load data usage cache from backend: {}", &err); + match err.downcast_ref::() { + Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => { + match store + .get_object_reader( + RUSTFS_META_BUCKET, + name, + None, + HeaderMap::new(), + &ObjectOptions { + no_lock: true, + ..Default::default() + }, + ) + .await + { + Ok(mut reader) => { + if let Ok(info) = Self::unmarshal(&reader.read_all().await?) { + d = info + } break; } - _ => {} - }, + Err(_) => match err.downcast_ref::() { + Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => { + break; + } + _ => {} + }, + } + } + _ => { + break; } } - _ => {} - }, + } } retries += 1; let dur = { @@ -446,11 +453,14 @@ impl DataUsageCache { let buf_clone = buf.clone(); let store_clone = store.clone(); - let name_clone = name.to_string(); + + let name = Path::new(BUCKET_META_PREFIX).join(name).to_string_lossy().to_string(); + + let name_clone = name.clone(); tokio::spawn(async move { let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await; }); - save_config(store, name, &buf).await + save_config(store, &name, &buf).await } pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) { diff --git a/ecstore/src/notification_sys.rs b/ecstore/src/notification_sys.rs index c0b2463b..37b23489 100644 --- a/ecstore/src/notification_sys.rs +++ b/ecstore/src/notification_sys.rs @@ -1,5 +1,3 @@ -use std::sync::OnceLock; - use crate::endpoints::EndpointServerPools; use crate::error::{Error, Result}; use crate::global::get_global_endpoints; @@ -7,6 +5,8 @@ use crate::peer_rest_client::PeerRestClient; use crate::StorageAPI; use futures::future::join_all; use lazy_static::lazy_static; +use madmin::{ItemState, ServerProperties}; +use std::sync::OnceLock; use tracing::error; lazy_static! { @@ -95,6 +95,30 @@ impl NotificationSys { madmin::StorageInfo { disks, backend } } + pub async fn server_info(&self) -> Vec { + let mut futures = Vec::with_capacity(self.peer_clients.len()); + + for client in self.peer_clients.iter() { + futures.push(async move { + if let Some(client) = client { + match client.server_info().await { + Ok(info) => info, + Err(_) => ServerProperties { + endpoint: client.host.to_string(), + state: ItemState::Offline.to_string().to_owned(), + disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()), + ..Default::default() + }, + } + } else { + ServerProperties::default() + } + }); + } + + join_all(futures).await + } + pub async fn reload_pool_meta(&self) { let mut futures = Vec::with_capacity(self.peer_clients.len()); for client in self.peer_clients.iter().flatten() { diff --git a/ecstore/src/peer_rest_client.rs b/ecstore/src/peer_rest_client.rs index 95496125..8778af5c 100644 --- a/ecstore/src/peer_rest_client.rs +++ b/ecstore/src/peer_rest_client.rs @@ -1,7 +1,4 @@ -use std::{collections::HashMap, io::Cursor, time::SystemTime}; - use crate::{ - admin_server_info::ServerProperties, endpoints::EndpointServerPools, global::is_dist_erasure, heal::heal_commands::BgHealState, @@ -13,6 +10,7 @@ use madmin::{ health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService}, metrics::RealtimeMetrics, net::NetInfo, + ServerProperties, }; use protos::{ node_service_time_out_client, @@ -28,6 +26,7 @@ use protos::{ }; use rmp_serde::{Deserializer, Serializer}; use serde::{Deserialize, Serialize as _}; +use std::{collections::HashMap, io::Cursor, time::SystemTime}; use tonic::Request; use tracing::warn; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 42d15a91..3850a339 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -6,8 +6,6 @@ use std::{ time::Duration, }; -use crate::config::error::is_not_found; -use crate::global::GLOBAL_MRFState; use crate::{ bitrot::{bitrot_verify, close_bitrot_writers, new_bitrot_filereader, new_bitrot_filewriter, BitrotFileWriter}, cache_value::metacache_set::{list_path_raw, ListPathRawOptions}, @@ -52,6 +50,7 @@ use crate::{ }, xhttp, }; +use crate::{config::error::is_err_config_not_found, global::GLOBAL_MRFState}; use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation}; use crate::{file_meta::file_info_from_raw, heal::data_usage_cache::DataUsageCache}; use crate::{ @@ -1894,7 +1893,7 @@ impl SetDisks { version_id: fi.version_id.map(|v| v.to_string()), set_index, pool_index, - bitrot_scan: !is_not_found(e), + bitrot_scan: !is_err_config_not_found(e), ..Default::default() }) .await; diff --git a/iam/src/store/object.rs b/iam/src/store/object.rs index 8199903c..9290395f 100644 --- a/iam/src/store/object.rs +++ b/iam/src/store/object.rs @@ -1,9 +1,9 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use ecstore::{ - config::error::is_not_found, + config::error::is_err_config_not_found, store::ECStore, - store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader}, + store_api::{ObjectIO, ObjectInfo, ObjectOptions, PutObjReader}, utils::path::dir, StorageAPI, }; @@ -59,7 +59,7 @@ impl ObjectStore { match items { Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes), Err(e) => { - if is_not_found(&e) { + if is_err_config_not_found(&e) { Result::<_, crate::Error>::Ok(vec![]) } else { Err(Error::StringError(format!("list {prefix} failed, err: {e:?}"))) @@ -290,7 +290,7 @@ impl Store for ObjectStore { match self.load_mapped_policy(UserType::Sts, parent.as_str(), false).await { Ok(m) => sts_policies.lock().await.insert(name.to_owned(), m), - Err(Error::EcstoreError(e)) if is_not_found(&e) => return Ok(()), + Err(Error::EcstoreError(e)) if is_err_config_not_found(&e) => return Ok(()), Err(e) => return Err(e), }; } diff --git a/madmin/src/info_commands.rs b/madmin/src/info_commands.rs index ec8f0f27..72c5972f 100644 --- a/madmin/src/info_commands.rs +++ b/madmin/src/info_commands.rs @@ -116,7 +116,7 @@ pub struct StorageInfo { } #[derive(Debug, Default, Serialize, Deserialize)] -pub struct BackendDisks(HashMap); +pub struct BackendDisks(pub HashMap); impl BackendDisks { pub fn new() -> Self { @@ -148,3 +148,177 @@ pub struct BackendInfo { pub total_sets: Vec, pub drives_per_set: Vec, } + +pub const ITEM_OFFLINE: &str = "offline"; +pub const ITEM_INITIALIZING: &str = "initializing"; +pub const ITEM_ONLINE: &str = "online"; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct MemStats { + pub alloc: u64, + pub total_alloc: u64, + pub mallocs: u64, + pub frees: u64, + pub heap_alloc: u64, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct ServerProperties { + pub state: String, + pub endpoint: String, + pub scheme: String, + pub uptime: u64, + pub version: String, + #[serde(rename = "commitID")] + pub commit_id: String, + pub network: HashMap, + #[serde(rename = "drives")] + pub disks: Vec, + #[serde(rename = "poolNumber")] + pub pool_number: i32, + #[serde(rename = "poolNumbers")] + pub pool_numbers: Vec, + pub mem_stats: MemStats, + pub max_procs: u64, + pub num_cpu: u64, + pub runtime_version: String, + pub rustfs_env_vars: HashMap, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Kms { + pub status: Option, + pub encrypt: Option, + pub decrypt: Option, + pub endpoint: Option, + pub version: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Ldap { + pub status: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Status { + pub status: Option, +} + +pub type Audit = HashMap; + +pub type Logger = HashMap; + +pub type TargetIDStatus = HashMap; + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct Services { + pub kms: Option, // deprecated july 2023 + #[serde(rename = "kmsStatus")] + pub kms_status: Option>, + pub ldap: Option, + pub logger: Option>, + pub audit: Option>, + pub notifications: Option>>>, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Buckets { + pub count: u64, + pub error: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Objects { + pub count: u64, + pub error: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Versions { + pub count: u64, + pub error: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct DeleteMarkers { + pub count: u64, + pub error: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Usage { + pub size: u64, + pub error: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct ErasureSetInfo { + pub id: i32, + #[serde(rename = "rawUsage")] + pub raw_usage: u64, + #[serde(rename = "rawCapacity")] + pub raw_capacity: u64, + pub usage: u64, + #[serde(rename = "objectsCount")] + pub objects_count: u64, + #[serde(rename = "versionsCount")] + pub versions_count: u64, + #[serde(rename = "deleteMarkersCount")] + pub delete_markers_count: u64, + #[serde(rename = "healDisks")] + pub heal_disks: i32, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub enum BackendType { + #[default] + #[serde(rename = "FS")] + FsType, + #[serde(rename = "Erasure")] + ErasureType, +} + +#[derive(Serialize, Deserialize)] +pub struct FSBackend { + #[serde(rename = "backendType")] + pub backend_type: BackendType, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct ErasureBackend { + #[serde(rename = "backendType")] + pub backend_type: BackendType, + #[serde(rename = "onlineDisks")] + pub online_disks: usize, + #[serde(rename = "offlineDisks")] + pub offline_disks: usize, + #[serde(rename = "standardSCParity")] + pub standard_sc_parity: Option, + #[serde(rename = "rrSCParity")] + pub rr_sc_parity: Option, + #[serde(rename = "totalSets")] + pub total_sets: Vec, + #[serde(rename = "totalDrivesPerSet")] + pub drives_per_set: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct InfoMessage { + pub mode: Option, + pub domain: Option>, + pub region: Option, + #[serde(rename = "sqsARN")] + pub sqs_arn: Option>, + #[serde(rename = "deploymentID")] + pub deployment_id: Option, + pub buckets: Option, + pub objects: Option, + pub versions: Option, + #[serde(rename = "deletemarkers")] + pub delete_markers: Option, + pub usage: Option, + pub services: Option, + pub backend: Option, + pub servers: Option>, + pub pools: Option>>, +} diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 48039e06..ec0280e4 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -1,12 +1,14 @@ use super::router::Operation; use crate::storage::error::to_s3_error; use bytes::Bytes; +use ecstore::admin_server_info::get_server_info; use ecstore::bucket::policy::action::{Action, ActionSet}; use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy}; use ecstore::bucket::policy::effect::Effect; use ecstore::bucket::policy::resource::{Resource, ResourceSet}; use ecstore::error::Error as ec_Error; use ecstore::global::GLOBAL_ALlHealState; +use ecstore::heal::data_usage::load_data_usage_from_backend; use ecstore::heal::heal_commands::HealOpts; use ecstore::heal::heal_ops::new_heal_sequence; use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType}; @@ -47,6 +49,7 @@ use tracing::{error, info, warn}; pub mod service_account; pub mod trace; +pub mod user; const ASSUME_ROLE_ACTION: &str = "AssumeRole"; const ASSUME_ROLE_VERSION: &str = "2011-06-15"; @@ -271,7 +274,12 @@ impl Operation for ServerInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle ServerInfoHandler"); - return Err(s3_error!(NotImplemented)); + let info = get_server_info(true).await; + + let output = serde_json::to_string(&info) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse serverInfo failed"))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(output)))) } } @@ -315,7 +323,19 @@ impl Operation for DataUsageInfoHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle DataUsageInfoHandler"); - return Err(s3_error!(NotImplemented)); + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let info = load_data_usage_from_backend(store).await.map_err(|e| { + error!("load_data_usage_from_backend failed {:?}", e); + s3_error!(InternalError, "load_data_usage_from_backend failed") + })?; + + let output = serde_json::to_string(&info) + .map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "parse DataUsageInfo failed"))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(output)))) } } diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs new file mode 100644 index 00000000..3c961fa8 --- /dev/null +++ b/rustfs/src/admin/handlers/user.rs @@ -0,0 +1,21 @@ +use http::StatusCode; +use matchit::Params; +use s3s::{s3_error, Body, S3Request, S3Response, S3Result}; + +use crate::admin::router::Operation; + +pub struct AddUser {} +#[async_trait::async_trait] +impl Operation for AddUser { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + return Err(s3_error!(NotImplemented)); + } +} + +pub struct SetUserStatus {} +#[async_trait::async_trait] +impl Operation for SetUserStatus { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + return Err(s3_error!(NotImplemented)); + } +} diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index eb82136c..713eab9e 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -4,8 +4,9 @@ pub mod router; use common::error::Result; // use ecstore::global::{is_dist_erasure, is_erasure}; -use handlers::service_account::{ - AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount, +use handlers::{ + service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount}, + user, }; use hyper::Method; use router::{AdminOperation, S3Router}; @@ -14,19 +15,19 @@ use s3s::route::S3Route; const ADMIN_PREFIX: &str = "/rustfs/admin"; pub fn make_admin_route() -> Result { - let mut r = S3Router::new(); + let mut r: S3Router = S3Router::new(); + // 1 r.insert(Method::POST, "/", AdminOperation(&handlers::AssumeRoleHandle {}))?; - r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(), - AdminOperation(&handlers::AccountInfoHandler {}), - )?; + + regist_user_route(&mut r)?; + r.insert( Method::POST, format!("{}{}", ADMIN_PREFIX, "/v3/service").as_str(), AdminOperation(&handlers::ServiceHandle {}), )?; + // 1 r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/info").as_str(), @@ -42,11 +43,13 @@ pub fn make_admin_route() -> Result { format!("{}{}", ADMIN_PREFIX, "/v3/inspect-data").as_str(), AdminOperation(&handlers::InspectDataHandler {}), )?; + // 1 r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/storageinfo").as_str(), AdminOperation(&handlers::StorageInfoHandler {}), )?; + // 1 r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/datausageinfo").as_str(), @@ -58,21 +61,25 @@ pub fn make_admin_route() -> Result { AdminOperation(&handlers::MetricsHandler {}), )?; + // 1 r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/pools/list").as_str(), AdminOperation(&handlers::ListPools {}), )?; + // 1 r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/pools/status").as_str(), AdminOperation(&handlers::StatusPool {}), )?; + // todo r.insert( Method::POST, format!("{}{}", ADMIN_PREFIX, "/v3/pools/decommission").as_str(), AdminOperation(&handlers::StartDecommission {}), )?; + // todo r.insert( Method::POST, format!("{}{}", ADMIN_PREFIX, "/v3/pools/cancel").as_str(), @@ -109,6 +116,28 @@ pub fn make_admin_route() -> Result { )?; // } + Ok(r) +} + +fn regist_user_route(r: &mut S3Router) -> Result<()> { + // 1 + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(), + AdminOperation(&handlers::AccountInfoHandler {}), + )?; + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/add-user").as_str(), + AdminOperation(&user::AddUser {}), + )?; + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/set-user-status").as_str(), + AdminOperation(&user::SetUserStatus {}), + )?; + + // Service accounts r.insert( Method::POST, format!("{}{}", ADMIN_PREFIX, "/v3/update-service-account").as_str(), @@ -139,5 +168,5 @@ pub fn make_admin_route() -> Result { AdminOperation(&AddServiceAccount {}), )?; - Ok(r) + Ok(()) } diff --git a/rustfs/src/admin/test.json b/rustfs/src/admin/test.json new file mode 100644 index 00000000..4becb978 --- /dev/null +++ b/rustfs/src/admin/test.json @@ -0,0 +1,232 @@ +{ + "mode": "online", + "domain": null, + "region": null, + "sqsARN": null, + "deploymentID": null, + "buckets": { + "count": 0, + "error": null + }, + "objects": { + "count": 0, + "error": null + }, + "versions": { + "count": 0, + "error": null + }, + "deletemarkers": { + "count": 0, + "error": null + }, + "usage": { + "size": 0, + "error": null + }, + "services": { + "kms": null, + "kmsStatus": null, + "ldap": null, + "logger": null, + "audit": null, + "notifications": null + }, + "backend": { + "backendType": "Erasure", + "onlineDisks": 5, + "offlineDisks": 0, + "standardSCParity": 2, + "rrSCParity": 1, + "totalSets": [ + 1 + ], + "totalDrivesPerSet": [ + 5 + ] + }, + "servers": [ + { + "state": "online", + "endpoint": "127.0.0.1:9000", + "scheme": "", + "uptime": 1736146443, + "version": "", + "commitID": "", + "network": { + "127.0.0.1:9000": "online" + }, + "drives": [ + { + "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0", + "root_disk": true, + "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0", + "healing": false, + "scanning": true, + "state": "ok", + "uuid": "", + "major": 0, + "minor": 0, + "model": null, + "total_space": 494384795648, + "used_space": 283710812160, + "available_space": 210673983488, + "read_throughput": 0.0, + "write_throughput": 0.0, + "read_latency": 0.0, + "write_latency": 0.0, + "utilization": 57.386637828967736, + "metrics": null, + "heal_info": null, + "used_inodes": 2353357, + "free_inodes": 2057363120, + "local": true, + "pool_index": 0, + "set_index": 0, + "disk_index": 0 + }, + { + "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1", + "root_disk": true, + "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1", + "healing": false, + "scanning": true, + "state": "ok", + "uuid": "", + "major": 0, + "minor": 0, + "model": null, + "total_space": 494384795648, + "used_space": 283710812160, + "available_space": 210673983488, + "read_throughput": 0.0, + "write_throughput": 0.0, + "read_latency": 0.0, + "write_latency": 0.0, + "utilization": 57.386637828967736, + "metrics": null, + "heal_info": null, + "used_inodes": 2353357, + "free_inodes": 2057363120, + "local": true, + "pool_index": 0, + "set_index": 0, + "disk_index": 1 + }, + { + "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2", + "root_disk": true, + "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2", + "healing": false, + "scanning": false, + "state": "ok", + "uuid": "", + "major": 0, + "minor": 0, + "model": null, + "total_space": 494384795648, + "used_space": 283710812160, + "available_space": 210673983488, + "read_throughput": 0.0, + "write_throughput": 0.0, + "read_latency": 0.0, + "write_latency": 0.0, + "utilization": 57.386637828967736, + "metrics": null, + "heal_info": null, + "used_inodes": 2353357, + "free_inodes": 2057363120, + "local": true, + "pool_index": 0, + "set_index": 0, + "disk_index": 2 + }, + { + "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3", + "root_disk": true, + "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3", + "healing": false, + "scanning": false, + "state": "ok", + "uuid": "", + "major": 0, + "minor": 0, + "model": null, + "total_space": 494384795648, + "used_space": 283710812160, + "available_space": 210673983488, + "read_throughput": 0.0, + "write_throughput": 0.0, + "read_latency": 0.0, + "write_latency": 0.0, + "utilization": 57.386637828967736, + "metrics": null, + "heal_info": null, + "used_inodes": 2353357, + "free_inodes": 2057363120, + "local": true, + "pool_index": 0, + "set_index": 0, + "disk_index": 3 + }, + { + "endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4", + "root_disk": true, + "drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4", + "healing": false, + "scanning": false, + "state": "ok", + "uuid": "", + "major": 0, + "minor": 0, + "model": null, + "total_space": 494384795648, + "used_space": 283710812160, + "available_space": 210673983488, + "read_throughput": 0.0, + "write_throughput": 0.0, + "read_latency": 0.0, + "write_latency": 0.0, + "utilization": 57.386637828967736, + "metrics": null, + "heal_info": null, + "used_inodes": 2353357, + "free_inodes": 2057363120, + "local": true, + "pool_index": 0, + "set_index": 0, + "disk_index": 4 + } + ], + "poolNumber": 1, + "poolNumbers": [ + 1 + ], + "mem_stats": { + "alloc": 0, + "total_alloc": 0, + "mallocs": 0, + "frees": 0, + "heap_alloc": 0 + }, + "max_procs": 0, + "num_cpu": 0, + "runtime_version": "", + "rustfs_env_vars": {} + } + ], + "pools": { + "0": { + "0": { + "id": 0, + "rawUsage": 1418554060800, + "rawCapacity": 2471923978240, + "usage": 0, + "objectsCount": 0, + "versionsCount": 0, + "deleteMarkersCount": 0, + "healDisks": 0 + } + } + } +} \ No newline at end of file