fix #110 #112: add ServerInfoHandler (needs test)

weisd
2025-01-06 15:04:36 +08:00
parent 3a799a6cca
commit d25d233cdc
16 changed files with 889 additions and 111 deletions

View File

@@ -1,53 +1,64 @@
use std::{
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH},
use crate::{
disk::endpoint::Endpoint,
global::GLOBAL_Endpoints,
heal::{
data_usage::{load_data_usage_from_backend, DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::DataUsageCache,
heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED},
},
new_object_layer_fn,
notification_sys::get_global_notification_sys,
store_api::StorageAPI,
};
use common::{
error::{Error, Result},
globals::GLOBAL_Local_Node_Name,
};
use madmin::{BackendDisks, Disk, ErasureSetInfo, InfoMessage, ServerProperties, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE};
use protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{PingRequest, PingResponse},
};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH},
};
use time::OffsetDateTime;
use tonic::Request;
use tracing::warn;
use crate::{disk::endpoint::Endpoint, global::GLOBAL_Endpoints, new_object_layer_fn, store_api::StorageAPI};
// pub const ITEM_OFFLINE: &str = "offline";
// pub const ITEM_INITIALIZING: &str = "initializing";
// pub const ITEM_ONLINE: &str = "online";
pub const ITEM_OFFLINE: &str = "offline";
pub const ITEM_INITIALIZING: &str = "initializing";
pub const ITEM_ONLINE: &str = "online";
// #[derive(Debug, Default, Serialize, Deserialize)]
// pub struct MemStats {
// alloc: u64,
// total_alloc: u64,
// mallocs: u64,
// frees: u64,
// heap_alloc: u64,
// }
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MemStats {
alloc: u64,
total_alloc: u64,
mallocs: u64,
frees: u64,
heap_alloc: u64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ServerProperties {
pub state: String,
pub endpoint: String,
pub scheme: String,
pub uptime: u64,
pub version: String,
pub commit_id: String,
pub network: HashMap<String, String>,
pub disks: Vec<madmin::Disk>,
pub pool_number: i32,
pub pool_numbers: Vec<i32>,
pub mem_stats: MemStats,
pub max_procs: u64,
pub num_cpu: u64,
pub runtime_version: String,
pub rustfs_env_vars: HashMap<String, String>,
}
// #[derive(Debug, Default, Serialize, Deserialize)]
// pub struct ServerProperties {
// pub state: String,
// pub endpoint: String,
// pub scheme: String,
// pub uptime: u64,
// pub version: String,
// pub commit_id: String,
// pub network: HashMap<String, String>,
// pub disks: Vec<madmin::Disk>,
// pub pool_number: i32,
// pub pool_numbers: Vec<i32>,
// pub mem_stats: MemStats,
// pub max_procs: u64,
// pub num_cpu: u64,
// pub runtime_version: String,
// pub rustfs_env_vars: HashMap<String, String>,
// }
async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
let addr = format!(
@@ -139,7 +150,7 @@ pub async fn get_local_server_property() -> ServerProperties {
}
props.pool_numbers.sort();
props.pool_number = if props.pool_numbers.len() == 1 {
props.pool_numbers[1]
props.pool_numbers[0]
} else {
i32::MAX
};
@@ -160,3 +171,188 @@ pub async fn get_local_server_property() -> ServerProperties {
props
}
pub async fn get_server_info(get_pools: bool) -> InfoMessage {
let nowt: OffsetDateTime = OffsetDateTime::now_utc();
warn!("get_server_info start {:?}", nowt);
let local = get_local_server_property().await;
let after1 = OffsetDateTime::now_utc();
warn!("get_local_server_property end {:?}", after1 - nowt);
let mut servers = {
if let Some(sys) = get_global_notification_sys() {
sys.server_info().await
} else {
vec![]
}
};
let after2 = OffsetDateTime::now_utc();
warn!("server_info end {:?}", after2 - after1);
servers.push(local);
let mut buckets = madmin::Buckets::default();
let mut objects = madmin::Objects::default();
let mut versions = madmin::Versions::default();
let mut delete_markers = madmin::DeleteMarkers::default();
let mut usage = madmin::Usage::default();
let mut mode = madmin::ITEM_INITIALIZING;
let mut backend = madmin::ErasureBackend::default();
let mut pools: HashMap<i32, HashMap<i32, madmin::ErasureSetInfo>> = HashMap::new();
if let Some(store) = new_object_layer_fn() {
mode = madmin::ITEM_ONLINE;
match load_data_usage_from_backend(store.clone()).await {
Ok(res) => {
buckets.count = res.buckets_count;
objects.count = res.objects_total_count;
versions.count = res.versions_total_count;
delete_markers.count = res.delete_markers_total_count;
usage.size = res.objects_total_size;
}
Err(err) => {
buckets.error = Some(err.to_string());
objects.error = Some(err.to_string());
versions.error = Some(err.to_string());
delete_markers.error = Some(err.to_string());
usage.error = Some(err.to_string());
}
}
let after3 = OffsetDateTime::now_utc();
warn!("load_data_usage_from_backend end {:?}", after3 - after2);
let backend_info = store.clone().backend_info().await;
let after4 = OffsetDateTime::now_utc();
warn!("backend_info end {:?}", after4 - after3);
let mut all_disks: Vec<madmin::Disk> = Vec::new();
for server in servers.iter() {
all_disks.extend(server.disks.clone());
}
let (online_disks, offline_disks) = get_online_offline_disks_stats(&all_disks);
let after5 = OffsetDateTime::now_utc();
warn!("get_online_offline_disks_stats end {:?}", after5 - after4);
backend = madmin::ErasureBackend {
backend_type: madmin::BackendType::ErasureType,
online_disks: online_disks.sum(),
offline_disks: offline_disks.sum(),
standard_sc_parity: backend_info.standard_sc_parity,
rr_sc_parity: backend_info.rr_sc_parity,
total_sets: backend_info.total_sets,
drives_per_set: backend_info.drives_per_set,
};
if get_pools {
pools = get_pools_info(&all_disks).await.unwrap_or_default();
let after6 = OffsetDateTime::now_utc();
warn!("get_pools_info end {:?}", after6 - after5);
}
}
let services = madmin::Services::default();
InfoMessage {
mode: Some(mode.to_string()),
domain: None,
region: None,
sqs_arn: None,
deployment_id: None,
buckets: Some(buckets),
objects: Some(objects),
versions: Some(versions),
delete_markers: Some(delete_markers),
usage: Some(usage),
backend: Some(backend),
services: Some(services),
servers: Some(servers),
pools: Some(pools),
}
}
fn get_online_offline_disks_stats(disks_info: &[Disk]) -> (BackendDisks, BackendDisks) {
let mut online_disks: HashMap<String, usize> = HashMap::new();
let mut offline_disks: HashMap<String, usize> = HashMap::new();
for disk in disks_info {
let ep = &disk.endpoint;
offline_disks.entry(ep.clone()).or_insert(0);
online_disks.entry(ep.clone()).or_insert(0);
}
for disk in disks_info {
let ep = &disk.endpoint;
let state = &disk.state;
if *state != DRIVE_STATE_OK && *state != DRIVE_STATE_UNFORMATTED {
*offline_disks.get_mut(ep).unwrap() += 1;
continue;
}
*online_disks.get_mut(ep).unwrap() += 1;
}
let mut root_disk_count = 0;
for di in disks_info {
if di.root_disk {
root_disk_count += 1;
}
}
if disks_info.len() == (root_disk_count + offline_disks.values().sum::<usize>()) {
return (BackendDisks(online_disks), BackendDisks(offline_disks));
}
for disk in disks_info {
let ep = &disk.endpoint;
if disk.root_disk {
*offline_disks.get_mut(ep).unwrap() += 1;
// Guard against usize underflow: a root disk may already have been counted
// offline above, leaving its online count at zero.
let online = online_disks.get_mut(ep).unwrap();
*online = online.saturating_sub(1);
}
}
(BackendDisks(online_disks), BackendDisks(offline_disks))
}
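A quick sanity check of the per-endpoint counting above. A minimal test sketch, assuming madmin::Disk implements Default (the fixture below is hypothetical and not part of this commit):

#[cfg(test)]
mod disk_stats_tests {
    use super::*;

    #[test]
    fn counts_online_and_offline_per_endpoint() {
        // Hypothetical fixture; assumes madmin::Disk derives Default.
        let disks = vec![
            Disk {
                endpoint: "node1".to_string(),
                state: DRIVE_STATE_OK.to_string(),
                ..Default::default()
            },
            Disk {
                endpoint: "node1".to_string(),
                state: "faulty".to_string(),
                ..Default::default()
            },
        ];
        let (online, offline) = get_online_offline_disks_stats(&disks);
        // One healthy and one faulty disk on the same endpoint.
        assert_eq!(online.0.get("node1"), Some(&1));
        assert_eq!(offline.0.get("node1"), Some(&1));
    }
}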
async fn get_pools_info(all_disks: &[Disk]) -> Result<HashMap<i32, HashMap<i32, ErasureSetInfo>>> {
let Some(store) = new_object_layer_fn() else {
return Err(Error::msg("ServerNotInitialized"));
};
let mut pools_info: HashMap<i32, HashMap<i32, ErasureSetInfo>> = HashMap::new();
for d in all_disks {
let pool_info = pools_info.entry(d.pool_index).or_default();
let erasure_set = pool_info.entry(d.set_index).or_default();
if erasure_set.id == 0 {
erasure_set.id = d.set_index;
if let Ok(cache) = DataUsageCache::load(
&store.pools[d.pool_index as usize].disk_set[d.set_index as usize].clone(),
DATA_USAGE_CACHE_NAME,
)
.await
{
let data_usage_info = cache.dui(DATA_USAGE_ROOT, &vec![]);
erasure_set.objects_count = data_usage_info.objects_total_count;
erasure_set.versions_count = data_usage_info.versions_total_count;
erasure_set.delete_markers_count = data_usage_info.delete_markers_total_count;
erasure_set.usage = data_usage_info.objects_total_size;
};
}
erasure_set.raw_capacity += d.total_space;
erasure_set.raw_usage += d.used_space;
if d.healing {
erasure_set.heal_disks = 1;
}
}
Ok(pools_info)
}

View File

@@ -358,7 +358,7 @@ pub async fn load_bucket_metadata_parse(api: Arc<ECStore>, bucket: &str, parse:
let mut bm = match read_bucket_metadata(api.clone(), bucket).await {
Ok(res) => res,
Err(err) => {
if !config::error::is_not_found(&err) {
if !config::error::is_err_config_not_found(&err) {
return Err(err);
}

View File

@@ -253,7 +253,7 @@ impl BucketMetadataSys {
let meta = match self.get_config_from_disk(bucket).await {
Ok(res) => res,
Err(err) => {
if !config::error::is_not_found(&err) {
if !config::error::is_err_config_not_found(&err) {
return Err(err);
} else {
BucketMetadata::new(bucket)
@@ -357,7 +357,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_versioning_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Ok((VersioningConfiguration::default(), OffsetDateTime::UNIX_EPOCH));
} else {
return Err(err);
@@ -377,7 +377,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_bucket_policy err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketPolicyNotFound));
} else {
return Err(err);
@@ -397,7 +397,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_tagging_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::TaggingNotFound));
} else {
return Err(err);
@@ -417,7 +417,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_object_lock_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound));
} else {
return Err(err);
@@ -437,7 +437,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_lifecycle_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketLifecycleNotFound));
} else {
return Err(err);
@@ -461,7 +461,7 @@ impl BucketMetadataSys {
Ok((bm, _)) => bm.notification_config.clone(),
Err(err) => {
warn!("get_notification_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
None
} else {
return Err(err);
@@ -477,7 +477,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_sse_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound));
} else {
return Err(err);
@@ -508,7 +508,7 @@ impl BucketMetadataSys {
Ok((res, _)) => res,
Err(err) => {
warn!("get_quota_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound));
} else {
return Err(err);
@@ -528,7 +528,7 @@ impl BucketMetadataSys {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
} else {
return Err(err);
@@ -552,7 +552,7 @@ impl BucketMetadataSys {
Ok(res) => res,
Err(err) => {
warn!("get_replication_config err {:?}", &err);
if config::error::is_not_found(&err) {
if config::error::is_err_config_not_found(&err) {
return Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound));
} else {
return Err(err);

View File

@@ -1,9 +1,8 @@
use std::collections::HashSet;
use std::sync::Arc;
use super::error::ConfigError;
use super::error::{is_err_config_not_found, ConfigError};
use super::{storageclass, Config, GLOBAL_StorageClass, KVS};
use crate::config::error::is_not_found;
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
@@ -102,7 +101,7 @@ pub async fn read_config_without_migrate<S: StorageAPI>(api: Arc<S>) -> Result<C
let data = match read_config(api.clone(), config_file.as_str()).await {
Ok(res) => res,
Err(err) => {
if is_not_found(&err) {
if is_err_config_not_found(&err) {
warn!("config not found, start to init");
let cfg = new_and_save_server_config(api).await?;
warn!("config init done");
@@ -124,7 +123,7 @@ async fn read_server_config<S: StorageAPI>(api: Arc<S>, data: &[u8]) -> Result<C
let cfg_data = match read_config(api.clone(), config_file.as_str()).await {
Ok(res) => res,
Err(err) => {
if is_not_found(&err) {
if is_err_config_not_found(&err) {
warn!("config not found init start");
let cfg = new_and_save_server_config(api).await?;
warn!("config not found init done");

View File

@@ -31,7 +31,7 @@ impl ConfigError {
}
}
pub fn is_not_found(err: &Error) -> bool {
pub fn is_err_config_not_found(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<ConfigError>() {
ConfigError::is_not_found(e)
} else if let Some(e) = err.downcast_ref::<disk::error::DiskError>() {

View File

@@ -1,14 +1,20 @@
use std::{collections::HashMap, time::SystemTime};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use tokio::sync::mpsc::Receiver;
use tracing::error;
use tracing::{error, warn};
use crate::{
config::common::save_config,
bucket::metadata_sys::get_replication_config,
config::{
common::{read_config, save_config},
error::is_err_config_not_found,
},
disk::{BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
error::Result,
new_object_layer_fn,
store::ECStore,
store_err::to_object_err,
utils::path::SLASH_SEPARATOR,
};
@@ -133,3 +139,72 @@ pub async fn store_data_usage_in_backend(mut rx: Receiver<DataUsageInfo>) {
}
}
}
// TODO: cancel ctx
pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsageInfo> {
let buf = match read_config(store, &DATA_USAGE_OBJ_NAME_PATH).await {
Ok(data) => data,
Err(e) => {
error!("Failed to read data usage info from backend: {}", e);
if is_err_config_not_found(&e) {
return Ok(DataUsageInfo::default());
}
return Err(to_object_err(e, vec![RUSTFS_META_BUCKET, &DATA_USAGE_OBJ_NAME_PATH]));
}
};
let mut data_usage_info: DataUsageInfo = serde_json::from_slice(&buf)?;
warn!("Loaded data usage info from backend {:?}", &data_usage_info);
if data_usage_info.buckets_usage.is_empty() {
data_usage_info.buckets_usage = data_usage_info
.bucket_sizes
.iter()
.map(|(bucket, &size)| {
(
bucket.clone(),
BucketUsageInfo {
size,
..Default::default()
},
)
})
.collect();
}
if data_usage_info.bucket_sizes.is_empty() {
data_usage_info.bucket_sizes = data_usage_info
.buckets_usage
.iter()
.map(|(bucket, bui)| (bucket.clone(), bui.size))
.collect();
}
for (bucket, bui) in &data_usage_info.buckets_usage {
if bui.replicated_size_v1 > 0
|| bui.replication_failed_count_v1 > 0
|| bui.replication_failed_size_v1 > 0
|| bui.replication_pending_count_v1 > 0
{
if let Ok((cfg, _)) = get_replication_config(bucket).await {
if !cfg.role.is_empty() {
data_usage_info.replication_info.insert(
cfg.role.clone(),
BucketTargetUsageInfo {
replication_failed_size: bui.replication_failed_size_v1,
replication_failed_count: bui.replication_failed_count_v1,
replicated_size: bui.replicated_size_v1,
replication_pending_count: bui.replication_pending_count_v1,
replication_pending_size: bui.replication_pending_size_v1,
..Default::default()
},
);
}
}
}
}
Ok(data_usage_info)
}

View File

@@ -18,6 +18,7 @@ use std::path::Path;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use tracing::warn;
use super::data_scanner::{SizeSummary, DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS};
use super::data_usage::{BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, DATA_USAGE_ROOT};
@@ -379,6 +380,7 @@ impl DataUsageCache {
let mut retries = 0;
while retries < 5 {
let path = Path::new(BUCKET_META_PREFIX).join(name);
warn!("Loading data usage cache from backend: {}", path.display());
match store
.get_object_reader(
RUSTFS_META_BUCKET,
@@ -398,37 +400,42 @@ impl DataUsageCache {
}
break;
}
Err(err) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
match store
.get_object_reader(
RUSTFS_META_BUCKET,
name,
None,
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await
{
Ok(mut reader) => {
if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
d = info
}
break;
}
Err(_) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
Err(err) => {
warn!("Failed to load data usage cache from backend: {}", &err);
match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
match store
.get_object_reader(
RUSTFS_META_BUCKET,
name,
None,
HeaderMap::new(),
&ObjectOptions {
no_lock: true,
..Default::default()
},
)
.await
{
Ok(mut reader) => {
if let Ok(info) = Self::unmarshal(&reader.read_all().await?) {
d = info
}
break;
}
_ => {}
},
Err(_) => match err.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) | Some(DiskError::VolumeNotFound) => {
break;
}
_ => {}
},
}
}
_ => {
break;
}
}
_ => {}
},
}
}
retries += 1;
let dur = {
@@ -446,11 +453,14 @@ impl DataUsageCache {
let buf_clone = buf.clone();
let store_clone = store.clone();
let name_clone = name.to_string();
let name = Path::new(BUCKET_META_PREFIX).join(name).to_string_lossy().to_string();
let name_clone = name.clone();
tokio::spawn(async move {
let _ = save_config(store_clone, &format!("{}{}", &name_clone, ".bkp"), &buf_clone).await;
});
save_config(store, name, &buf).await
save_config(store, &name, &buf).await
}
pub fn replace(&mut self, path: &str, parent: &str, e: DataUsageEntry) {

View File

@@ -1,5 +1,3 @@
use std::sync::OnceLock;
use crate::endpoints::EndpointServerPools;
use crate::error::{Error, Result};
use crate::global::get_global_endpoints;
@@ -7,6 +5,8 @@ use crate::peer_rest_client::PeerRestClient;
use crate::StorageAPI;
use futures::future::join_all;
use lazy_static::lazy_static;
use madmin::{ItemState, ServerProperties};
use std::sync::OnceLock;
use tracing::error;
lazy_static! {
@@ -95,6 +95,30 @@ impl NotificationSys {
madmin::StorageInfo { disks, backend }
}
pub async fn server_info(&self) -> Vec<ServerProperties> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter() {
futures.push(async move {
if let Some(client) = client {
match client.server_info().await {
Ok(info) => info,
Err(_) => ServerProperties {
endpoint: client.host.to_string(),
state: ItemState::Offline.to_string(),
disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),
..Default::default()
},
}
} else {
ServerProperties::default()
}
});
}
join_all(futures).await
}
pub async fn reload_pool_meta(&self) {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().flatten() {

View File

@@ -1,7 +1,4 @@
use std::{collections::HashMap, io::Cursor, time::SystemTime};
use crate::{
admin_server_info::ServerProperties,
endpoints::EndpointServerPools,
global::is_dist_erasure,
heal::heal_commands::BgHealState,
@@ -13,6 +10,7 @@ use madmin::{
health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService},
metrics::RealtimeMetrics,
net::NetInfo,
ServerProperties,
};
use protos::{
node_service_time_out_client,
@@ -28,6 +26,7 @@ use protos::{
};
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize as _};
use std::{collections::HashMap, io::Cursor, time::SystemTime};
use tonic::Request;
use tracing::warn;

View File

@@ -6,8 +6,6 @@ use std::{
time::Duration,
};
use crate::config::error::is_not_found;
use crate::global::GLOBAL_MRFState;
use crate::{
bitrot::{bitrot_verify, close_bitrot_writers, new_bitrot_filereader, new_bitrot_filewriter, BitrotFileWriter},
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
@@ -52,6 +50,7 @@ use crate::{
},
xhttp,
};
use crate::{config::error::is_err_config_not_found, global::GLOBAL_MRFState};
use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation};
use crate::{file_meta::file_info_from_raw, heal::data_usage_cache::DataUsageCache};
use crate::{
@@ -1894,7 +1893,7 @@ impl SetDisks {
version_id: fi.version_id.map(|v| v.to_string()),
set_index,
pool_index,
bitrot_scan: !is_not_found(e),
bitrot_scan: !is_err_config_not_found(e),
..Default::default()
})
.await;

View File

@@ -1,9 +1,9 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use ecstore::{
config::error::is_not_found,
config::error::is_err_config_not_found,
store::ECStore,
store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader},
store_api::{ObjectIO, ObjectInfo, ObjectOptions, PutObjReader},
utils::path::dir,
StorageAPI,
};
@@ -59,7 +59,7 @@ impl ObjectStore {
match items {
Ok(items) => Result::<_, crate::Error>::Ok(items.prefixes),
Err(e) => {
if is_not_found(&e) {
if is_err_config_not_found(&e) {
Result::<_, crate::Error>::Ok(vec![])
} else {
Err(Error::StringError(format!("list {prefix} failed, err: {e:?}")))
@@ -290,7 +290,7 @@ impl Store for ObjectStore {
match self.load_mapped_policy(UserType::Sts, parent.as_str(), false).await {
Ok(m) => sts_policies.lock().await.insert(name.to_owned(), m),
Err(Error::EcstoreError(e)) if is_not_found(&e) => return Ok(()),
Err(Error::EcstoreError(e)) if is_err_config_not_found(&e) => return Ok(()),
Err(e) => return Err(e),
};
}

View File

@@ -116,7 +116,7 @@ pub struct StorageInfo {
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BackendDisks(HashMap<String, usize>);
pub struct BackendDisks(pub HashMap<String, usize>);
impl BackendDisks {
pub fn new() -> Self {
@@ -148,3 +148,177 @@ pub struct BackendInfo {
pub total_sets: Vec<usize>,
pub drives_per_set: Vec<usize>,
}
pub const ITEM_OFFLINE: &str = "offline";
pub const ITEM_INITIALIZING: &str = "initializing";
pub const ITEM_ONLINE: &str = "online";
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MemStats {
pub alloc: u64,
pub total_alloc: u64,
pub mallocs: u64,
pub frees: u64,
pub heap_alloc: u64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ServerProperties {
pub state: String,
pub endpoint: String,
pub scheme: String,
pub uptime: u64,
pub version: String,
#[serde(rename = "commitID")]
pub commit_id: String,
pub network: HashMap<String, String>,
#[serde(rename = "drives")]
pub disks: Vec<Disk>,
#[serde(rename = "poolNumber")]
pub pool_number: i32,
#[serde(rename = "poolNumbers")]
pub pool_numbers: Vec<i32>,
pub mem_stats: MemStats,
pub max_procs: u64,
pub num_cpu: u64,
pub runtime_version: String,
pub rustfs_env_vars: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Kms {
pub status: Option<String>,
pub encrypt: Option<String>,
pub decrypt: Option<String>,
pub endpoint: Option<String>,
pub version: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Ldap {
pub status: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Status {
pub status: Option<String>,
}
pub type Audit = HashMap<String, Status>;
pub type Logger = HashMap<String, Status>;
pub type TargetIDStatus = HashMap<String, Status>;
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct Services {
pub kms: Option<Kms>, // deprecated july 2023
#[serde(rename = "kmsStatus")]
pub kms_status: Option<Vec<Kms>>,
pub ldap: Option<Ldap>,
pub logger: Option<Vec<Logger>>,
pub audit: Option<Vec<Audit>>,
pub notifications: Option<Vec<HashMap<String, Vec<TargetIDStatus>>>>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Buckets {
pub count: u64,
pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Objects {
pub count: u64,
pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Versions {
pub count: u64,
pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct DeleteMarkers {
pub count: u64,
pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Usage {
pub size: u64,
pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ErasureSetInfo {
pub id: i32,
#[serde(rename = "rawUsage")]
pub raw_usage: u64,
#[serde(rename = "rawCapacity")]
pub raw_capacity: u64,
pub usage: u64,
#[serde(rename = "objectsCount")]
pub objects_count: u64,
#[serde(rename = "versionsCount")]
pub versions_count: u64,
#[serde(rename = "deleteMarkersCount")]
pub delete_markers_count: u64,
#[serde(rename = "healDisks")]
pub heal_disks: i32,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub enum BackendType {
#[default]
#[serde(rename = "FS")]
FsType,
#[serde(rename = "Erasure")]
ErasureType,
}
#[derive(Serialize, Deserialize)]
pub struct FSBackend {
#[serde(rename = "backendType")]
pub backend_type: BackendType,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct ErasureBackend {
#[serde(rename = "backendType")]
pub backend_type: BackendType,
#[serde(rename = "onlineDisks")]
pub online_disks: usize,
#[serde(rename = "offlineDisks")]
pub offline_disks: usize,
#[serde(rename = "standardSCParity")]
pub standard_sc_parity: Option<usize>,
#[serde(rename = "rrSCParity")]
pub rr_sc_parity: Option<usize>,
#[serde(rename = "totalSets")]
pub total_sets: Vec<usize>,
#[serde(rename = "totalDrivesPerSet")]
pub drives_per_set: Vec<usize>,
}
#[derive(Serialize, Deserialize)]
pub struct InfoMessage {
pub mode: Option<String>,
pub domain: Option<Vec<String>>,
pub region: Option<String>,
#[serde(rename = "sqsARN")]
pub sqs_arn: Option<Vec<String>>,
#[serde(rename = "deploymentID")]
pub deployment_id: Option<String>,
pub buckets: Option<Buckets>,
pub objects: Option<Objects>,
pub versions: Option<Versions>,
#[serde(rename = "deletemarkers")]
pub delete_markers: Option<DeleteMarkers>,
pub usage: Option<Usage>,
pub services: Option<Services>,
pub backend: Option<ErasureBackend>,
pub servers: Option<Vec<ServerProperties>>,
pub pools: Option<std::collections::HashMap<i32, std::collections::HashMap<i32, ErasureSetInfo>>>,
}

View File

@@ -1,12 +1,14 @@
use super::router::Operation;
use crate::storage::error::to_s3_error;
use bytes::Bytes;
use ecstore::admin_server_info::get_server_info;
use ecstore::bucket::policy::action::{Action, ActionSet};
use ecstore::bucket::policy::bucket_policy::{BPStatement, BucketPolicy};
use ecstore::bucket::policy::effect::Effect;
use ecstore::bucket::policy::resource::{Resource, ResourceSet};
use ecstore::error::Error as ec_Error;
use ecstore::global::GLOBAL_ALlHealState;
use ecstore::heal::data_usage::load_data_usage_from_backend;
use ecstore::heal::heal_commands::HealOpts;
use ecstore::heal::heal_ops::new_heal_sequence;
use ecstore::metrics_realtime::{collect_local_metrics, CollectMetricsOpts, MetricType};
@@ -47,6 +49,7 @@ use tracing::{error, info, warn};
pub mod service_account;
pub mod trace;
pub mod user;
const ASSUME_ROLE_ACTION: &str = "AssumeRole";
const ASSUME_ROLE_VERSION: &str = "2011-06-15";
@@ -271,7 +274,12 @@ impl Operation for ServerInfoHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle ServerInfoHandler");
return Err(s3_error!(NotImplemented));
let info = get_server_info(true).await;
let output = serde_json::to_string(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize ServerInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
@@ -315,7 +323,19 @@ impl Operation for DataUsageInfoHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle DataUsageInfoHandler");
return Err(s3_error!(NotImplemented));
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let info = load_data_usage_from_backend(store).await.map_err(|e| {
error!("load_data_usage_from_backend failed {:?}", e);
s3_error!(InternalError, "load_data_usage_from_backend failed")
})?;
let output = serde_json::to_string(&info)
.map_err(|_e| S3Error::with_message(S3ErrorCode::InternalError, "serialize DataUsageInfo failed"))?;
Ok(S3Response::new((StatusCode::OK, Body::from(output))))
}
}
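The commit message flags the new handler as still needing a test. A minimal sketch of one, exercising the same path the handler wraps (hypothetical; assumes the test process has initialized the object layer and globals that the info path reads):

// Hypothetical test sketch, not part of this commit.
#[tokio::test]
async fn server_info_serializes_to_json() {
    let info = ecstore::admin_server_info::get_server_info(true).await;
    let body = serde_json::to_string(&info).expect("InfoMessage should serialize");
    // get_server_info always sets the mode field.
    assert!(body.contains("\"mode\""));
}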

View File

@@ -0,0 +1,21 @@
use http::StatusCode;
use matchit::Params;
use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
use crate::admin::router::Operation;
pub struct AddUser {}
#[async_trait::async_trait]
impl Operation for AddUser {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
return Err(s3_error!(NotImplemented));
}
}
pub struct SetUserStatus {}
#[async_trait::async_trait]
impl Operation for SetUserStatus {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
return Err(s3_error!(NotImplemented));
}
}

View File

@@ -4,8 +4,9 @@ pub mod router;
use common::error::Result;
// use ecstore::global::{is_dist_erasure, is_erasure};
use handlers::service_account::{
AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount,
use handlers::{
service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount},
user,
};
use hyper::Method;
use router::{AdminOperation, S3Router};
@@ -14,19 +15,19 @@ use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/rustfs/admin";
pub fn make_admin_route() -> Result<impl S3Route> {
let mut r = S3Router::new();
let mut r: S3Router<AdminOperation> = S3Router::new();
// 1
r.insert(Method::POST, "/", AdminOperation(&handlers::AssumeRoleHandle {}))?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(),
AdminOperation(&handlers::AccountInfoHandler {}),
)?;
register_user_route(&mut r)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/service").as_str(),
AdminOperation(&handlers::ServiceHandle {}),
)?;
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/info").as_str(),
@@ -42,11 +43,13 @@ pub fn make_admin_route() -> Result<impl S3Route> {
format!("{}{}", ADMIN_PREFIX, "/v3/inspect-data").as_str(),
AdminOperation(&handlers::InspectDataHandler {}),
)?;
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/storageinfo").as_str(),
AdminOperation(&handlers::StorageInfoHandler {}),
)?;
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/datausageinfo").as_str(),
@@ -58,21 +61,25 @@ pub fn make_admin_route() -> Result<impl S3Route> {
AdminOperation(&handlers::MetricsHandler {}),
)?;
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/pools/list").as_str(),
AdminOperation(&handlers::ListPools {}),
)?;
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/pools/status").as_str(),
AdminOperation(&handlers::StatusPool {}),
)?;
// todo
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/pools/decommission").as_str(),
AdminOperation(&handlers::StartDecommission {}),
)?;
// todo
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/pools/cancel").as_str(),
@@ -109,6 +116,28 @@ pub fn make_admin_route() -> Result<impl S3Route> {
)?;
// }
Ok(r)
}
fn register_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
// 1
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/accountinfo").as_str(),
AdminOperation(&handlers::AccountInfoHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/add-user").as_str(),
AdminOperation(&user::AddUser {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/set-user-status").as_str(),
AdminOperation(&user::SetUserStatus {}),
)?;
// Service accounts
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/update-service-account").as_str(),
@@ -139,5 +168,5 @@ pub fn make_admin_route() -> Result<impl S3Route> {
AdminOperation(&AddServiceAccount {}),
)?;
Ok(r)
Ok(())
}
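Additional user endpoints would plug into register_user_route the same way; for illustration, a hypothetical RemoveUser handler would be registered as:

// Hypothetical route, following the same registration pattern as above.
r.insert(
    Method::GET,
    format!("{}{}", ADMIN_PREFIX, "/v3/remove-user").as_str(),
    AdminOperation(&user::RemoveUser {}),
)?;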

rustfs/src/admin/test.json (new file)
View File

@@ -0,0 +1,232 @@
{
"mode": "online",
"domain": null,
"region": null,
"sqsARN": null,
"deploymentID": null,
"buckets": {
"count": 0,
"error": null
},
"objects": {
"count": 0,
"error": null
},
"versions": {
"count": 0,
"error": null
},
"deletemarkers": {
"count": 0,
"error": null
},
"usage": {
"size": 0,
"error": null
},
"services": {
"kms": null,
"kmsStatus": null,
"ldap": null,
"logger": null,
"audit": null,
"notifications": null
},
"backend": {
"backendType": "Erasure",
"onlineDisks": 5,
"offlineDisks": 0,
"standardSCParity": 2,
"rrSCParity": 1,
"totalSets": [
1
],
"totalDrivesPerSet": [
5
]
},
"servers": [
{
"state": "online",
"endpoint": "127.0.0.1:9000",
"scheme": "",
"uptime": 1736146443,
"version": "",
"commitID": "",
"network": {
"127.0.0.1:9000": "online"
},
"drives": [
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test0",
"healing": false,
"scanning": true,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 0
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test1",
"healing": false,
"scanning": true,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 1
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test2",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 2
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test3",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 3
},
{
"endpoint": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4",
"root_disk": true,
"drive_path": "/Users/weisd/project/weisd/s3-rustfs/target/volume/test4",
"healing": false,
"scanning": false,
"state": "ok",
"uuid": "",
"major": 0,
"minor": 0,
"model": null,
"total_space": 494384795648,
"used_space": 283710812160,
"available_space": 210673983488,
"read_throughput": 0.0,
"write_throughput": 0.0,
"read_latency": 0.0,
"write_latency": 0.0,
"utilization": 57.386637828967736,
"metrics": null,
"heal_info": null,
"used_inodes": 2353357,
"free_inodes": 2057363120,
"local": true,
"pool_index": 0,
"set_index": 0,
"disk_index": 4
}
],
"poolNumber": 1,
"poolNumbers": [
1
],
"mem_stats": {
"alloc": 0,
"total_alloc": 0,
"mallocs": 0,
"frees": 0,
"heap_alloc": 0
},
"max_procs": 0,
"num_cpu": 0,
"runtime_version": "",
"rustfs_env_vars": {}
}
],
"pools": {
"0": {
"0": {
"id": 0,
"rawUsage": 1418554060800,
"rawCapacity": 2471923978240,
"usage": 0,
"objectsCount": 0,
"versionsCount": 0,
"deleteMarkersCount": 0,
"healDisks": 0
}
}
}
}