mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
fix(admin): propagate heal handler background errors (#2124)
This commit is contained in:
@@ -117,6 +117,25 @@ pub fn register_heal_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct HealResp {
|
||||
resp_bytes: Vec<u8>,
|
||||
api_err: Option<String>,
|
||||
}
|
||||
|
||||
fn map_heal_response(result: Option<HealResp>) -> S3Result<(StatusCode, Vec<u8>)> {
|
||||
match result {
|
||||
Some(result) => {
|
||||
if let Some(err) = result.api_err {
|
||||
return Err(s3_error!(InternalError, "{err}"));
|
||||
}
|
||||
|
||||
Ok((StatusCode::OK, result.resp_bytes))
|
||||
}
|
||||
None => Err(s3_error!(InternalError, "heal channel closed unexpectedly")),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HealHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -137,12 +156,6 @@ impl Operation for HealHandler {
|
||||
let hip = extract_heal_init_params(&bytes, &req.uri, params)?;
|
||||
info!("body: {:?}", hip);
|
||||
|
||||
#[derive(Default)]
|
||||
struct HealResp {
|
||||
resp_bytes: Vec<u8>,
|
||||
api_err: Option<String>,
|
||||
}
|
||||
|
||||
let heal_path = path_join(&[PathBuf::from(hip.bucket.clone()), PathBuf::from(hip.obj_prefix.clone())]);
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
|
||||
@@ -154,6 +167,7 @@ impl Operation for HealHandler {
|
||||
spawn(async move {
|
||||
match rustfs_common::heal_channel::query_heal_status(heal_path_str, client_token).await {
|
||||
Ok(_) => {
|
||||
// TODO: Get actual response from channel
|
||||
let _ = tx_clone
|
||||
.send(HealResp {
|
||||
resp_bytes: vec![],
|
||||
@@ -164,7 +178,7 @@ impl Operation for HealHandler {
|
||||
Err(e) => {
|
||||
let _ = tx_clone
|
||||
.send(HealResp {
|
||||
api_err: Some(e),
|
||||
api_err: Some(format!("query heal status failed: {e}")),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@@ -178,6 +192,7 @@ impl Operation for HealHandler {
|
||||
spawn(async move {
|
||||
match rustfs_common::heal_channel::cancel_heal_task(heal_path_str).await {
|
||||
Ok(_) => {
|
||||
// TODO: Get actual response from channel
|
||||
let _ = tx_clone
|
||||
.send(HealResp {
|
||||
resp_bytes: vec![],
|
||||
@@ -188,7 +203,7 @@ impl Operation for HealHandler {
|
||||
Err(e) => {
|
||||
let _ = tx_clone
|
||||
.send(HealResp {
|
||||
api_err: Some(e),
|
||||
api_err: Some(format!("cancel heal task failed: {e}")),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@@ -225,7 +240,7 @@ impl Operation for HealHandler {
|
||||
// Error - send error response
|
||||
let _ = tx_clone
|
||||
.send(HealResp {
|
||||
api_err: Some(e),
|
||||
api_err: Some(format!("send heal request failed: {e}")),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@@ -234,15 +249,8 @@ impl Operation for HealHandler {
|
||||
});
|
||||
}
|
||||
|
||||
match rx.recv().await {
|
||||
Some(result) => {
|
||||
if let Some(api_err) = result.api_err {
|
||||
return Err(s3_error!(InternalError, "{api_err}"));
|
||||
}
|
||||
Ok(S3Response::new((StatusCode::OK, Body::from(result.resp_bytes))))
|
||||
}
|
||||
None => Ok(S3Response::new((StatusCode::INTERNAL_SERVER_ERROR, Body::from(vec![])))),
|
||||
}
|
||||
let (status, body) = map_heal_response(rx.recv().await)?;
|
||||
Ok(S3Response::new((status, Body::from(body))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,11 +268,15 @@ impl Operation for BackgroundHealStatusHandler {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::extract_heal_init_params;
|
||||
use super::{HealResp, map_heal_response};
|
||||
use bytes::Bytes;
|
||||
use http::StatusCode;
|
||||
use http::Uri;
|
||||
use matchit::Router;
|
||||
use rustfs_common::heal_channel::HealOpts;
|
||||
use s3s::S3ErrorCode;
|
||||
use serde_json::json;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
|
||||
#[test]
|
||||
@@ -338,4 +350,35 @@ mod tests {
|
||||
let s: HealOpts = serde_urlencoded::from_bytes(b).unwrap();
|
||||
debug!("Parsed HealOpts: {:?}", s);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_map_heal_response_propagates_errors() {
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
drop(tx);
|
||||
let result = map_heal_response(rx.recv().await);
|
||||
assert!(result.is_err());
|
||||
assert_eq!(result.unwrap_err().code(), &S3ErrorCode::InternalError);
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
tx.send(HealResp {
|
||||
resp_bytes: vec![],
|
||||
api_err: Some("channel failed".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let result = map_heal_response(rx.recv().await);
|
||||
assert!(result.is_err());
|
||||
assert_eq!(result.unwrap_err().code(), &S3ErrorCode::InternalError);
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
let _ = tx
|
||||
.send(HealResp {
|
||||
resp_bytes: vec![1, 2, 3],
|
||||
api_err: None,
|
||||
})
|
||||
.await;
|
||||
let result = map_heal_response(rx.recv().await).expect("heal response should be successful");
|
||||
assert_eq!(result.0, StatusCode::OK);
|
||||
assert_eq!(result.1, vec![1, 2, 3]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user