This commit is contained in:
likewu
2026-01-17 21:27:40 +08:00
parent f383ea1175
commit 82fcb7227a
11 changed files with 85 additions and 54 deletions

View File

@@ -19,10 +19,10 @@
#![allow(clippy::all)]
use http::{HeaderMap, StatusCode};
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use std::collections::HashMap;
use crate::client::{
api_error_response::http_resp_to_error_response,
@@ -69,7 +69,13 @@ impl TransitionClient {
//if resp != nil {
if resp_status != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//}
Ok(())
@@ -107,7 +113,13 @@ impl TransitionClient {
let h = resp.headers().clone();
if resp_status != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
Ok(())

View File

@@ -20,11 +20,11 @@
#![allow(clippy::all)]
//use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::ready;
use http::HeaderMap;
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;
@@ -33,11 +33,11 @@ use crate::client::{
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use futures_util::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use tokio_util::io::ReaderStream;
impl TransitionClient {

View File

@@ -25,11 +25,11 @@ use crate::client::{
};
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::dto::Owner;
use std::collections::HashMap;
use http_body_util::BodyExt;
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Grantee {
@@ -97,7 +97,13 @@ impl TransitionClient {
}
if resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, object_name)));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
object_name,
)));
}
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(body_vec).unwrap()) {

View File

@@ -30,11 +30,11 @@ use crate::client::{
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
//use s3s::Body;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::body::Body;
use http_body_util::BodyExt;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};
pub struct ObjectAttributesOptions {
pub max_parts: i64,

View File

@@ -29,12 +29,12 @@ use crate::client::{
use crate::store_api::BucketInfo;
//use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
impl TransitionClient {
pub fn list_buckets(&self) -> Result<Vec<BucketInfo>, std::io::Error> {
@@ -105,7 +105,13 @@ impl TransitionClient {
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//let mut list_bucket_result = ListBucketV2Result::default();

View File

@@ -19,12 +19,12 @@
//use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use hyper::body::Bytes;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use time::OffsetDateTime;
use tracing::warn;
use uuid::Uuid;
use hyper::body::Bytes;
use crate::client::checksum::ChecksumMode;
use crate::client::utils::base64_encode;
@@ -226,7 +226,7 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();

View File

@@ -19,6 +19,9 @@
#![allow(clippy::all)]
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::HashAlgorithm;
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
@@ -27,9 +30,6 @@ use std::fmt::Display;
use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tokio::sync::mpsc::{self, Receiver, Sender};
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use crate::client::utils::base64_encode;
use crate::client::{

View File

@@ -25,13 +25,13 @@ use crate::client::{
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use http::HeaderMap;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use s3s::dto::RestoreRequest;
use std::collections::HashMap;
use std::io::Cursor;
use tokio::io::BufReader;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
const TIER_STANDARD: &str = "Standard";
const TIER_BULK: &str = "Bulk";
@@ -121,7 +121,13 @@ impl TransitionClient {
}
}
if resp_status != http::StatusCode::ACCEPTED && resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
"",
)));
}
Ok(())
}

View File

@@ -19,14 +19,14 @@
#![allow(clippy::all)]
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
use tracing::warn;
use uuid::Uuid;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use crate::client::{
api_error_response::{ErrorResponse, err_invalid_argument, http_resp_to_error_response},

View File

@@ -25,15 +25,15 @@ use crate::client::{
transition_api::{CreateBucketConfiguration, LocationConstraint, TransitionClient},
};
use http::Request;
use http_body_util::BodyExt;
use hyper::StatusCode;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use hyper::body::Bytes;
use hyper::body::Body;
use http_body_util::BodyExt;
use hyper::body::Incoming;
#[derive(Debug, Clone)]
pub struct BucketLocationCache {
@@ -123,7 +123,11 @@ impl TransitionClient {
url_str = target_url.to_string();
}
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(s3s::Body::empty()) else {
let Ok(mut req) = Request::builder()
.method(http::Method::GET)
.uri(url_str)
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -183,7 +187,7 @@ async fn process_bucket_location_response(
if resp.status() != StatusCode::OK {
let resp_status = resp.status();
let h = resp.headers().clone();
let err_resp = http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "");
match err_resp.code {
S3ErrorCode::NotImplemented => {
@@ -227,7 +231,9 @@ async fn process_bucket_location_response(
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(body_vec).unwrap()).unwrap();
location = d.location_constraint;
} else {
if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap()) {
if let Ok(LocationConstraint { field }) =
quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap())
{
location = field;
}
}

View File

@@ -20,6 +20,7 @@
use crate::client::bucket_cache::BucketLocationCache;
use crate::client::{
api_error_response::ErrorResponse,
api_error_response::{err_invalid_argument, http_resp_to_error_response, to_error_response},
api_get_options::GetObjectOptions,
api_put_object::PutObjectOptions,
@@ -30,7 +31,6 @@ use crate::client::{
},
constants::{UNSIGNED_PAYLOAD, UNSIGNED_PAYLOAD_TRAILER},
credentials::{CredContext, Credentials, SignatureType, Static},
api_error_response::ErrorResponse,
};
use crate::{client::checksum::ChecksumMode, store_api::GetObjectReader};
//use bytes::Bytes;
@@ -40,6 +40,10 @@ use http::{
HeaderValue, Response, StatusCode,
request::{Builder, Request},
};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{client::legacy::Client, client::legacy::connect::HttpConnector, rt::TokioExecutor};
use md5::Digest;
@@ -55,8 +59,8 @@ use rustfs_utils::{
},
};
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
use s3s::dto::Owner;
use s3s::dto::ReplicationStatus;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::io::Cursor;
@@ -73,10 +77,6 @@ use tokio::io::BufReader;
use tracing::{debug, error, warn};
use url::{Url, form_urlencoded};
use uuid::Uuid;
use http_body::Body;
use hyper::body::Incoming;
use hyper::body::Bytes;
use http_body_util::BodyExt;
const C_USER_AGENT: &str = "RustFS (linux; x86)";
@@ -277,8 +277,7 @@ impl TransitionClient {
debug!("endpoint_url: {}", self.endpoint_url.as_str().to_string());
resp = http_client.request(req);
}
let resp = resp
.await;
let resp = resp.await;
debug!("http_client url: {} {}", req_method, req_uri);
debug!("http_client headers: {:?}", req_headers);
if let Err(err) = resp {
@@ -357,7 +356,8 @@ impl TransitionClient {
body_vec.extend_from_slice(data);
}
}
let mut err_response = http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
let mut err_response =
http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
err_response.message = format!("remote tier error: {}", err_response.message);
if self.region == "" {
@@ -993,12 +993,7 @@ impl Default for UploadInfo {
/// containing metadata about an S3 object.
pub fn to_object_info(bucket_name: &str, object_name: &str, h: &HeaderMap) -> Result<ObjectInfo, std::io::Error> {
// Helper function to get header value as string
let get_header = |name: &str| -> String {
h.get(name)
.and_then(|val| val.to_str().ok())
.unwrap_or("")
.to_string()
};
let get_header = |name: &str| -> String { h.get(name).and_then(|val| val.to_str().ok()).unwrap_or("").to_string() };
// Get and process the ETag
let etag = {