Merge pull request #47 from rustfs/feature-up/ilm

Feature up/ilm
This commit is contained in:
loverustfs
2025-07-04 22:50:35 +08:00
committed by GitHub
10 changed files with 136 additions and 182 deletions

View File

@@ -292,8 +292,6 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
//defer closeResponse(resp)
//if resp.is_none() {
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp,
@@ -361,13 +359,13 @@ impl TransitionClient {
bucket_name: bucket_name.to_string(),
object_name: object_name.to_string(),
query_values: url_values,
custom_header: headers,
content_body: ReaderImpl::Body(complete_multipart_upload_buffer),
content_length: 100, //complete_multipart_upload_bytes.len(),
content_sha256_hex: "".to_string(), //hex_simd::encode_to_string(complete_multipart_upload_bytes, hex_simd::AsciiCase::Lower),
custom_header: headers,
content_md5_base64: "".to_string(),
stream_sha256: Default::default(),
trailer: Default::default(),
content_md5_base64: "".to_string(),
pre_sign_url: Default::default(),
add_crc: Default::default(),
extra_pre_sign_header: Default::default(),

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -124,9 +123,11 @@ impl TransitionClient {
url_str = target_url.to_string();
}
let mut req_builder = Request::builder().method(http::Method::GET).uri(url_str);
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(Body::empty()) else {
return Err(std::io::Error::other("create request error"));
};
self.set_user_agent(&mut req_builder);
self.set_user_agent(&mut req);
let value;
{
@@ -153,22 +154,12 @@ impl TransitionClient {
}
if signer_type == SignatureType::SignatureAnonymous {
let req = match req_builder.body(Body::empty()) {
Ok(req) => return Ok(req),
Err(err) => {
return Err(std::io::Error::other(err));
}
};
return Ok(req);
}
if signer_type == SignatureType::SignatureV2 {
let req_builder = rustfs_signer::sign_v2(req_builder, 0, &access_key_id, &secret_access_key, is_virtual_style);
let req = match req_builder.body(Body::empty()) {
Ok(req) => return Ok(req),
Err(err) => {
return Err(std::io::Error::other(err));
}
};
let req = rustfs_signer::sign_v2(req, 0, &access_key_id, &secret_access_key, is_virtual_style);
return Ok(req);
}
let mut content_sha256 = EMPTY_STRING_SHA256_HASH.to_string();
@@ -176,17 +167,11 @@ impl TransitionClient {
content_sha256 = UNSIGNED_PAYLOAD.to_string();
}
req_builder
req
.headers_mut()
.expect("err")
.insert("X-Amz-Content-Sha256", content_sha256.parse().unwrap());
let req_builder = rustfs_signer::sign_v4(req_builder, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1");
let req = match req_builder.body(Body::empty()) {
Ok(req) => return Ok(req),
Err(err) => {
return Err(std::io::Error::other(err));
}
};
let req = rustfs_signer::sign_v4(req, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1");
Ok(req)
}
}

View File

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -72,7 +71,6 @@ use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};
const _C_USER_AGENT_PREFIX: &str = "RustFS (linux; x86)";
const C_USER_AGENT: &str = "RustFS (linux; x86)";
const SUCCESS_STATUS: [StatusCode; 3] = [StatusCode::OK, StatusCode::NO_CONTENT, StatusCode::PARTIAL_CONTENT];
@@ -93,20 +91,16 @@ pub struct TransitionClient {
pub endpoint_url: Url,
pub creds_provider: Arc<Mutex<Credentials<Static>>>,
pub override_signer_type: SignatureType,
/*app_info: TODO*/
pub secure: bool,
pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
//pub http_trace: Httptrace.ClientTrace,
pub bucket_loc_cache: Arc<Mutex<BucketLocationCache>>,
pub is_trace_enabled: Arc<Mutex<bool>>,
pub trace_errors_only: Arc<Mutex<bool>>,
//pub trace_output: io.Writer,
pub s3_accelerate_endpoint: Arc<Mutex<String>>,
pub s3_dual_stack_enabled: Arc<Mutex<bool>>,
pub region: String,
pub random: u64,
pub lookup: BucketLookupType,
//pub lookupFn: func(u url.URL, bucketName string) BucketLookupType,
pub md5_hasher: Arc<Mutex<Option<HashAlgorithm>>>,
pub sha256_hasher: Option<HashAlgorithm>,
pub health_status: AtomicI32,
@@ -118,12 +112,8 @@ pub struct TransitionClient {
pub struct Options {
pub creds: Credentials<Static>,
pub secure: bool,
//pub transport: http.RoundTripper,
//pub trace: *httptrace.ClientTrace,
pub region: String,
pub bucket_lookup: BucketLookupType,
//pub custom_region_via_url: func(u url.URL) string,
//pub bucket_lookup_via_url: func(u url.URL, bucketName string) BucketLookupType,
pub trailing_headers: bool,
pub custom_md5: Option<HashAlgorithm>,
pub custom_sha256: Option<HashAlgorithm>,
@@ -148,8 +138,6 @@ impl TransitionClient {
async fn private_new(endpoint: &str, opts: Options) -> Result<TransitionClient, std::io::Error> {
let endpoint_url = get_endpoint_url(endpoint, opts.secure)?;
//let jar = cookiejar.New(cookiejar.Options{PublicSuffixList: publicsuffix.List})?;
//#[cfg(feature = "ring")]
//let _ = rustls::crypto::ring::default_provider().install_default();
//#[cfg(feature = "aws-lc-rs")]
@@ -157,9 +145,6 @@ impl TransitionClient {
let scheme = endpoint_url.scheme();
let client;
//if scheme == "https" {
// client = Client::builder(TokioExecutor::new()).build_http();
//} else {
let tls = rustls::ClientConfig::builder().with_native_roots()?.with_no_client_auth();
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(tls)
@@ -167,7 +152,6 @@ impl TransitionClient {
.enable_http1()
.build();
client = Client::builder(TokioExecutor::new()).build(https);
//}
let mut clnt = TransitionClient {
endpoint_url,
@@ -213,13 +197,6 @@ impl TransitionClient {
self.endpoint_url.clone()
}
fn set_appinfo(&self, app_name: &str, app_version: &str) {
/*if app_name != "" && app_version != "" {
self.appInfo.app_name = app_name
self.appInfo.app_version = app_version
}*/
}
fn trace_errors_only_off(&self) {
let mut trace_errors_only = self.trace_errors_only.lock().unwrap();
*trace_errors_only = false;
@@ -268,6 +245,7 @@ impl TransitionClient {
fn dump_http(&self, req: &http::Request<Body>, resp: &http::Response<Body>) -> Result<(), std::io::Error> {
let mut resp_trace: Vec<u8>;
//info!("{}{}", self.trace_output, "---------BEGIN-HTTP---------");
//info!("{}{}", self.trace_output, "---------END-HTTP---------");
Ok(())
@@ -338,7 +316,7 @@ impl TransitionClient {
//let mut retry_timer = RetryTimer::new();
//while let Some(v) = retry_timer.next().await {
for _ in [1; 1]
/*new_retry_timer(req_retry, DefaultRetryUnit, DefaultRetryCap, MaxJitter)*/
/*new_retry_timer(req_retry, default_retry_unit, default_retry_cap, max_jitter)*/
{
let req = self.new_request(method, metadata).await?;
@@ -409,7 +387,9 @@ impl TransitionClient {
&metadata.query_values,
)?;
let mut req_builder = Request::builder().method(method).uri(target_url.to_string());
let Ok(mut req) = Request::builder().method(method).uri(target_url.to_string()).body(Body::empty()) else {
return Err(std::io::Error::other("create request error"));
};
let value;
{
@@ -433,30 +413,31 @@ impl TransitionClient {
if metadata.expires != 0 && metadata.pre_sign_url {
if signer_type == SignatureType::SignatureAnonymous {
return Err(std::io::Error::other(err_invalid_argument(
"Presigned URLs cannot be generated with anonymous credentials.",
"presigned urls cannot be generated with anonymous credentials.",
)));
}
if metadata.extra_pre_sign_header.is_some() {
if signer_type == SignatureType::SignatureV2 {
return Err(std::io::Error::other(err_invalid_argument(
"Extra signed headers for Presign with Signature V2 is not supported.",
"extra signed headers for presign with signature v2 is not supported.",
)));
}
let headers = req.headers_mut();
for (k, v) in metadata.extra_pre_sign_header.as_ref().unwrap() {
req_builder = req_builder.header(k, v);
headers.insert(k, v.clone());
}
}
if signer_type == SignatureType::SignatureV2 {
req_builder = rustfs_signer::pre_sign_v2(
req_builder,
req = rustfs_signer::pre_sign_v2(
req,
&access_key_id,
&secret_access_key,
metadata.expires,
is_virtual_host,
);
} else if signer_type == SignatureType::SignatureV4 {
req_builder = rustfs_signer::pre_sign_v4(
req_builder,
req = rustfs_signer::pre_sign_v4(
req,
&access_key_id,
&secret_access_key,
&session_token,
@@ -465,57 +446,41 @@ impl TransitionClient {
OffsetDateTime::now_utc(),
);
}
let req = match req_builder.body(Body::empty()) {
Ok(req) => req,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
return Ok(req);
}
self.set_user_agent(&mut req_builder);
self.set_user_agent(&mut req);
for (k, v) in metadata.custom_header.clone() {
req_builder.headers_mut().expect("err").insert(k.expect("err"), v);
req.headers_mut().insert(k.expect("err"), v);
}
//req.content_length = metadata.content_length;
if metadata.content_length <= -1 {
let chunked_value = HeaderValue::from_str(&vec!["chunked"].join(",")).expect("err");
req_builder
req
.headers_mut()
.expect("err")
.insert(http::header::TRANSFER_ENCODING, chunked_value);
}
if metadata.content_md5_base64.len() > 0 {
let md5_value = HeaderValue::from_str(&metadata.content_md5_base64).expect("err");
req_builder.headers_mut().expect("err").insert("Content-Md5", md5_value);
req.headers_mut().insert("Content-Md5", md5_value);
}
if signer_type == SignatureType::SignatureAnonymous {
let req = match req_builder.body(Body::empty()) {
Ok(req) => req,
Err(err) => {
return Err(std::io::Error::other(err));
}
};
return Ok(req);
}
if signer_type == SignatureType::SignatureV2 {
req_builder =
rustfs_signer::sign_v2(req_builder, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
req =
rustfs_signer::sign_v2(req, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
} else if metadata.stream_sha256 && !self.secure {
if metadata.trailer.len() > 0 {
//req.Trailer = metadata.trailer;
for (_, v) in &metadata.trailer {
req_builder = req_builder.header(http::header::TRAILER, v.clone());
req.headers_mut().insert(http::header::TRAILER, v.clone());
}
}
//req_builder = rustfs_signer::streaming_sign_v4(req_builder, &access_key_id,
// &secret_access_key, &session_token, &location, metadata.content_length, OffsetDateTime::now_utc(), self.sha256_hasher());
} else {
let mut sha_header = UNSIGNED_PAYLOAD.to_string();
if metadata.content_sha256_hex != "" {
@@ -526,11 +491,11 @@ impl TransitionClient {
} else if metadata.trailer.len() > 0 {
sha_header = UNSIGNED_PAYLOAD_TRAILER.to_string();
}
req_builder = req_builder
.header::<HeaderName, HeaderValue>("X-Amz-Content-Sha256".parse().unwrap(), sha_header.parse().expect("err"));
req
.headers_mut().insert("X-Amz-Content-Sha256".parse::<HeaderName>().unwrap(), sha_header.parse().expect("err"));
req_builder = rustfs_signer::sign_v4_trailer(
req_builder,
req = rustfs_signer::sign_v4_trailer(
req,
&access_key_id,
&secret_access_key,
&session_token,
@@ -539,33 +504,23 @@ impl TransitionClient {
);
}
let req;
if metadata.content_length == 0 {
req = req_builder.body(Body::empty());
} else {
if metadata.content_length > 0 {
match &mut metadata.content_body {
ReaderImpl::Body(content_body) => {
req = req_builder.body(Body::from(content_body.clone()));
*req.body_mut() = Body::from(content_body.clone());
}
ReaderImpl::ObjectBody(content_body) => {
req = req_builder.body(Body::from(content_body.read_all().await?));
*req.body_mut() = Body::from(content_body.read_all().await?);
}
}
//req = req_builder.body(s3s::Body::from(metadata.content_body.read_all().await?));
}
match req {
Ok(req) => Ok(req),
Err(err) => Err(std::io::Error::other(err)),
Ok(req)
}
}
pub fn set_user_agent(&self, req: &mut Builder) {
let headers = req.headers_mut().expect("err");
pub fn set_user_agent(&self, req: &mut Request<Body>) {
let headers = req.headers_mut();
headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
/*if self.app_info.app_name != "" && self.app_info.app_version != "" {
headers.insert("User-Agent", C_USER_AGENT+" "+self.app_info.app_name+"/"+self.app_info.app_version);
}*/
}
fn make_target_url(
@@ -948,7 +903,7 @@ pub struct ObjectMultipartInfo {
pub key: String,
pub size: i64,
pub upload_id: String,
//pub err error,
//pub err: Error,
}
pub struct UploadInfo {

View File

@@ -95,7 +95,6 @@ impl WarmBackendS3 {
..Default::default()
};
let client = TransitionClient::new(&u.host().expect("err").to_string(), opts).await?;
//client.set_appinfo(format!("s3-tier-{}", tier), ReleaseTag);
let client = Arc::new(client);
let core = TransitionCore(Arc::clone(&client));

View File

@@ -27,10 +27,14 @@ bytes = { workspace = true }
http.workspace = true
time.workspace = true
hyper.workspace = true
serde.workspace = true
serde_urlencoded.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }
s3s.workspace = true
[dev-dependencies]
tempfile = { workspace = true }
rand = { workspace = true }
[lints]
workspace = true

View File

@@ -17,6 +17,7 @@ use lazy_static::lazy_static;
use std::collections::HashMap;
use time::{OffsetDateTime, macros::format_description};
use s3s::Body;
use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
@@ -68,7 +69,7 @@ fn _build_chunk_signature(
#[allow(clippy::too_many_arguments)]
pub fn streaming_sign_v4(
mut req: request::Builder,
mut req: request::Request<Body>,
_access_key_id: &str,
_secret_access_key: &str,
session_token: &str,
@@ -76,8 +77,8 @@ pub fn streaming_sign_v4(
data_len: i64,
req_time: OffsetDateTime, /*, sh256: md5simd::Hasher*/
trailer: HeaderMap,
) -> request::Builder {
let headers = req.headers_mut().expect("err");
) -> request::Request<Body> {
let headers = req.headers_mut();
if trailer.is_empty() {
headers.append("X-Amz-Content-Sha256", HeaderValue::from_str(STREAMING_SIGN_ALGORITHM).expect("err"));

View File

@@ -15,13 +15,15 @@
use http::{HeaderValue, request};
use time::{OffsetDateTime, macros::format_description};
use s3s::Body;
pub fn streaming_unsigned_v4(
mut req: request::Builder,
mut req: request::Request<Body>,
session_token: &str,
_data_len: i64,
req_time: OffsetDateTime,
) -> request::Builder {
let headers = req.headers_mut().expect("err");
) -> request::Request<Body> {
let headers = req.headers_mut();
let chunked_value = HeaderValue::from_str(&["aws-chunked"].join(",")).expect("err");
headers.insert(http::header::TRANSFER_ENCODING, chunked_value);

View File

@@ -19,25 +19,25 @@ use std::collections::HashMap;
use std::fmt::Write;
use time::{OffsetDateTime, format_description};
use s3s::Body;
use super::utils::get_host_addr;
use rustfs_utils::crypto::{base64_encode, hex, hmac_sha1};
const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const SIGN_V2_ALGORITHM: &str = "AWS";
fn encode_url2path(req: &request::Builder, _virtual_host: bool) -> String {
//path = serde_urlencoded::to_string(req.uri_ref().unwrap().path().unwrap()).unwrap();
fn encode_url2path(req: &request::Request<Body>, _virtual_host: bool) -> String {
req.uri_ref().unwrap().path().to_string()
req.uri().path().to_string()
}
pub fn pre_sign_v2(
mut req: request::Builder,
mut req: request::Request<Body>,
access_key_id: &str,
secret_access_key: &str,
expires: i64,
virtual_host: bool,
) -> request::Builder {
) -> request::Request<Body> {
if access_key_id.is_empty() || secret_access_key.is_empty() {
return req;
}
@@ -46,7 +46,7 @@ pub fn pre_sign_v2(
let d = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap());
let epoch_expires = d.unix_timestamp() + expires;
let headers = req.headers_mut().expect("headers_mut err");
let headers = req.headers_mut();
let expires_str = headers.get("Expires");
if expires_str.is_none() {
headers.insert("Expires", format!("{epoch_expires:010}").parse().unwrap());
@@ -55,7 +55,7 @@ pub fn pre_sign_v2(
let string_to_sign = pre_string_to_sign_v2(&req, virtual_host);
let signature = hex(hmac_sha1(secret_access_key, string_to_sign));
let result = serde_urlencoded::from_str::<HashMap<String, String>>(req.uri_ref().unwrap().query().unwrap());
let result = serde_urlencoded::from_str::<HashMap<String, String>>(req.uri().query().unwrap());
let mut query = result.unwrap_or_default();
if get_host_addr(&req).contains(".storage.googleapis.com") {
query.insert("GoogleAccessId".to_string(), access_key_id.to_string());
@@ -65,15 +65,17 @@ pub fn pre_sign_v2(
query.insert("Expires".to_string(), format!("{epoch_expires:010}"));
let uri = req.uri_ref().unwrap().clone();
let mut parts = req.uri_ref().unwrap().clone().into_parts();
let uri = req.uri().clone();
let mut parts = req.uri().clone().into_parts();
parts.path_and_query = Some(
format!("{}?{}&Signature={}", uri.path(), serde_urlencoded::to_string(&query).unwrap(), signature)
.parse()
.unwrap(),
);
req.uri(Uri::from_parts(parts).unwrap())
*req.uri_mut() = Uri::from_parts(parts).unwrap();
req
}
fn _post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) -> String {
@@ -81,12 +83,12 @@ fn _post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) ->
}
pub fn sign_v2(
mut req: request::Builder,
mut req: request::Request<Body>,
_content_len: i64,
access_key_id: &str,
secret_access_key: &str,
virtual_host: bool,
) -> request::Builder {
) -> request::Request<Body> {
if access_key_id.is_empty() || secret_access_key.is_empty() {
return req;
}
@@ -95,7 +97,7 @@ pub fn sign_v2(
let d2 = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap());
let string_to_sign = string_to_sign_v2(&req, virtual_host);
let headers = req.headers_mut().expect("err");
let headers = req.headers_mut();
let date = headers.get("Date").unwrap();
if date.to_str().unwrap() == "" {
@@ -117,7 +119,7 @@ pub fn sign_v2(
req
}
fn pre_string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
fn pre_string_to_sign_v2(req: &request::Request<Body>, virtual_host: bool) -> String {
let mut buf = BytesMut::new();
write_pre_sign_v2_headers(&mut buf, req);
write_canonicalized_headers(&mut buf, req);
@@ -125,18 +127,18 @@ fn pre_string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
String::from_utf8(buf.to_vec()).unwrap()
}
fn write_pre_sign_v2_headers(buf: &mut BytesMut, req: &request::Builder) {
let _ = buf.write_str(req.method_ref().unwrap().as_str());
fn write_pre_sign_v2_headers(buf: &mut BytesMut, req: &request::Request<Body>) {
let _ = buf.write_str(req.method().as_str());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_str(req.headers().get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_str(req.headers().get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Expires").unwrap().to_str().unwrap());
let _ = buf.write_str(req.headers().get("Expires").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
}
fn string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
fn string_to_sign_v2(req: &request::Request<Body>, virtual_host: bool) -> String {
let mut buf = BytesMut::new();
write_sign_v2_headers(&mut buf, req);
write_canonicalized_headers(&mut buf, req);
@@ -144,27 +146,27 @@ fn string_to_sign_v2(req: &request::Builder, virtual_host: bool) -> String {
String::from_utf8(buf.to_vec()).unwrap()
}
fn write_sign_v2_headers(buf: &mut BytesMut, req: &request::Builder) {
let _ = buf.write_str(req.method_ref().unwrap().as_str());
fn write_sign_v2_headers(buf: &mut BytesMut, req: &request::Request<Body>) {
let headers = req.headers();
let _ = buf.write_str(req.method().as_str());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_str(headers.get("Content-Md5").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_str(headers.get("Content-Type").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
let _ = buf.write_str(req.headers_ref().unwrap().get("Date").unwrap().to_str().unwrap());
let _ = buf.write_str(headers.get("Date").unwrap().to_str().unwrap());
let _ = buf.write_char('\n');
}
fn write_canonicalized_headers(buf: &mut BytesMut, req: &request::Builder) {
fn write_canonicalized_headers(buf: &mut BytesMut, req: &request::Request<Body>) {
let mut proto_headers = Vec::<String>::new();
let mut vals = HashMap::<String, Vec<String>>::new();
for k in req.headers_ref().expect("err").keys() {
for k in req.headers().keys() {
let lk = k.as_str().to_lowercase();
if lk.starts_with("x-amz") {
proto_headers.push(lk.clone());
let vv = req
.headers_ref()
.expect("err")
.headers()
.get_all(k)
.iter()
.map(|e| e.to_str().unwrap().to_string())
@@ -210,12 +212,12 @@ const INCLUDED_QUERY: &[&str] = &[
"website",
];
fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Builder, virtual_host: bool) {
let request_url = req.uri_ref().unwrap();
fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Request<Body>, virtual_host: bool) {
let request_url = req.uri();
let _ = buf.write_str(&encode_url2path(req, virtual_host));
if request_url.query().unwrap() != "" {
let mut n: i64 = 0;
let result = serde_urlencoded::from_str::<HashMap<String, Vec<String>>>(req.uri_ref().unwrap().query().unwrap());
let result = serde_urlencoded::from_str::<HashMap<String, Vec<String>>>(req.uri().query().unwrap());
let vals = result.unwrap_or_default();
for resource in INCLUDED_QUERY {
let vv = &vals[*resource];

View File

@@ -22,10 +22,11 @@ use std::fmt::Write;
use time::{OffsetDateTime, macros::format_description};
use tracing::debug;
use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256};
use s3s::Body;
use super::constants::UNSIGNED_PAYLOAD;
use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4;
use super::utils::{get_host_addr, sign_v4_trim_all};
use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256};
pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
pub const SERVICE_TYPE_S3: &str = "s3";
@@ -76,8 +77,8 @@ fn get_credential(access_key_id: &str, location: &str, t: OffsetDateTime, servic
s
}
fn get_hashed_payload(req: &request::Builder) -> String {
let headers = req.headers_ref().unwrap();
fn get_hashed_payload(req: &request::Request<Body>) -> String {
let headers = req.headers();
let mut hashed_payload = "";
if let Some(payload) = headers.get("X-Amz-Content-Sha256") {
hashed_payload = payload.to_str().unwrap();
@@ -88,17 +89,16 @@ fn get_hashed_payload(req: &request::Builder) -> String {
hashed_payload.to_string()
}
fn get_canonical_headers(req: &request::Builder, ignored_headers: &HashMap<String, bool>) -> String {
fn get_canonical_headers(req: &request::Request<Body>, ignored_headers: &HashMap<String, bool>) -> String {
let mut headers = Vec::<String>::new();
let mut vals = HashMap::<String, Vec<String>>::new();
for k in req.headers_ref().expect("err").keys() {
for k in req.headers().keys() {
if ignored_headers.get(&k.to_string()).is_some() {
continue;
}
headers.push(k.as_str().to_lowercase());
let vv = req
.headers_ref()
.expect("err")
.headers()
.get_all(k)
.iter()
.map(|e| e.to_str().unwrap().to_string())
@@ -146,9 +146,9 @@ fn header_exists(key: &str, headers: &[String]) -> bool {
false
}
fn get_signed_headers(req: &request::Builder, ignored_headers: &HashMap<String, bool>) -> String {
fn get_signed_headers(req: &request::Request<Body>, ignored_headers: &HashMap<String, bool>) -> String {
let mut headers = Vec::<String>::new();
let headers_ref = req.headers_ref().expect("err");
let headers_ref = req.headers();
debug!("get_signed_headers headers: {:?}", headers_ref);
for (k, _) in headers_ref {
if ignored_headers.get(&k.to_string()).is_some() {
@@ -163,9 +163,9 @@ fn get_signed_headers(req: &request::Builder, ignored_headers: &HashMap<String,
headers.join(";")
}
fn get_canonical_request(req: &request::Builder, ignored_headers: &HashMap<String, bool>, hashed_payload: &str) -> String {
fn get_canonical_request(req: &request::Request<Body>, ignored_headers: &HashMap<String, bool>, hashed_payload: &str) -> String {
let mut canonical_query_string = "".to_string();
if let Some(q) = req.uri_ref().unwrap().query() {
if let Some(q) = req.uri().query() {
// Parse query string into key-value pairs
let mut query_params: Vec<(String, String)> = Vec::new();
for param in q.split('&') {
@@ -187,8 +187,8 @@ fn get_canonical_request(req: &request::Builder, ignored_headers: &HashMap<Strin
}
let canonical_request = [
req.method_ref().unwrap().to_string(),
req.uri_ref().unwrap().path().to_string(),
req.method().to_string(),
req.uri().path().to_string(),
canonical_query_string,
get_canonical_headers(req, ignored_headers),
get_signed_headers(req, ignored_headers),
@@ -210,14 +210,14 @@ fn get_string_to_sign_v4(t: OffsetDateTime, location: &str, canonical_request: &
}
pub fn pre_sign_v4(
req: request::Builder,
req: request::Request<Body>,
access_key_id: &str,
secret_access_key: &str,
session_token: &str,
location: &str,
expires: i64,
t: OffsetDateTime,
) -> request::Builder {
) -> request::Request<Body> {
if access_key_id.is_empty() || secret_access_key.is_empty() {
return req;
}
@@ -226,7 +226,7 @@ pub fn pre_sign_v4(
let signed_headers = get_signed_headers(&req, &v4_ignored_headers);
let mut query = <Vec<(String, String)>>::new();
if let Some(q) = req.uri_ref().unwrap().query() {
if let Some(q) = req.uri().query() {
let result = serde_urlencoded::from_str::<Vec<(String, String)>>(q);
query = result.unwrap_or_default();
}
@@ -240,14 +240,15 @@ pub fn pre_sign_v4(
query.push(("X-Amz-Security-Token".to_string(), session_token.to_string()));
}
let uri = req.uri_ref().unwrap().clone();
let mut parts = req.uri_ref().unwrap().clone().into_parts();
let uri = req.uri().clone();
let mut parts = req.uri().clone().into_parts();
parts.path_and_query = Some(
format!("{}?{}", uri.path(), serde_urlencoded::to_string(&query).unwrap())
.parse()
.unwrap(),
);
let req = req.uri(Uri::from_parts(parts).unwrap());
let mut req = req;
*req.uri_mut() = Uri::from_parts(parts).unwrap();
let canonical_request = get_canonical_request(&req, &v4_ignored_headers, &get_hashed_payload(&req));
let string_to_sign = get_string_to_sign_v4(t, location, &canonical_request, SERVICE_TYPE_S3);
@@ -256,8 +257,8 @@ pub fn pre_sign_v4(
let signing_key = get_signing_key(secret_access_key, location, t, SERVICE_TYPE_S3);
let signature = get_signature(signing_key, &string_to_sign);
let uri = req.uri_ref().unwrap().clone();
let mut parts = req.uri_ref().unwrap().clone().into_parts();
let uri = req.uri().clone();
let mut parts = req.uri().clone().into_parts();
parts.path_and_query = Some(
format!(
"{}?{}&X-Amz-Signature={}",
@@ -269,7 +270,10 @@ pub fn pre_sign_v4(
.unwrap(),
);
req.uri(Uri::from_parts(parts).unwrap())
*req.uri_mut() = Uri::from_parts(parts).unwrap();
req
}
fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_access_key: &str, location: &str) -> String {
@@ -278,13 +282,12 @@ fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_ac
get_signature(signing_key, policy_base64)
}
fn _sign_v4_sts(req: request::Builder, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Builder {
fn _sign_v4_sts(req: request::Request<Body>, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Request<Body> {
sign_v4_inner(req, 0, access_key_id, secret_access_key, "", location, SERVICE_TYPE_STS, HeaderMap::new())
}
#[allow(clippy::too_many_arguments)]
fn sign_v4_inner(
mut req: request::Builder,
mut req: request::Request<Body>,
content_len: i64,
access_key_id: &str,
secret_access_key: &str,
@@ -292,7 +295,7 @@ fn sign_v4_inner(
location: &str,
service_type: &str,
trailer: HeaderMap,
) -> request::Builder {
) -> request::Request<Body> {
if access_key_id.is_empty() || secret_access_key.is_empty() {
return req;
}
@@ -300,7 +303,7 @@ fn sign_v4_inner(
let t = OffsetDateTime::now_utc();
let t2 = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap());
let headers = req.headers_mut().expect("err");
let headers = req.headers_mut();
let format = format_description!("[year][month][day]T[hour][minute][second]Z");
headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap());
@@ -330,9 +333,12 @@ fn sign_v4_inner(
let signature = get_signature(signing_key, &string_to_sign);
//debug!("\n\ncanonical_request: \n{}\nstring_to_sign: \n{}\nsignature: \n{}\n\n", &canonical_request, &string_to_sign, &signature);
let headers = req.headers_mut().expect("err");
let headers = req.headers_mut();
let auth = format!("{SIGN_V4_ALGORITHM} Credential={credential}, SignedHeaders={signed_headers}, Signature={signature}");
let auth = format!(
"{} Credential={}, SignedHeaders={}, Signature={}",
SIGN_V4_ALGORITHM, credential, signed_headers, signature
);
headers.insert("Authorization", auth.parse().unwrap());
if !trailer.is_empty() {
@@ -345,14 +351,14 @@ fn sign_v4_inner(
req
}
fn _unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: HeaderMap) {
fn _unsigned_trailer(mut req: request::Request<Body>, content_len: i64, trailer: HeaderMap) {
if !trailer.is_empty() {
return;
}
let t = OffsetDateTime::now_utc();
let t = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap());
let headers = req.headers_mut().expect("err");
let headers = req.headers_mut();
let format = format_description!("[year][month][day]T[hour][minute][second]Z");
headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap());
@@ -372,13 +378,13 @@ fn _unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: Heade
}
pub fn sign_v4(
req: request::Builder,
req: request::Request<Body>,
content_len: i64,
access_key_id: &str,
secret_access_key: &str,
session_token: &str,
location: &str,
) -> request::Builder {
) -> request::Request<Body> {
sign_v4_inner(
req,
content_len,
@@ -392,13 +398,13 @@ pub fn sign_v4(
}
pub fn sign_v4_trailer(
req: request::Builder,
req: request::Request<Body>,
access_key_id: &str,
secret_access_key: &str,
session_token: &str,
location: &str,
trailer: HeaderMap,
) -> request::Builder {
) -> request::Request<Body> {
sign_v4_inner(
req,
0,

View File

@@ -14,9 +14,11 @@
use http::request;
pub fn get_host_addr(req: &request::Builder) -> String {
let host = req.headers_ref().expect("err").get("host");
let uri = req.uri_ref().unwrap();
use s3s::Body;
pub fn get_host_addr(req: &request::Request<Body>) -> String {
let host = req.headers().get("host");
let uri = req.uri();
let req_host;
if let Some(port) = uri.port() {
req_host = format!("{}:{}", uri.host().unwrap(), port);