Merge branch 'fix/lifecycle' of https://github.com/rustfs/rustfs into fix/lifecycle

# Conflicts:
#	rustfs/src/storage/ecfs.rs
This commit is contained in:
likewu
2026-01-17 21:26:35 +08:00
18 changed files with 533 additions and 171 deletions

15
.vscode/launch.json vendored
View File

@@ -99,7 +99,17 @@
"name": "Debug executable target/debug/rustfs",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/target/debug/rustfs",
"cargo": {
"args": [
"run",
"--bin",
"rustfs",
"-j",
"1",
"--profile",
"dev"
]
},
"args": [],
"cwd": "${workspaceFolder}",
//"stopAtEntry": false,
@@ -107,7 +117,7 @@
"env": {
"RUSTFS_ACCESS_KEY": "rustfsadmin",
"RUSTFS_SECRET_KEY": "rustfsadmin",
"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
//"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
"RUSTFS_ADDRESS": ":9000",
"RUSTFS_CONSOLE_ENABLE": "true",
// "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jeager otlp http endpoint
@@ -116,6 +126,7 @@
// "RUSTFS_COMPRESS_ENABLE": "true",
"RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001",
"RUSTFS_OBS_LOG_DIRECTORY": "./target/logs",
"RUST_LOG":"rustfs=debug,ecstore=debug,s3s=debug,iam=debug",
},
"sourceLanguages": [
"rust"

3
Cargo.lock generated
View File

@@ -7251,12 +7251,15 @@ dependencies = [
"faster-hex",
"flatbuffers",
"futures",
"futures-util",
"glob",
"google-cloud-auth",
"google-cloud-storage",
"hex-simd",
"hmac 0.13.0-rc.3",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.8.1",
"hyper-rustls 0.27.7",
"hyper-util",

View File

@@ -45,6 +45,7 @@ glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true
@@ -53,6 +54,8 @@ serde_json.workspace = true
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
s3s.workspace = true
http.workspace = true
http-body = { workspace = true }
http-body-util.workspace = true
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-simd = { workspace = true }

View File

@@ -18,9 +18,11 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use crate::client::{
api_error_response::http_resp_to_error_response,
@@ -61,9 +63,13 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
//defer closeResponse(resp)
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp != nil {
if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
if resp_status != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
}
//}
Ok(())
@@ -97,8 +103,11 @@ impl TransitionClient {
.await?;
//defer closeResponse(resp)
if resp.status() != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
if resp_status != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
}
Ok(())
@@ -136,7 +145,15 @@ impl TransitionClient {
)
.await?;
let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let policy = String::from_utf8_lossy(&body_vec).to_string();
Ok(policy)
}
}

View File

@@ -18,12 +18,12 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::Deserializer, ser::Serializer};
use std::fmt::Display;
use s3s::Body;
//use s3s::Body;
use s3s::S3ErrorCode;
const _REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
@@ -95,7 +95,8 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse {
}
pub fn http_resp_to_error_response(
resp: &http::Response<Body>,
resp_status: StatusCode,
h: &HeaderMap,
b: Vec<u8>,
bucket_name: &str,
object_name: &str,
@@ -104,11 +105,11 @@ pub fn http_resp_to_error_response(
let err_resp_ = quick_xml::de::from_str::<ErrorResponse>(&err_body);
let mut err_resp = ErrorResponse::default();
if err_resp_.is_err() {
match resp.status() {
match resp_status {
StatusCode::NOT_FOUND => {
if object_name == "" {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchBucket,
message: "The specified bucket does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -116,7 +117,7 @@ pub fn http_resp_to_error_response(
};
} else {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchKey,
message: "The specified key does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -127,7 +128,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::FORBIDDEN => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::AccessDenied,
message: "Access Denied.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -137,7 +138,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::CONFLICT => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::BucketNotEmpty,
message: "Bucket not empty.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -146,7 +147,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::PRECONDITION_FAILED => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::PreconditionFailed,
message: "Pre condition failed.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -155,13 +156,13 @@ pub fn http_resp_to_error_response(
};
}
_ => {
let mut msg = resp.status().to_string();
let mut msg = resp_status.to_string();
if err_body.len() > 0 {
msg = err_body;
}
err_resp = ErrorResponse {
status_code: resp.status(),
code: S3ErrorCode::Custom(resp.status().to_string().into()),
status_code: resp_status,
code: S3ErrorCode::Custom(resp_status.to_string().into()),
message: msg,
bucket_name: bucket_name.to_string(),
..Default::default()
@@ -171,32 +172,32 @@ pub fn http_resp_to_error_response(
} else {
err_resp = err_resp_.unwrap();
}
err_resp.status_code = resp.status();
if let Some(server_name) = resp.headers().get("Server") {
err_resp.status_code = resp_status;
if let Some(server_name) = h.get("Server") {
err_resp.server = server_name.to_str().expect("err").to_string();
}
let code = resp.headers().get("x-minio-error-code");
let code = h.get("x-minio-error-code");
if code.is_some() {
err_resp.code = S3ErrorCode::Custom(code.expect("err").to_str().expect("err").into());
}
let desc = resp.headers().get("x-minio-error-desc");
let desc = h.get("x-minio-error-desc");
if desc.is_some() {
err_resp.message = desc.expect("err").to_str().expect("err").trim_matches('"').to_string();
}
if err_resp.request_id == "" {
if let Some(x_amz_request_id) = resp.headers().get("x-amz-request-id") {
if let Some(x_amz_request_id) = h.get("x-amz-request-id") {
err_resp.request_id = x_amz_request_id.to_str().expect("err").to_string();
}
}
if err_resp.host_id == "" {
if let Some(x_amz_id_2) = resp.headers().get("x-amz-id-2") {
if let Some(x_amz_id_2) = h.get("x-amz-id-2") {
err_resp.host_id = x_amz_id_2.to_str().expect("err").to_string();
}
}
if err_resp.region == "" {
if let Some(x_amz_bucket_region) = resp.headers().get("x-amz-bucket-region") {
if let Some(x_amz_bucket_region) = h.get("x-amz-bucket-region") {
err_resp.region = x_amz_bucket_region.to_str().expect("err").to_string();
}
}

View File

@@ -19,10 +19,14 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
//use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::ready;
use http::HeaderMap;
use std::io::Cursor;
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;
use crate::client::{
api_error_response::err_invalid_argument,
@@ -30,6 +34,11 @@ use crate::client::{
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use futures_util::StreamExt;
use tokio_util::io::ReaderStream;
impl TransitionClient {
pub fn get_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<Object, std::io::Error> {
@@ -65,11 +74,19 @@ impl TransitionClient {
)
.await?;
let resp = &resp;
let object_stat = to_object_info(bucket_name, object_name, resp.headers())?;
let b = resp.body().bytes().expect("err").to_vec();
Ok((object_stat, resp.headers().clone(), BufReader::new(Cursor::new(b))))
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
Ok((object_stat, h, BufReader::new(Cursor::new(body_vec))))
}
}

View File

@@ -29,6 +29,7 @@ use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::dto::Owner;
use std::collections::HashMap;
use http_body_util::BodyExt;
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Grantee {
@@ -83,18 +84,23 @@ impl TransitionClient {
)
.await?;
if resp.status() != http::StatusCode::OK {
let b = resp.body().bytes().expect("err").to_vec();
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name)));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
if resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, object_name)));
}
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -18,7 +18,6 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -30,8 +29,12 @@ use crate::client::{
};
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
//use s3s::Body;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::body::Body;
use http_body_util::BodyExt;
pub struct ObjectAttributesOptions {
pub max_parts: i64,
@@ -130,19 +133,12 @@ struct ObjectAttributePart {
}
impl ObjectAttributes {
pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
let h = resp.headers();
pub async fn parse_response(&mut self, h: &HeaderMap, body_vec: Vec<u8>) -> Result<(), std::io::Error> {
let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
self.last_modified = mod_time;
self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));
@@ -213,7 +209,8 @@ impl TransitionClient {
)
.await?;
let h = resp.headers();
let resp_status = resp.status();
let h = resp.headers().clone();
let has_etag = h.get("ETag").unwrap().to_str().unwrap();
if !has_etag.is_empty() {
return Err(std::io::Error::other(
@@ -221,14 +218,17 @@ impl TransitionClient {
));
}
if resp.status() != http::StatusCode::OK {
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let err_body = String::from_utf8(b).unwrap();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::OK {
let err_body = String::from_utf8(body_vec).unwrap();
let mut er = match quick_xml::de::from_str::<AccessControlPolicy>(&err_body) {
Ok(result) => result,
Err(err) => {
@@ -240,7 +240,7 @@ impl TransitionClient {
}
let mut oa = ObjectAttributes::new();
oa.parse_response(&mut resp).await?;
oa.parse_response(&h, body_vec).await?;
Ok(oa)
}

View File

@@ -27,11 +27,14 @@ use crate::client::{
transition_api::{ReaderImpl, RequestMetadata, TransitionClient},
};
use crate::store_api::BucketInfo;
use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
impl TransitionClient {
pub fn list_buckets(&self) -> Result<Vec<BucketInfo>, std::io::Error> {
@@ -97,18 +100,24 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
}
//let mut list_bucket_result = ListBucketV2Result::default();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -17,13 +17,14 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use s3s::S3ErrorCode;
use std::collections::HashMap;
use time::OffsetDateTime;
use tracing::warn;
use uuid::Uuid;
use hyper::body::Bytes;
use crate::client::checksum::ChecksumMode;
use crate::client::utils::base64_encode;
@@ -225,10 +226,15 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_none() {
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,
@@ -287,9 +293,14 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
&p.bucket_name.clone(),
&p.object_name,
@@ -370,7 +381,8 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let b = resp.body().bytes().expect("err").to_vec();
let h = resp.headers().clone();
let complete_multipart_upload_result: CompleteMultipartUploadResult = CompleteMultipartUploadResult::default();
let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) {
@@ -382,7 +394,6 @@ impl TransitionClient {
(OffsetDateTime::now_utc(), "".to_string())
};
let h = resp.headers();
Ok(UploadInfo {
bucket: complete_multipart_upload_result.bucket,
key: complete_multipart_upload_result.key,

View File

@@ -479,9 +479,13 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -18,7 +18,6 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use rustfs_utils::HashAlgorithm;
use s3s::S3ErrorCode;
@@ -28,6 +27,9 @@ use std::fmt::Display;
use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tokio::sync::mpsc::{self, Receiver, Sender};
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use crate::client::utils::base64_encode;
use crate::client::{
@@ -344,8 +346,15 @@ impl TransitionClient {
)
.await?;
let body_bytes: Vec<u8> = resp.body().bytes().expect("err").to_vec();
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_bytes)), result_tx.clone());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_vec)), result_tx.clone());
}
Ok(())
}
@@ -390,6 +399,10 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_some() {
if resp.status() != StatusCode::NO_CONTENT {
let error_response: ErrorResponse;
@@ -426,7 +439,8 @@ impl TransitionClient {
}
_ => {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -24,12 +24,14 @@ use crate::client::{
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use bytes::Bytes;
use http::HeaderMap;
use s3s::dto::RestoreRequest;
use std::collections::HashMap;
use std::io::Cursor;
use tokio::io::BufReader;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
const TIER_STANDARD: &str = "Standard";
const TIER_BULK: &str = "Bulk";
@@ -107,9 +109,19 @@ impl TransitionClient {
)
.await?;
let b = resp.body().bytes().expect("err").to_vec();
if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::ACCEPTED && resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "")));
}
Ok(())
}

View File

@@ -18,13 +18,15 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
use tracing::warn;
use uuid::Uuid;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use crate::client::{
api_error_response::{ErrorResponse, err_invalid_argument, http_resp_to_error_response},
@@ -66,10 +68,20 @@ impl TransitionClient {
return Ok(false);
}
let b = resp.body().bytes().expect("err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("bucket exists, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("bucket exists, resperr: {:?}", resperr);
/*if to_error_response(resperr).code == "NoSuchBucket" {
return Ok(false);
}
@@ -108,10 +120,20 @@ impl TransitionClient {
match resp {
Ok(resp) => {
let b = resp.body().bytes().expect("get bucket versioning err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("get bucket versioning, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("get bucket versioning, resperr: {:?}", resperr);
Ok(VersioningConfiguration::default())
}

View File

@@ -28,9 +28,12 @@ use http::Request;
use hyper::StatusCode;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use hyper::body::Incoming;
#[derive(Debug, Clone)]
pub struct BucketLocationCache {
@@ -86,7 +89,7 @@ impl TransitionClient {
Ok(location)
}
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<Body>, std::io::Error> {
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut url_values = HashMap::new();
url_values.insert("location".to_string(), "".to_string());
@@ -120,7 +123,7 @@ impl TransitionClient {
url_str = target_url.to_string();
}
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(Body::empty()) else {
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(s3s::Body::empty()) else {
return Err(std::io::Error::other("create request error"));
};
@@ -172,13 +175,16 @@ impl TransitionClient {
}
async fn process_bucket_location_response(
mut resp: http::Response<Body>,
mut resp: http::Response<Incoming>,
bucket_name: &str,
tier_type: &str,
) -> Result<String, std::io::Error> {
//if resp != nil {
if resp.status() != StatusCode::OK {
let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
let err_resp = http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "");
match err_resp.code {
S3ErrorCode::NotImplemented => {
match err_resp.server.as_str() {
@@ -208,18 +214,20 @@ async fn process_bucket_location_response(
}
//}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut location = "".to_string();
if tier_type == "huaweicloud" {
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(b).unwrap()).unwrap();
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(body_vec).unwrap()).unwrap();
location = d.location_constraint;
} else {
if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(b).unwrap()) {
if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap()) {
location = field;
}
}

View File

@@ -118,38 +118,51 @@ pub fn new_getobjectreader<'a>(
let mut is_encrypted = false;
let is_compressed = false; //oi.is_compressed_ok();
let mut rs_ = None;
let rs_;
if rs.is_none() && opts.part_number.is_some() && opts.part_number.unwrap() > 0 {
rs_ = part_number_to_rangespec(oi.clone(), opts.part_number.unwrap());
} else {
rs_ = rs.clone();
}
let mut get_fn: ObjReaderFn;
let (off, length) = match rs_.unwrap().get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
if let Some(rs_) = rs_ {
let (off, length) = match rs_.get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
r
//})
});
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
};
r
//})
});
Ok((get_fn, off as i64, length as i64))
return Ok((get_fn, off as i64, length as i64));
}
Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: "Invalid range".to_string(),
key: Some(oi.name.clone()),
bucket_name: Some(oi.bucket.clone()),
region: Some("".to_string()),
request_id: None,
host_id: "".to_string(),
})
}
/// Convert a raw stored ETag into the strongly-typed `s3s::dto::ETag`.

View File

@@ -30,9 +30,10 @@ use crate::client::{
},
constants::{UNSIGNED_PAYLOAD, UNSIGNED_PAYLOAD_TRAILER},
credentials::{CredContext, Credentials, SignatureType, Static},
api_error_response::ErrorResponse,
};
use crate::{client::checksum::ChecksumMode, store_api::GetObjectReader};
use bytes::Bytes;
//use bytes::Bytes;
use futures::{Future, StreamExt};
use http::{HeaderMap, HeaderName};
use http::{
@@ -55,7 +56,7 @@ use rustfs_utils::{
};
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};
use s3s::dto::Owner;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::io::Cursor;
@@ -72,6 +73,10 @@ use tokio::io::BufReader;
use tracing::{debug, error, warn};
use url::{Url, form_urlencoded};
use uuid::Uuid;
use http_body::Body;
use hyper::body::Incoming;
use hyper::body::Bytes;
use http_body_util::BodyExt;
const C_USER_AGENT: &str = "RustFS (linux; x86)";
@@ -95,7 +100,7 @@ pub struct TransitionClient {
pub creds_provider: Arc<Mutex<Credentials<Static>>>,
pub override_signer_type: SignatureType,
pub secure: bool,
pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
pub http_client: Client<HttpsConnector<HttpConnector>, s3s::Body>,
pub bucket_loc_cache: Arc<Mutex<BucketLocationCache>>,
pub is_trace_enabled: Arc<Mutex<bool>>,
pub trace_errors_only: Arc<Mutex<bool>>,
@@ -248,7 +253,7 @@ impl TransitionClient {
todo!();
}
fn dump_http(&self, req: &http::Request<Body>, resp: &http::Response<Body>) -> Result<(), std::io::Error> {
fn dump_http(&self, req: &http::Request<s3s::Body>, resp: &http::Response<Incoming>) -> Result<(), std::io::Error> {
let mut resp_trace: Vec<u8>;
//info!("{}{}", self.trace_output, "---------BEGIN-HTTP---------");
@@ -257,7 +262,7 @@ impl TransitionClient {
Ok(())
}
pub async fn doit(&self, req: http::Request<Body>) -> Result<http::Response<Body>, std::io::Error> {
pub async fn doit(&self, req: http::Request<s3s::Body>) -> Result<http::Response<Incoming>, std::io::Error> {
let req_method;
let req_uri;
let req_headers;
@@ -273,8 +278,7 @@ impl TransitionClient {
resp = http_client.request(req);
}
let resp = resp
.await /*.map_err(Into::into)*/
.map(|res| res.map(Body::from));
.await;
debug!("http_client url: {} {}", req_method, req_uri);
debug!("http_client headers: {:?}", req_headers);
if let Err(err) = resp {
@@ -282,7 +286,7 @@ impl TransitionClient {
return Err(std::io::Error::other(err));
}
let mut resp = resp.unwrap();
let resp = resp.unwrap();
debug!("http_resp: {:?}", resp);
//let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
@@ -291,23 +295,26 @@ impl TransitionClient {
//if self.is_trace_enabled && !(self.trace_errors_only && resp.status() == StatusCode::OK) {
if resp.status() != StatusCode::OK {
//self.dump_http(&cloned_req, &resp)?;
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
warn!("err_body: {}", String::from_utf8(b).unwrap());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
warn!("err_body: {}", String::from_utf8(body_vec).unwrap());
Err(std::io::Error::other("http_client call error."))
} else {
Ok(resp)
}
Ok(resp)
}
pub async fn execute_method(
&self,
method: http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Response<Body>, std::io::Error> {
) -> Result<http::Response<Incoming>, std::io::Error> {
if self.is_offline() {
let mut s = self.endpoint_url.to_string();
s.push_str(" is offline.");
@@ -317,7 +324,7 @@ impl TransitionClient {
let retryable: bool;
//let mut body_seeker: BufferReader;
let mut req_retry = self.max_retries;
let mut resp: http::Response<Body>;
let mut resp: http::Response<Incoming>;
//if metadata.content_body != nil {
//body_seeker = BufferReader::new(metadata.content_body.read_all().await?);
@@ -339,13 +346,18 @@ impl TransitionClient {
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut err_response = http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
err_response.message = format!("remote tier error: {}", err_response.message);
if self.region == "" {
@@ -381,7 +393,7 @@ impl TransitionClient {
continue;
}
if is_http_status_retryable(&resp.status()) {
if is_http_status_retryable(&resp_status) {
continue;
}
@@ -395,7 +407,7 @@ impl TransitionClient {
&self,
method: &http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Request<Body>, std::io::Error> {
) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut location = metadata.bucket_location.clone();
if location == "" && metadata.bucket_name != "" {
location = self.get_bucket_location(&metadata.bucket_name).await?;
@@ -415,7 +427,7 @@ impl TransitionClient {
let Ok(mut req) = Request::builder()
.method(method)
.uri(target_url.to_string())
.body(Body::empty())
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -527,10 +539,10 @@ impl TransitionClient {
if metadata.content_length > 0 {
match &mut metadata.content_body {
ReaderImpl::Body(content_body) => {
*req.body_mut() = Body::from(content_body.clone());
*req.body_mut() = s3s::Body::from(content_body.clone());
}
ReaderImpl::ObjectBody(content_body) => {
*req.body_mut() = Body::from(content_body.read_all().await?);
*req.body_mut() = s3s::Body::from(content_body.read_all().await?);
}
}
}
@@ -538,7 +550,7 @@ impl TransitionClient {
Ok(req)
}
pub fn set_user_agent(&self, req: &mut Request<Body>) {
pub fn set_user_agent(&self, req: &mut Request<s3s::Body>) {
let headers = req.headers_mut();
headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
}
@@ -976,25 +988,222 @@ impl Default for UploadInfo {
}
}
/// Convert HTTP headers to ObjectInfo struct
/// This function parses various S3 response headers to construct an ObjectInfo struct
/// containing metadata about an S3 object.
pub fn to_object_info(bucket_name: &str, object_name: &str, h: &HeaderMap) -> Result<ObjectInfo, std::io::Error> {
todo!()
// Helper function to get header value as string
let get_header = |name: &str| -> String {
h.get(name)
.and_then(|val| val.to_str().ok())
.unwrap_or("")
.to_string()
};
// Get and process the ETag
let etag = {
let etag_raw = get_header("ETag");
// Remove surrounding quotes if present (trimming ETag)
let trimmed = etag_raw.trim_start_matches('"').trim_end_matches('"');
Some(trimmed.to_string())
};
// Parse content length if it exists
let size = {
let content_length_str = get_header("Content-Length");
if !content_length_str.is_empty() {
content_length_str
.parse::<i64>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Content-Length is not an integer"))?
} else {
-1
}
};
// Parse Last-Modified time
let mod_time = {
let last_modified_str = get_header("Last-Modified");
if !last_modified_str.is_empty() {
// Parse HTTP date format (RFC 7231)
// Using time crate to parse HTTP dates
let parsed_time = OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Last-Modified time format is invalid"))?;
Some(parsed_time)
} else {
Some(OffsetDateTime::now_utc())
}
};
// Get content type
let content_type = {
let content_type_raw = get_header("Content-Type");
let content_type_trimmed = content_type_raw.trim();
if content_type_trimmed.is_empty() {
Some("application/octet-stream".to_string())
} else {
Some(content_type_trimmed.to_string())
}
};
// Parse Expires time
let expiration = {
let expiry_str = get_header("Expires");
if !expiry_str.is_empty() {
OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "'Expires' is not in supported format"))?
} else {
OffsetDateTime::now_utc()
}
};
// Extract user metadata (headers prefixed with "X-Amz-Meta-")
let user_metadata = {
let mut meta = HashMap::new();
for (name, value) in h.iter() {
let header_name = name.as_str().to_lowercase();
if header_name.starts_with("x-amz-meta-") {
let key = header_name.strip_prefix("x-amz-meta-").unwrap().to_string();
if let Ok(value_str) = value.to_str() {
meta.insert(key, value_str.to_string());
}
}
}
meta
};
let user_tag = {
let user_tag_str = get_header("X-Amz-Tagging");
user_tag_str
};
// Extract user tags count
let user_tag_count = {
let count_str = get_header("x-amz-tagging-count");
if !count_str.is_empty() {
count_str
.parse::<usize>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "x-amz-tagging-count is not an integer"))?
} else {
0
}
};
// Handle restore info
let restore = {
let restore_hdr = get_header("x-amz-restore");
if !restore_hdr.is_empty() {
// Simplified restore header parsing - in real implementation, this would parse the specific format
// "ongoing-request=\"true\"" or "ongoing-request=\"false\", expiry-date=\"..."
let ongoing_restore = restore_hdr.contains("ongoing-request=\"true\"");
RestoreInfo {
ongoing_restore,
expiry_time: if ongoing_restore {
OffsetDateTime::now_utc()
} else {
// Try to extract expiry date from the header
// This is simplified - real parsing would be more complex
OffsetDateTime::now_utc()
},
}
} else {
RestoreInfo::default()
}
};
// Extract version ID
let version_id = {
let version_id_str = get_header("x-amz-version-id");
if !version_id_str.is_empty() {
Some(Uuid::parse_str(&version_id_str).unwrap_or_else(|_| Uuid::nil()))
} else {
None
}
};
// Check if it's a delete marker
let is_delete_marker = get_header("x-amz-delete-marker") == "true";
// Get replication status
let replication_status = {
let status_str = get_header("x-amz-replication-status");
ReplicationStatus::from_static(match status_str.as_str() {
"COMPLETE" => ReplicationStatus::COMPLETE,
"PENDING" => ReplicationStatus::PENDING,
"FAILED" => ReplicationStatus::FAILED,
"REPLICA" => ReplicationStatus::REPLICA,
_ => ReplicationStatus::PENDING,
})
};
// Extract expiration rule ID and time (simplified)
let (expiration_time, expiration_rule_id) = {
// In a real implementation, this would parse the x-amz-expiration header
// which typically has format: "expiry-date="Fri, 11 Dec 2020 00:00:00 GMT", rule-id="myrule""
let exp_header = get_header("x-amz-expiration");
if !exp_header.is_empty() {
// Simplified parsing - real implementation would be more thorough
(OffsetDateTime::now_utc(), exp_header) // Placeholder
} else {
(OffsetDateTime::now_utc(), "".to_string())
}
};
// Extract checksums
let checksum_crc32 = get_header("x-amz-checksum-crc32");
let checksum_crc32c = get_header("x-amz-checksum-crc32c");
let checksum_sha1 = get_header("x-amz-checksum-sha1");
let checksum_sha256 = get_header("x-amz-checksum-sha256");
let checksum_crc64nvme = get_header("x-amz-checksum-crc64nvme");
let checksum_mode = get_header("x-amz-checksum-mode");
// Build and return the ObjectInfo struct
Ok(ObjectInfo {
etag,
name: object_name.to_string(),
mod_time,
size,
content_type,
metadata: h.clone(),
user_metadata,
user_tags: "".to_string(), // Tags would need separate parsing
user_tag_count,
owner: Owner::default(),
storage_class: get_header("x-amz-storage-class"),
is_latest: true, // Would be determined by versioning settings
is_delete_marker,
version_id,
replication_status,
replication_ready: false, // Would be computed based on status
expiration: expiration_time,
expiration_rule_id,
num_versions: 1, // Would be determined by versioning
restore,
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
checksum_crc64nvme,
checksum_mode,
})
}
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
//#[derive(Clone)]
pub struct SendRequest {
inner: hyper::client::conn::http1::SendRequest<Body>,
inner: hyper::client::conn::http1::SendRequest<s3s::Body>,
}
impl From<hyper::client::conn::http1::SendRequest<Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<Body>) -> Self {
impl From<hyper::client::conn::http1::SendRequest<s3s::Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<s3s::Body>) -> Self {
Self { inner }
}
}
impl tower::Service<Request<Body>> for SendRequest {
type Response = Response<Body>;
impl tower::Service<Request<s3s::Body>> for SendRequest {
type Response = Response<Incoming>;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@@ -1002,13 +1211,13 @@ impl tower::Service<Request<Body>> for SendRequest {
self.inner.poll_ready(cx).map_err(std::io::Error::other)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<s3s::Body>) -> Self::Future {
//let req = hyper::Request::builder().uri("/").body(http_body_util::Empty::<Bytes>::new()).unwrap();
//let req = hyper::Request::builder().uri("/").body(Body::empty()).unwrap();
let fut = self.inner.send_request(req);
Box::pin(async move { fut.await.map_err(std::io::Error::other).map(|res| res.map(Body::from)) })
Box::pin(async move { fut.await.map_err(std::io::Error::other) })
}
}

View File

@@ -4993,8 +4993,10 @@ impl StorageAPI for SetDisks {
oi = ObjectInfo::from_file_info(&actual_fi, bucket, object, opts.versioned || opts.version_suspended);
let ropts = put_restore_opts(bucket, object, &opts.transition.restore_request, &oi).await?;
if oi.parts.len() == 1 {
let mut opts = opts.clone();
opts.part_number = Some(1);
let rs: Option<HTTPRangeSpec> = None;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, opts).await;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, &opts).await;
if let Err(err) = gr {
return set_restore_header_fn(&mut oi, Some(to_object_err(err.into(), vec![bucket, object]))).await;
}