feat: adjust metrics push interval to 3 seconds (#686)
- Reduce metrics push frequency from default to 3s for better performance
- Optimize resource utilization during metrics collection
- Improve real-time monitoring responsiveness

Related to admin metrics optimization on fix/admin-metrics branch
@@ -23,7 +23,7 @@ use rustfs_common::{
 use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
 use rustfs_utils::os::get_drive_stats;
 use serde::{Deserialize, Serialize};
-use tracing::info;
+use tracing::{debug, info};
 
 use crate::{
     admin_server_info::get_local_server_property,
@@ -44,7 +44,7 @@ pub struct CollectMetricsOpts {
 pub struct MetricType(u32);
 
 impl MetricType {
-    // 定义一些常量
+    // Define some constants
     pub const NONE: MetricType = MetricType(0);
     pub const SCANNER: MetricType = MetricType(1 << 0);
     pub const DISK: MetricType = MetricType(1 << 1);
@@ -70,8 +70,18 @@ impl MetricType {
     }
 }
 
+/// Collect local metrics based on the specified types and options.
+///
+/// # Arguments
+///
+/// * `types` - A `MetricType` specifying which types of metrics to collect.
+/// * `opts` - A reference to `CollectMetricsOpts` containing additional options for metric collection.
+///
+/// # Returns
+/// * A `RealtimeMetrics` struct containing the collected metrics.
+///
 pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) -> RealtimeMetrics {
-    info!("collect_local_metrics");
+    debug!("collect_local_metrics");
     let mut real_time_metrics = RealtimeMetrics::default();
     if types.0 == MetricType::NONE.0 {
         info!("types is None, return");
@@ -93,13 +103,13 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
     }
 
     if types.contains(&MetricType::DISK) {
-        info!("start get disk metrics");
+        debug!("start get disk metrics");
         let mut aggr = DiskMetric {
             collected_at: Utc::now(),
             ..Default::default()
         };
         for (name, disk) in collect_local_disks_metrics(&opts.disks).await.into_iter() {
-            info!("got disk metric, name: {name}, metric: {disk:?}");
+            debug!("got disk metric, name: {name}, metric: {disk:?}");
             real_time_metrics.by_disk.insert(name, disk.clone());
             aggr.merge(&disk);
         }
@@ -107,7 +117,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
     }
 
     if types.contains(&MetricType::SCANNER) {
-        info!("start get scanner metrics");
+        debug!("start get scanner metrics");
         let metrics = globalMetrics.report().await;
         real_time_metrics.aggregated.scanner = Some(metrics);
     }
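The two hunks above lean on MetricType being a plain u32 bitflag set. A standalone sketch of how such a check can work; the contains() body below is an assumption for illustration, since its real implementation is not part of this diff:

pub struct MetricType(u32);

impl MetricType {
    pub const NONE: MetricType = MetricType(0);
    pub const SCANNER: MetricType = MetricType(1 << 0);
    pub const DISK: MetricType = MetricType(1 << 1);

    // Assumed semantics: the set contains `other` if all of other's bits are set.
    pub fn contains(&self, other: &MetricType) -> bool {
        self.0 & other.0 == other.0
    }
}

fn main() {
    let types = MetricType(MetricType::SCANNER.0 | MetricType::DISK.0);
    assert!(types.contains(&MetricType::DISK));
    assert!(!MetricType::NONE.contains(&MetricType::DISK));
}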
@@ -323,7 +323,7 @@ impl Operation for ServiceHandle {
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle ServiceHandle");
 
-        return Err(s3_error!(NotImplemented));
+        Err(s3_error!(NotImplemented))
     }
 }
 
@@ -367,7 +367,7 @@ impl Operation for InspectDataHandler {
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle InspectDataHandler");
 
-        return Err(s3_error!(NotImplemented));
+        Err(s3_error!(NotImplemented))
     }
 }
 
@@ -489,7 +489,7 @@ impl Operation for DataUsageInfoHandler {
 
         let mut info_for_refresh = info.clone();
         let store_for_refresh = store.clone();
-        tokio::spawn(async move {
+        spawn(async move {
             if let Err(e) = collect_realtime_data_usage(&mut info_for_refresh, store_for_refresh.clone()).await {
                 warn!("Background data usage refresh failed: {}", e);
                 return;
@@ -641,10 +641,7 @@ impl Operation for MetricsHandler {
         let mp = extract_metrics_init_params(&req.uri);
         info!("mp: {:?}", mp);
 
-        let tick = match parse_duration(&mp.tick) {
-            Ok(i) => i,
-            Err(_) => std_Duration::from_secs(1),
-        };
+        let tick = parse_duration(&mp.tick).unwrap_or_else(|_| std_Duration::from_secs(3));
 
         let mut n = mp.n;
         if n == 0 {
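The tick change above replaces a match on parse_duration with unwrap_or_else and bumps the fallback from 1s to 3s. A minimal sketch of the same shape, assuming std_Duration aliases std::time::Duration and using a placeholder parser (the real parse_duration is not shown in this diff):

use std::time::Duration as std_Duration;

// Placeholder parser for illustration only: accepts plain seconds like "3" or "3s".
fn parse_duration(s: &str) -> Result<std_Duration, ()> {
    s.trim_end_matches('s').parse::<u64>().map(std_Duration::from_secs).map_err(|_| ())
}

fn main() {
    // Old behavior fell back to 1 second on a parse error; the new default is 3 seconds.
    let tick = parse_duration("not-a-duration").unwrap_or_else(|_| std_Duration::from_secs(3));
    assert_eq!(tick, std_Duration::from_secs(3));
}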
@@ -657,28 +654,18 @@ impl Operation for MetricsHandler {
             MetricType::ALL
         };
 
-        let disks = mp.disks.split(",").map(String::from).collect::<Vec<String>>();
-        let by_disk = mp.by_disk == "true";
-        let mut disk_map = HashSet::new();
-        if !disks.is_empty() && !disks[0].is_empty() {
-            for d in disks.iter() {
-                if !d.is_empty() {
-                    disk_map.insert(d.to_string());
-                }
-            }
-        }
+        fn parse_comma_separated(s: &str) -> HashSet<String> {
+            s.split(',').filter(|part| !part.is_empty()).map(String::from).collect()
+        }
+
+        let disks = parse_comma_separated(&mp.disks);
+        let by_disk = mp.by_disk == "true";
+        let disk_map = disks;
 
         let job_id = mp.by_job_id;
-        let hosts = mp.hosts.split(",").map(String::from).collect::<Vec<String>>();
-        let by_host = mp.by_host == "true";
-        let mut host_map = HashSet::new();
-        if !hosts.is_empty() && !hosts[0].is_empty() {
-            for d in hosts.iter() {
-                if !d.is_empty() {
-                    host_map.insert(d.to_string());
-                }
-            }
-        }
+        let hosts = parse_comma_separated(&mp.hosts);
+        let by_host = mp.by_host == "true";
+        let host_map = hosts;
 
         let d_id = mp.by_dep_id;
         let mut interval = interval(tick);
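The parse_comma_separated helper introduced above makes the manual empty-string guard unnecessary because empty parts are filtered out. A standalone copy, exercised the way the handler uses it:

use std::collections::HashSet;

fn parse_comma_separated(s: &str) -> HashSet<String> {
    s.split(',').filter(|part| !part.is_empty()).map(String::from).collect()
}

fn main() {
    // Empty input and stray commas no longer need the !disks[0].is_empty() check.
    assert!(parse_comma_separated("").is_empty());
    let disks = parse_comma_separated("disk1,,disk2");
    assert_eq!(disks.len(), 2);
    assert!(disks.contains("disk1") && disks.contains("disk2"));
}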
@@ -694,7 +681,7 @@ impl Operation for MetricsHandler {
             inner: ReceiverStream::new(rx),
         });
         let body = Body::from(in_stream);
-        tokio::spawn(async move {
+        spawn(async move {
             while n > 0 {
                 info!("loop, n: {n}");
                 let mut m = RealtimeMetrics::default();
@@ -932,7 +919,7 @@ impl Operation for BackgroundHealStatusHandler {
     async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
         warn!("handle BackgroundHealStatusHandler");
 
-        return Err(s3_error!(NotImplemented));
+        Err(s3_error!(NotImplemented))
     }
 }
 
@@ -967,7 +954,7 @@ impl Operation for GetReplicationMetricsHandler {
         }
         //return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
         //Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
-        return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
+        Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))))
     }
 }
 
@@ -994,7 +981,7 @@ impl Operation for SetRemoteTargetHandler {
         };
 
         store
-            .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default())
+            .get_bucket_info(bucket, &BucketOptions::default())
             .await
             .map_err(ApiError::from)?;
 
@@ -1008,7 +995,7 @@ impl Operation for SetRemoteTargetHandler {
         };
 
         let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(|e| {
-            tracing::error!("Failed to parse BucketTarget from body: {}", e);
+            error!("Failed to parse BucketTarget from body: {}", e);
             ApiError::other(e)
         })?;
 
@@ -1120,10 +1107,7 @@ impl Operation for ListRemoteTargetHandler {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string()));
         };
 
-        if let Err(err) = store
-            .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default())
-            .await
-        {
+        if let Err(err) = store.get_bucket_info(bucket, &BucketOptions::default()).await {
             error!("Error fetching bucket info: {:?}", err);
             return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string()))));
         }
@@ -1152,7 +1136,7 @@ impl Operation for ListRemoteTargetHandler {
         let mut header = HeaderMap::new();
         header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
 
-        return Ok(S3Response::with_headers((StatusCode::OK, Body::from(json_targets)), header));
+        Ok(S3Response::with_headers((StatusCode::OK, Body::from(json_targets)), header))
     }
 }
 
@@ -1177,10 +1161,7 @@ impl Operation for RemoveRemoteTargetHandler {
             return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string()));
         };
 
-        if let Err(err) = store
-            .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default())
-            .await
-        {
+        if let Err(err) = store.get_bucket_info(bucket, &BucketOptions::default()).await {
             error!("Error fetching bucket info: {:?}", err);
             return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string()))));
         }
@@ -1209,7 +1190,7 @@ impl Operation for RemoveRemoteTargetHandler {
             S3Error::with_message(S3ErrorCode::InternalError, format!("Failed to update bucket targets: {e}"))
         })?;
 
-        return Ok(S3Response::new((StatusCode::NO_CONTENT, Body::from("".to_string()))));
+        Ok(S3Response::new((StatusCode::NO_CONTENT, Body::from("".to_string()))))
     }
 }
 
@@ -1219,9 +1200,7 @@ async fn collect_realtime_data_usage(
     store: Arc<rustfs_ecstore::store::ECStore>,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     // Get bucket list and collect basic statistics
-    let buckets = store
-        .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default())
-        .await?;
+    let buckets = store.list_bucket(&BucketOptions::default()).await?;
 
     info.buckets_count = buckets.len() as u64;
     info.last_update = Some(std::time::SystemTime::now());
|
||||
@@ -137,16 +137,15 @@ impl Operation for ExportBucketMetadata {
             let conf_path = path_join_buf(&[bucket.name.as_str(), conf]);
             match conf {
                 BUCKET_POLICY_CONFIG => {
-                    let config: rustfs_policy::policy::BucketPolicy =
-                        match metadata_sys::get_bucket_policy(&bucket.name).await {
-                            Ok((res, _)) => res,
-                            Err(e) => {
-                                if e == StorageError::ConfigNotFound {
-                                    continue;
-                                }
-                                return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
-                            }
-                        };
+                    let config: BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await {
+                        Ok((res, _)) => res,
+                        Err(e) => {
+                            if e == StorageError::ConfigNotFound {
+                                continue;
+                            }
+                            return Err(s3_error!(InternalError, "get bucket metadata failed: {e}"));
+                        }
+                    };
                     let config_json =
                         serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?;
                     zip_writer
@@ -98,17 +98,11 @@ async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> {
    }

    if let Err(e) = retry_metadata(queue_dir).await {
-        match e.kind() {
-            ErrorKind::NotFound => {
-                return Err(s3_error!(InvalidArgument, "queue_dir does not exist"));
-            }
-            ErrorKind::PermissionDenied => {
-                return Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied"));
-            }
-            _ => {
-                return Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e));
-            }
-        }
+        return match e.kind() {
+            ErrorKind::NotFound => Err(s3_error!(InvalidArgument, "queue_dir does not exist")),
+            ErrorKind::PermissionDenied => Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied")),
+            _ => Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e)),
+        };
    }
 }
 
@@ -153,7 +153,7 @@ impl Operation for CreateKeyHandler {
         let tags = request.tags.unwrap_or_default();
         let key_name = tags.get("name").cloned();
 
-        let kms_request = rustfs_kms::types::CreateKeyRequest {
+        let kms_request = CreateKeyRequest {
             key_name,
             key_usage: request.key_usage.unwrap_or(KeyUsage::EncryptDecrypt),
             description: request.description,
@@ -216,7 +216,7 @@ impl Operation for DescribeKeyHandler {
             return Err(s3_error!(InternalError, "KMS service not initialized"));
         };
 
-        let request = rustfs_kms::types::DescribeKeyRequest { key_id: key_id.clone() };
+        let request = DescribeKeyRequest { key_id: key_id.clone() };
 
         match service.describe_key(request).await {
             Ok(response) => {
@@ -270,7 +270,7 @@ impl Operation for ListKeysHandler {
             return Err(s3_error!(InternalError, "KMS service not initialized"));
         };
 
-        let request = rustfs_kms::types::ListKeysRequest {
+        let request = ListKeysRequest {
             limit: Some(limit),
             marker,
             status_filter: None,
@@ -336,7 +336,7 @@ impl Operation for GenerateDataKeyHandler {
             return Err(s3_error!(InternalError, "KMS service not initialized"));
         };
 
-        let kms_request = rustfs_kms::GenerateDataKeyRequest {
+        let kms_request = GenerateDataKeyRequest {
             key_id: request.key_id,
             key_spec: request.key_spec,
             encryption_context: request.encryption_context.unwrap_or_default(),
@@ -72,14 +72,11 @@ impl Operation for ConfigureKmsHandler {
 
         info!("Configuring KMS with request: {:?}", configure_request);
 
-        let service_manager = match get_global_kms_service_manager() {
-            Some(manager) => manager,
-            None => {
-                warn!("KMS service manager not initialized, initializing now as fallback");
-                // Initialize the service manager as a fallback
-                rustfs_kms::init_global_kms_service_manager()
-            }
-        };
+        let service_manager = get_global_kms_service_manager().unwrap_or_else(|| {
+            warn!("KMS service manager not initialized, initializing now as fallback");
+            // Initialize the service manager as a fallback
+            rustfs_kms::init_global_kms_service_manager()
+        });
 
         // Convert request to KmsConfig
         let kms_config = configure_request.to_kms_config();
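The refactor above (repeated in the following KMS handler hunks) collapses a match on Option into unwrap_or_else. A sketch of the same shape with stand-in functions, not the real rustfs_kms API:

// get_manager() and init_fallback() stand in for get_global_kms_service_manager()
// and rustfs_kms::init_global_kms_service_manager(); only the Option-handling
// shape is taken from the diff.
fn get_manager() -> Option<&'static str> {
    None
}

fn init_fallback() -> &'static str {
    println!("manager not initialized, initializing now as fallback");
    "fallback-manager"
}

fn main() {
    // Equivalent to: match get_manager() { Some(m) => m, None => init_fallback() }
    let manager = get_manager().unwrap_or_else(init_fallback);
    assert_eq!(manager, "fallback-manager");
}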
@@ -162,14 +159,11 @@ impl Operation for StartKmsHandler {
 
         info!("Starting KMS service with force: {:?}", start_request.force);
 
-        let service_manager = match get_global_kms_service_manager() {
-            Some(manager) => manager,
-            None => {
-                warn!("KMS service manager not initialized, initializing now as fallback");
-                // Initialize the service manager as a fallback
-                rustfs_kms::init_global_kms_service_manager()
-            }
-        };
+        let service_manager = get_global_kms_service_manager().unwrap_or_else(|| {
+            warn!("KMS service manager not initialized, initializing now as fallback");
+            // Initialize the service manager as a fallback
+            rustfs_kms::init_global_kms_service_manager()
+        });
 
         // Check if already running and force flag
         let current_status = service_manager.get_status().await;
@@ -280,14 +274,11 @@ impl Operation for StopKmsHandler {
 
         info!("Stopping KMS service");
 
-        let service_manager = match get_global_kms_service_manager() {
-            Some(manager) => manager,
-            None => {
-                warn!("KMS service manager not initialized, initializing now as fallback");
-                // Initialize the service manager as a fallback
-                rustfs_kms::init_global_kms_service_manager()
-            }
-        };
+        let service_manager = get_global_kms_service_manager().unwrap_or_else(|| {
+            warn!("KMS service manager not initialized, initializing now as fallback");
+            // Initialize the service manager as a fallback
+            rustfs_kms::init_global_kms_service_manager()
+        });
 
         let (success, message, status) = match service_manager.stop().await {
             Ok(()) => {
@@ -348,14 +339,11 @@ impl Operation for GetKmsStatusHandler {
 
         info!("Getting KMS service status");
 
-        let service_manager = match get_global_kms_service_manager() {
-            Some(manager) => manager,
-            None => {
-                warn!("KMS service manager not initialized, initializing now as fallback");
-                // Initialize the service manager as a fallback
-                rustfs_kms::init_global_kms_service_manager()
-            }
-        };
+        let service_manager = get_global_kms_service_manager().unwrap_or_else(|| {
+            warn!("KMS service manager not initialized, initializing now as fallback");
+            // Initialize the service manager as a fallback
+            rustfs_kms::init_global_kms_service_manager()
+        });
 
         let status = service_manager.get_status().await;
         let config = service_manager.get_config().await;
@@ -443,14 +431,11 @@ impl Operation for ReconfigureKmsHandler {
 
         info!("Reconfiguring KMS with request: {:?}", configure_request);
 
-        let service_manager = match get_global_kms_service_manager() {
-            Some(manager) => manager,
-            None => {
-                warn!("KMS service manager not initialized, initializing now as fallback");
-                // Initialize the service manager as a fallback
-                rustfs_kms::init_global_kms_service_manager()
-            }
-        };
+        let service_manager = get_global_kms_service_manager().unwrap_or_else(|| {
+            warn!("KMS service manager not initialized, initializing now as fallback");
+            // Initialize the service manager as a fallback
+            rustfs_kms::init_global_kms_service_manager()
+        });
 
         // Convert request to KmsConfig
         let kms_config = configure_request.to_kms_config();
@@ -103,10 +103,10 @@ impl Operation for AssumeRoleHandle {
 
         claims.insert(
             "exp".to_string(),
-            serde_json::Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)),
+            Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)),
         );
 
-        claims.insert("parent".to_string(), serde_json::Value::String(cred.access_key.clone()));
+        claims.insert("parent".to_string(), Value::String(cred.access_key.clone()));
 
         // warn!("AssumeRole get cred {:?}", &user);
         // warn!("AssumeRole get body {:?}", &body);
@@ -176,7 +176,7 @@ pub fn populate_session_policy(claims: &mut HashMap<String, Value>, policy: &str
 
     claims.insert(
         SESSION_POLICY_NAME.to_string(),
-        serde_json::Value::String(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&policy_buf)),
+        Value::String(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&policy_buf)),
     );
 }
 
@@ -139,35 +139,35 @@ impl Operation for AddTier {
         let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
         //tier_config_mgr.reload(api);
         if let Err(err) = tier_config_mgr.add(args, force).await {
-            if err.code == ERR_TIER_ALREADY_EXISTS.code {
-                return Err(S3Error::with_message(
+            return if err.code == ERR_TIER_ALREADY_EXISTS.code {
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierNameAlreadyExist".into()),
                     "tier name already exists!",
-                ));
+                ))
             } else if err.code == ERR_TIER_NAME_NOT_UPPERCASE.code {
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierNameNotUppercase".into()),
                     "tier name not uppercase!",
-                ));
+                ))
             } else if err.code == ERR_TIER_BACKEND_IN_USE.code {
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierNameBackendInUse!".into()),
                     "tier name backend in use!",
-                ));
+                ))
             } else if err.code == ERR_TIER_CONNECT_ERR.code {
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierConnectError".into()),
                     "tier connect error!",
-                ));
+                ))
             } else if err.code == ERR_TIER_INVALID_CREDENTIALS.code {
-                return Err(S3Error::with_message(S3ErrorCode::Custom(err.code.clone().into()), err.message.clone()));
+                Err(S3Error::with_message(S3ErrorCode::Custom(err.code.clone().into()), err.message.clone()))
             } else {
                 warn!("tier_config_mgr add failed, e: {:?}", err);
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierAddFailed".into()),
                     format!("tier add failed. {err}"),
-                ));
-            }
+                ))
+            };
         }
         if let Err(e) = tier_config_mgr.save().await {
             warn!("tier_config_mgr save failed, e: {:?}", e);
@@ -223,20 +223,20 @@ impl Operation for EditTier {
         let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
         //tier_config_mgr.reload(api);
         if let Err(err) = tier_config_mgr.edit(&tier_name, creds).await {
-            if err.code == ERR_TIER_NOT_FOUND.code {
-                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!"));
+            return if err.code == ERR_TIER_NOT_FOUND.code {
+                Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!"))
             } else if err.code == ERR_TIER_MISSING_CREDENTIALS.code {
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierMissingCredentials".into()),
                     "tier missing credentials!",
-                ));
+                ))
             } else {
                 warn!("tier_config_mgr edit failed, e: {:?}", err);
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierEditFailed".into()),
                     format!("tier edit failed. {err}"),
-                ));
-            }
+                ))
+            };
         }
         if let Err(e) = tier_config_mgr.save().await {
             warn!("tier_config_mgr save failed, e: {:?}", e);
@@ -329,17 +329,17 @@ impl Operation for RemoveTier {
         let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
         //tier_config_mgr.reload(api);
         if let Err(err) = tier_config_mgr.remove(&tier_name, force).await {
-            if err.code == ERR_TIER_NOT_FOUND.code {
-                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found."));
+            return if err.code == ERR_TIER_NOT_FOUND.code {
+                Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found."))
             } else if err.code == ERR_TIER_BACKEND_NOT_EMPTY.code {
-                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used."));
+                Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used."))
             } else {
                 warn!("tier_config_mgr remove failed, e: {:?}", err);
-                return Err(S3Error::with_message(
+                Err(S3Error::with_message(
                     S3ErrorCode::Custom("TierRemoveFailed".into()),
                     format!("tier remove failed. {err}"),
-                ));
-            }
+                ))
+            };
         }
 
         if let Err(e) = tier_config_mgr.save().await {
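The three tier hunks above turn an if/else chain of early returns into a single return-if expression; every branch yields the same Result type, so the whole chain can sit behind one return. A simplified sketch with stand-in types (TierError and the code strings are not the real rustfs types):

struct TierError {
    code: &'static str,
}

fn classify(err: &TierError) -> Result<(), String> {
    // Same shape as the refactor: one `return`, each branch is an expression.
    return if err.code == "TierNotFound" {
        Err("tier not found".to_string())
    } else if err.code == "TierBackendNotEmpty" {
        Err("tier is used".to_string())
    } else {
        Err(format!("tier operation failed: {}", err.code))
    };
}

fn main() {
    let err = TierError { code: "TierNotFound" };
    assert_eq!(classify(&err), Err("tier not found".to_string()));
}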
@@ -43,10 +43,10 @@ impl Operation for Trace {
         let _trace_opts = extract_trace_options(&req.uri)?;
 
         // let (tx, rx) = mpsc::channel(10000);
-        let _perrs = match GLOBAL_Endpoints.get() {
+        let _peers = match GLOBAL_Endpoints.get() {
             Some(ep) => PeerRestClient::new_clients(ep.clone()).await,
             None => (Vec::new(), Vec::new()),
         };
-        return Err(s3_error!(NotImplemented));
+        Err(s3_error!(NotImplemented))
     }
 }
 
@@ -28,7 +28,6 @@ use rustfs_madmin::{
     user::{ImportIAMResult, SRSessionPolicy, SRSvcAccCreate},
 };
 use rustfs_policy::policy::action::{Action, AdminAction};
-
 use rustfs_utils::path::path_join_buf;
 use s3s::{
     Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result,
@@ -598,7 +598,7 @@ mod tests {
         // The struct should be created successfully
         // We can't easily test internal state without exposing it,
         // but we can test it doesn't panic on creation
-        assert_eq!(std::mem::size_of_val(&iam_auth), std::mem::size_of::<IAMAuth>());
+        assert_eq!(size_of_val(&iam_auth), size_of::<IAMAuth>());
    }

    #[tokio::test]
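The test change above drops the std::mem:: prefix, which works because size_of and size_of_val were added to the standard prelude in Rust 1.80:

fn main() {
    let x: u64 = 0;
    // No `use std::mem::...` needed on Rust 1.80+.
    assert_eq!(size_of_val(&x), size_of::<u64>());
}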
File diff suppressed because it is too large
Load Diff
@@ -12,13 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::version;
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
 use thiserror::Error;
 use tracing::{debug, error, info};
 
-use crate::version;
-
 /// Update check related errors
 #[derive(Error, Debug)]
 pub enum UpdateCheckError {
@@ -1,3 +1,17 @@
+// Copyright 2024 RustFS Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 use shadow_rs::shadow;
 use std::process::Command;
 