This commit is contained in:
likewu
2026-01-17 21:16:20 +08:00
parent e15c619ed5
commit e68f791b1a
2 changed files with 75 additions and 127 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::{heal::storage::ECStoreHealStorage, init_heal_manager};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
@@ -22,7 +23,6 @@ use rustfs_ecstore::{
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_ahm::{heal::storage::ECStoreHealStorage, init_heal_manager};
use rustfs_scanner::scanner::init_data_scanner;
use serial_test::serial;
use std::{
@@ -351,7 +351,7 @@ mod serial_tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
//#[ignore]
#[ignore]
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;

View File

@@ -1071,38 +1071,37 @@ impl S3 for FS {
version_id,
..
} = req.input.clone();
let rreq = rreq.unwrap();
/*if let Err(e) = un_escape_path(object) {
warn!("post restore object failed, e: {:?}", e);
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let rreq = rreq.ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::Custom("ErrValidRestoreObject".into()),
"restore request is required",
)
})?;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
/*if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let version_id_str = version_id.unwrap_or_default();
let opts = post_restore_opts(&version_id_str, &bucket, &object)
.await
.map_err(|_| {
S3Error::with_message(
S3ErrorCode::Custom("ErrPostRestoreOpts".into()),
"restore object failed.",
)
})?;
/*if req.content_length <= 0 {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}*/
let Ok(opts) = post_restore_opts(&version_id.unwrap(), &bucket, &object).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrPostRestoreOpts".into()),
"restore object failed.",
));
};
let Ok(mut obj_info) = store.get_object_info(&bucket, &object, &opts).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"restore object failed.",
));
};
let mut obj_info = store.get_object_info(&bucket, &object, &opts).await
.map_err(|_| {
S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"restore object failed.",
)
})?;
// Check if object is in a transitioned state
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidTransitionedState".into()),
@@ -1110,37 +1109,38 @@ impl S3 for FS {
));
}
//let mut api_err;
let mut _status_code = StatusCode::OK;
let mut already_restored = false;
if let Err(_err) = rreq.validate(store.clone()) {
//api_err = to_api_err(ErrMalformedXML);
//api_err.description = err.to_string();
return Err(S3Error::with_message(
// Validate restore request
rreq.validate(store.clone()).map_err(|_| {
S3Error::with_message(
S3ErrorCode::Custom("ErrValidRestoreObject".into()),
"restore object failed",
"restore object validation failed",
)
})?;
// Check if restore is already in progress
if obj_info.restore_ongoing && (rreq.type_.as_ref().map_or(true, |t| t.as_str() != "SELECT")) {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"restore object failed.",
));
} else {
if obj_info.restore_ongoing && (rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT") {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"restore object failed.",
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
_status_code = StatusCode::ACCEPTED;
}
let mut already_restored = false;
if let Some(restore_expires) = obj_info.restore_expires {
if !obj_info.restore_ongoing && restore_expires.unix_timestamp() != 0 {
already_restored = true;
}
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap());
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap_or(&1));
let mut metadata = obj_info.user_defined.clone();
let mut header = HeaderMap::new();
let obj_info_ = obj_info.clone();
if rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT" {
if rreq.type_.as_ref().map_or(true, |t| t.as_str() != "SELECT") {
obj_info.metadata_only = true;
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap().to_string());
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap_or(1).to_string());
metadata.insert(AMZ_RESTORE_REQUEST_DATE.to_string(), OffsetDateTime::now_utc().format(&Rfc3339).unwrap());
if already_restored {
metadata.insert(
@@ -1162,7 +1162,8 @@ impl S3 for FS {
);
}
obj_info.user_defined = metadata;
if let Err(_err) = store
store
.clone()
.copy_object(
&bucket,
@@ -1181,12 +1182,13 @@ impl S3 for FS {
},
)
.await
{
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrCopyObject".into()),
"restore object failed",
));
}
.map_err(|_| {
S3Error::with_message(
S3ErrorCode::Custom("ErrCopyObject".into()),
"restore object failed",
)
})?;
if already_restored {
let output = RestoreObjectOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
@@ -1196,11 +1198,11 @@ impl S3 for FS {
}
}
let restore_object = Uuid::new_v4().to_string();
//if let Some(rreq) = rreq {
// Handle output location for SELECT requests
if let Some(output_location) = &rreq.output_location {
if let Some(s3) = &output_location.s3 {
if !s3.bucket_name.is_empty() {
let restore_object = Uuid::new_v4().to_string();
header.insert(
X_AMZ_RESTORE_OUTPUT_PATH,
format!("{}{}{}", s3.bucket_name, s3.prefix, restore_object).parse().unwrap(),
@@ -1208,86 +1210,32 @@ impl S3 for FS {
}
}
}
//}
/*send_event(EventArgs {
event_name: event::ObjectRestorePost,
bucket_name: bucket,
object: obj_info,
req_params: extract_req_params(r),
user_agent: req.user_agent(),
host: handlers::get_source_ip(r),
});*/
// Spawn restoration task in the background
let store_clone = store.clone();
let bucket_clone = bucket.clone();
let object_clone = object.clone();
let rreq_clone = rreq.clone();
let obj_info_clone = obj_info_.clone();
tokio::spawn(async move {
/*if rreq.select_parameters.is_some() {
let actual_size = obj_info_.get_actual_size();
if actual_size.is_err() {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
let object_rsc = s3select.new_object_read_seek_closer(
|offset: i64| -> (ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
return get_transitioned_object_reader(bucket, object, rs, r.Header,
obj_info, ObjectOptions {version_id: obj_info_.version_id});
},
actual_size.unwrap(),
);
if err = rreq.select_parameters.open(object_rsc); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
let encoded_error_response = encodeResponse(APIErrorResponse {
code: serr.ErrorCode(),
message: serr.ErrorMessage(),
bucket_name: bucket,
key: object,
resource: r.URL.Path,
request_id: w.Header().Get(xhttp.AmzRequestID),
host_id: globalDeploymentID(),
});
//writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
} else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
return Ok(());
}
let nr = httptest.NewRecorder();
let rw = xhttp.NewResponseRecorder(nr);
rw.log_err_body = true;
rw.log_all_body = true;
rreq.select_parameters.evaluate(rw);
rreq.select_parameters.Close();
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}*/
let opts = ObjectOptions {
transition: TransitionOptions {
restore_request: rreq,
restore_request: rreq_clone,
restore_expiry,
..Default::default()
},
version_id: obj_info_.version_id.map(|e| e.to_string()),
version_id: obj_info_clone.version_id.map(|e| e.to_string()),
..Default::default()
};
if let Err(err) = store.clone().restore_transitioned_object(&bucket, &object, &opts).await {
warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string());
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrRestoreTransitionedObject".into()),
format!("unable to restore transitioned bucket/object {bucket}/{object}: {err}"),
));
if let Err(err) = store_clone.restore_transitioned_object(&bucket_clone, &object_clone, &opts).await {
warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket_clone, object_clone, err.to_string());
// Note: Errors from background tasks cannot be returned to client
// Consider adding to monitoring/metrics system
} else {
info!("successfully restored transitioned object: {}/{}", bucket_clone, object_clone);
}
/*send_event(EventArgs {
EventName: event.ObjectRestoreCompleted,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
});*/
Ok(())
});
let output = RestoreObjectOutput {