mirror of https://github.com/rustfs/rustfs.git

separate signer.
fix ilm feature.
@@ -13,6 +13,7 @@ members = [
    "crates/rio",    # Rust I/O utilities and abstractions
    "crates/utils",  # Utility functions and helpers
    "crates/zip",    # ZIP file handling and compression
    "crates/signer", # client signer
    "crypto",        # Cryptography and security features
    "ecstore",       # Erasure coding storage implementation
    "e2e_test",      # End-to-end test suite
@@ -21,7 +22,6 @@ members = [
    "rustfs",         # Core file system implementation
    "s3select/api",   # S3 Select API interface
    "s3select/query", # S3 Select query engine
    "reader",
]
resolver = "2"

@@ -58,8 +58,8 @@ rustfs-notify = { path = "crates/notify", version = "0.0.1" }
rustfs-utils = { path = "crates/utils", version = "0.0.1" }
rustfs-rio = { path = "crates/rio", version = "0.0.1" }
rustfs-filemeta = { path = "crates/filemeta", version = "0.0.1" }
rustfs-signer = { path = "crates/signer", version = "0.0.1" }
workers = { path = "./common/workers", version = "0.0.1" }
reader = { path = "./reader", version = "0.0.1" }
aes-gcm = { version = "0.10.3", features = ["std"] }
arc-swap = "1.7.1"
argon2 = { version = "0.5.3", features = ["std"] }
25  crates/signer/Cargo.toml  Normal file
@@ -0,0 +1,25 @@
[package]
name = "rustfs-signer"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[dependencies]
tracing.workspace = true
lazy_static.workspace = true
bytes = { workspace = true }
http.workspace = true
time.workspace = true
hyper.workspace = true
serde.workspace = true
serde_urlencoded.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }

[dev-dependencies]
tempfile = { workspace = true }
rand = { workspace = true }

[lints]
workspace = true
10  crates/signer/src/constants.rs  Normal file
@@ -0,0 +1,10 @@
use time::{format_description::FormatItem, macros::format_description};

pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
pub const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER";

pub const TOTAL_WORKERS: i64 = 4;

pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
pub const ISO8601_DATEFORMAT: &[FormatItem<'_>] =
    format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z");
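For reference, a minimal sketch of how a `&[FormatItem]` constant like the one above is consumed with the `time` crate (the `rustfs_signer::constants` path follows the new crate layout shown in this commit; the rest is illustrative):

    use rustfs_signer::constants::ISO8601_DATEFORMAT;
    use time::OffsetDateTime;

    fn main() {
        let now = OffsetDateTime::now_utc();
        // `OffsetDateTime::format` accepts any `Formattable`, including a `&[FormatItem]` slice.
        let stamp = now.format(ISO8601_DATEFORMAT).expect("formatting failed");
        println!("{stamp}"); // e.g. 2026-01-17T01:30:33.5Z
    }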
@@ -1,4 +1,4 @@
pub mod ordered_qs;
pub mod constants;
pub mod request_signature_streaming;
pub mod request_signature_streaming_unsigned_trailer;
pub mod request_signature_v2;
@@ -1,31 +1,21 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]

use http::request::{self, Request};
use http::{request, HeaderMap, HeaderValue};
use lazy_static::lazy_static;
use std::collections::HashMap;
use time::{OffsetDateTime, macros::format_description};

use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key};
use rustfs_utils::{
    crypto::{hex, hex_sha256, hex_sha256_chunk, hmac_sha256},
    hash::EMPTY_STRING_SHA256_HASH,
};

const STREAMING_SIGN_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
const STREAMING_SIGN_TRAILER_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
const STREAMING_PAYLOAD_HDR: &str = "AWS4-HMAC-SHA256-PAYLOAD";
const STREAMING_TRAILER_HDR: &str = "AWS4-HMAC-SHA256-TRAILER";
const PAYLOAD_CHUNK_SIZE: i64 = 64 * 1024;
const CHUNK_SIGCONST_LEN: i64 = 17;
const SIGNATURESTR_LEN: i64 = 64;
const CRLF_LEN: i64 = 2;
const TRAILER_KV_SEPARATOR: &str = ":";
const TRAILER_SIGNATURE: &str = "x-amz-trailer-signature";
const _STREAMING_TRAILER_HDR: &str = "AWS4-HMAC-SHA256-TRAILER";
const _PAYLOAD_CHUNK_SIZE: i64 = 64 * 1024;
const _CHUNK_SIGCONST_LEN: i64 = 17;
const _SIGNATURESTR_LEN: i64 = 64;
const _CRLF_LEN: i64 = 2;
const _TRAILER_KV_SEPARATOR: &str = ":";
const _TRAILER_SIGNATURE: &str = "x-amz-trailer-signature";

lazy_static! {
    static ref ignored_streaming_headers: HashMap<String, bool> = {
@@ -37,6 +27,7 @@ lazy_static! {
    };
}

#[allow(dead_code)]
fn build_chunk_string_to_sign(t: OffsetDateTime, region: &str, previous_sig: &str, chunk_check_sum: &str) -> String {
    let mut string_to_sign_parts = <Vec<String>>::new();
    string_to_sign_parts.push(STREAMING_PAYLOAD_HDR.to_string());
@@ -49,7 +40,7 @@ fn build_chunk_string_to_sign(t: OffsetDateTime, region: &str, previous_sig: &st
    string_to_sign_parts.join("\n")
}

fn build_chunk_signature(
fn _build_chunk_signature(
    chunk_check_sum: &str,
    req_time: OffsetDateTime,
    region: &str,
@@ -62,13 +53,37 @@ fn build_chunk_signature(
}

pub fn streaming_sign_v4(
    req: request::Builder,
    access_key_id: &str,
    secret_access_key: &str,
    mut req: request::Builder,
    _access_key_id: &str,
    _secret_access_key: &str,
    session_token: &str,
    region: &str,
    _region: &str,
    data_len: i64,
    req_time: OffsetDateTime, /*, sh256: md5simd.Hasher*/
    req_time: OffsetDateTime, /*, sh256: md5simd::Hasher*/
    trailer: HeaderMap,
) -> request::Builder {
    todo!();
    let headers = req.headers_mut().expect("err");

    if trailer.is_empty() {
        headers.append("X-Amz-Content-Sha256", HeaderValue::from_str(STREAMING_SIGN_ALGORITHM).expect("err"));
    } else {
        headers.append("X-Amz-Content-Sha256", HeaderValue::from_str(STREAMING_SIGN_TRAILER_ALGORITHM).expect("err"));
        for (k, _) in &trailer {
            headers.append("X-Amz-Trailer", k.as_str().to_lowercase().parse().unwrap());
        }
        let chunked_value = HeaderValue::from_str(&vec!["aws-chunked"].join(",")).expect("err");
        headers.insert(http::header::TRANSFER_ENCODING, chunked_value);
    }

    if !session_token.is_empty() {
        headers.insert("X-Amz-Security-Token", HeaderValue::from_str(&session_token).expect("err"));
    }

    let format = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z");
    headers.insert("X-Amz-Date", HeaderValue::from_str(&req_time.format(&format).unwrap()).expect("err"));

    //req.content_length = 100;
    headers.insert("x-amz-decoded-content-length", format!("{:010}", data_len).parse().unwrap());

    req
}
@@ -0,0 +1,23 @@
use http::{request, HeaderValue};
use time::{OffsetDateTime, macros::format_description};

pub fn streaming_unsigned_v4(
    mut req: request::Builder,
    session_token: &str,
    _data_len: i64,
    req_time: OffsetDateTime,
) -> request::Builder {
    let headers = req.headers_mut().expect("err");

    let chunked_value = HeaderValue::from_str(&vec!["aws-chunked"].join(",")).expect("err");
    headers.insert(http::header::TRANSFER_ENCODING, chunked_value);
    if !session_token.is_empty() {
        headers.insert("X-Amz-Security-Token", HeaderValue::from_str(&session_token).expect("err"));
    }

    let format = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z");
    headers.insert("X-Amz-Date", HeaderValue::from_str(&req_time.format(&format).unwrap()).expect("err"));
    //req.content_length = 100;

    req
}
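A minimal usage sketch for `streaming_unsigned_v4` as added above; only the function signature comes from the diff, while the URI, token, and length are placeholders:

    use http::Request;
    use time::OffsetDateTime;

    let builder = Request::builder()
        .method("PUT")
        .uri("http://localhost:9000/bucket/object"); // placeholder endpoint
    let builder = streaming_unsigned_v4(builder, "session-token", 1024, OffsetDateTime::now_utc());
    let req = builder.body(()).expect("build request");
    // The builder now carries Transfer-Encoding: aws-chunked, X-Amz-Security-Token, and X-Amz-Date.
    assert!(req.headers().contains_key("X-Amz-Date"));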
@@ -1,26 +1,18 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use http::request;
use hyper::Uri;
use std::collections::HashMap;
use std::fmt::Write;
use time::{OffsetDateTime, format_description, macros::format_description};
use time::{OffsetDateTime, format_description};

use rustfs_utils::crypto::{base64_encode, hex, hmac_sha1};

use super::utils::get_host_addr;

const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
const SIGN_V2_ALGORITHM: &str = "AWS";

fn encode_url2path(req: &request::Builder, virtual_host: bool) -> String {
    let mut path = "".to_string();
fn encode_url2path(req: &request::Builder, _virtual_host: bool) -> String {
    let path;

    //path = serde_urlencoded::to_string(req.uri_ref().unwrap().path().unwrap()).unwrap();
    path = req.uri_ref().unwrap().path().to_string();
@@ -42,7 +34,7 @@ pub fn pre_sign_v2(
    let d = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap());
    let epoch_expires = d.unix_timestamp() + expires;

    let mut headers = req.headers_mut().expect("err");
    let headers = req.headers_mut().expect("headers_mut err");
    let expires_str = headers.get("Expires");
    if expires_str.is_none() {
        headers.insert("Expires", format!("{:010}", epoch_expires).parse().unwrap());
@@ -73,14 +65,14 @@ pub fn pre_sign_v2(
    req
}

fn post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) -> String {
fn _post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) -> String {
    let signature = hex(hmac_sha1(secret_access_key, policy_base64));
    signature
}

pub fn sign_v2(
    mut req: request::Builder,
    content_len: i64,
    _content_len: i64,
    access_key_id: &str,
    secret_access_key: &str,
    virtual_host: bool,
@@ -93,7 +85,7 @@ pub fn sign_v2(
    let d2 = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap());

    let string_to_sign = string_to_sign_v2(&req, virtual_host);
    let mut headers = req.headers_mut().expect("err");
    let headers = req.headers_mut().expect("err");

    let date = headers.get("Date").unwrap();
    if date.to_str().unwrap() == "" {
@@ -107,7 +99,7 @@ pub fn sign_v2(
        );
    }

    let mut auth_header = format!("{} {}:", SIGN_V2_ALGORITHM, access_key_id);
    let auth_header = format!("{} {}:", SIGN_V2_ALGORITHM, access_key_id);
    let auth_header = format!("{}{}", auth_header, base64_encode(&hmac_sha1(secret_access_key, string_to_sign)));

    headers.insert("Authorization", auth_header.parse().unwrap());
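For context (from the AWS Signature Version 2 spec, not this diff): the value that `string_to_sign_v2` assembles and that the Authorization header above carries has the classic shape:

    StringToSign = HTTP-Verb + "\n"
                 + Content-MD5 + "\n"
                 + Content-Type + "\n"
                 + Date + "\n"
                 + CanonicalizedAmzHeaders
                 + CanonicalizedResource
    Authorization = "AWS" + " " + AccessKeyId + ":" + base64(hmac-sha1(SecretAccessKey, StringToSign))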
@@ -214,7 +206,7 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Builder, virt
    if request_url.query().unwrap() != "" {
        let mut n: i64 = 0;
        let result = serde_urlencoded::from_str::<HashMap<String, Vec<String>>>(req.uri_ref().unwrap().query().unwrap());
        let mut vals = result.unwrap_or_default();
        let vals = result.unwrap_or_default();
        for resource in INCLUDED_QUERY {
            let vv = &vals[*resource];
            if vv.len() > 0 {
@@ -1,28 +1,17 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use http::HeaderMap;
use http::Uri;
use http::header::TRAILER;
use http::request::{self, Request};
use http::request;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::fmt::Write;
use time::{OffsetDateTime, format_description, macros::datetime, macros::format_description};
use tracing::{debug, error, info, warn};
use time::{OffsetDateTime, macros::format_description};
use tracing::debug;

use super::ordered_qs::OrderedQs;
use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4;
use super::utils::stable_sort_by_first;
use super::utils::{get_host_addr, sign_v4_trim_all};
use crate::client::constants::UNSIGNED_PAYLOAD;
use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4;
use super::utils::{get_host_addr, sign_v4_trim_all};
use super::constants::UNSIGNED_PAYLOAD;

pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256";
pub const SERVICE_TYPE_S3: &str = "s3";
@@ -58,7 +47,7 @@ pub fn get_scope(location: &str, t: OffsetDateTime, service_type: &str) -> Strin
    let mut ans = String::from("");
    ans.push_str(&t.format(&format).unwrap().to_string());
    ans.push('/');
    ans.push_str(location); // TODO: use a `Region` type
    ans.push_str(location);
    ans.push('/');
    ans.push_str(service_type);
    ans.push_str("/aws4_request");
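For context (standard SigV4 behavior, consistent with the code above but with illustrative values): `get_scope` joins a compact date, the region, the service, and a fixed suffix:

    // get_scope("us-east-1", t, "s3") with t on 2026-01-17 (UTC) yields:
    // "20260117/us-east-1/s3/aws4_request"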
@@ -221,11 +210,6 @@ pre_sign_v4(
        return req;
    }

    //let t = OffsetDateTime::now_utc();
    //let date = AmzDate::parse(timestamp).unwrap();
    let t2 = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap());

    //let credential = get_scope(location, t, SERVICE_TYPE_S3);
    let credential = get_credential(access_key_id, location, t, SERVICE_TYPE_S3);
    let signed_headers = get_signed_headers(&req, &v4_ignored_headers);

@@ -277,13 +261,13 @@ pre_sign_v4(
    req
}

fn post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_access_key: &str, location: &str) -> String {
fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_access_key: &str, location: &str) -> String {
    let signing_key = get_signing_key(secret_access_key, location, t, SERVICE_TYPE_S3);
    let signature = get_signature(signing_key, policy_base64);
    signature
}

fn sign_v4_sts(mut req: request::Builder, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Builder {
fn _sign_v4_sts(req: request::Builder, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Builder {
    sign_v4_inner(req, 0, access_key_id, secret_access_key, "", location, SERVICE_TYPE_STS, HeaderMap::new())
}

@@ -304,7 +288,7 @@ fn sign_v4_inner(
    let t = OffsetDateTime::now_utc();
    let t2 = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap());

    let mut headers = req.headers_mut().expect("err");
    let headers = req.headers_mut().expect("err");
    let format = format_description!("[year][month][day]T[hour][minute][second]Z");
    headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap());

@@ -334,7 +318,7 @@ fn sign_v4_inner(
    let signature = get_signature(signing_key, &string_to_sign);
    //debug!("\n\ncanonical_request: \n{}\nstring_to_sign: \n{}\nsignature: \n{}\n\n", &canonical_request, &string_to_sign, &signature);

    let mut headers = req.headers_mut().expect("err");
    let headers = req.headers_mut().expect("err");

    let auth = format!(
        "{} Credential={}, SignedHeaders={}, Signature={}",
@@ -352,14 +336,14 @@ fn sign_v4_inner(
    req
}
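Again for context (standard SigV4; the format arguments are elided by the hunk above, so the values here are illustrative): the resulting header looks like

    // Authorization: AWS4-HMAC-SHA256 Credential=AKIAEXAMPLE/20260117/us-east-1/s3/aws4_request,
    //                SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=<64 hex chars>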

fn unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: HeaderMap) {
fn _unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: HeaderMap) {
    if trailer.len() > 0 {
        return;
    }
    let t = OffsetDateTime::now_utc();
    let t = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap());

    let mut headers = req.headers_mut().expect("err");
    let headers = req.headers_mut().expect("err");
    let format = format_description!("[year][month][day]T[hour][minute][second]Z");
    headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap());

@@ -379,7 +363,7 @@ fn unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: Header
}

pub fn sign_v4(
    mut req: request::Builder,
    req: request::Builder,
    content_len: i64,
    access_key_id: &str,
    secret_access_key: &str,
@@ -11,7 +11,7 @@ pub fn get_host_addr(req: &request::Builder) -> String {
    }
    if let Some(host) = host {
        if req_host != *host.to_str().unwrap() {
            return host.to_str().unwrap().to_string();
            return (*host.to_str().unwrap()).to_string();
        }
    }
    /*if req.uri_ref().unwrap().host().is_some() {
@@ -1,6 +1,6 @@
use md5::{Digest as Md5Digest, Md5};
use sha2::{
    Digest, Sha256 as sha_sha256,
    Sha256 as sha_sha256,
    digest::{Reset, Update},
};
pub trait Hasher {
@@ -125,7 +125,7 @@ impl Default for MD5 {

impl Hasher for MD5 {
    fn write(&mut self, bytes: &[u8]) {
        self.hasher.update(bytes);
        Md5Digest::update(&mut self.hasher, bytes);
    }

    fn reset(&mut self) {}
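The change to fully qualified syntax above resolves a method ambiguity: with both `sha2::digest::Update` and md5's `Digest` in scope, a plain `.update(..)` call can be ambiguous. A standalone sketch of the disambiguated form (input is illustrative):

    use md5::{Digest as Md5Digest, Md5};

    let mut hasher = Md5::new();
    // Call the trait method through the trait name instead of method syntax.
    Md5Digest::update(&mut hasher, b"hello");
    let digest = hasher.finalize();
    assert_eq!(digest.len(), 16); // MD5 output is 16 bytes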
@@ -16,6 +16,8 @@ pub mod io;
#[cfg(feature = "hash")]
pub mod hash;

pub mod hasher;

#[cfg(feature = "os")]
pub mod os;

@@ -94,8 +94,8 @@ shadow-rs.workspace = true
rustfs-filemeta.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-rio.workspace = true
rustfs-signer.workspace = true
futures-util.workspace = true
reader = { workspace = true }

[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

@@ -25,6 +25,8 @@ use tracing::{error, info};
use uuid::Uuid;
use xxhash_rust::xxh64;

//use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
//use rustfs_notify::{initialize, notification_system};
use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc};
use super::lifecycle::{self, ExpirationOptions, IlmAction, Lifecycle, TransitionOptions};
use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats};
@@ -109,6 +111,7 @@ struct ExpiryStats {
    workers: AtomicI64,
}

#[allow(dead_code)]
impl ExpiryStats {
    pub fn missed_tasks(&self) -> i64 {
        self.missed_expiry_tasks.load(Ordering::SeqCst)
@@ -567,32 +570,6 @@ impl TransitionState {
    }
}

struct AuditTierOp {
    tier: String,
    time_to_responsens: i64,
    output_bytes: i64,
    error: String,
}

impl AuditTierOp {
    #[allow(clippy::new_ret_no_self)]
    pub async fn new() -> Result<Self, std::io::Error> {
        Ok(Self {
            tier: String::from("tier"),
            time_to_responsens: 0,
            output_bytes: 0,
            error: String::from(""),
        })
    }

    pub fn string(&self) -> String {
        format!(
            "tier:{},respNS:{},tx:{},err:{}",
            self.tier, self.time_to_responsens, self.output_bytes, self.error
        )
    }
}

pub async fn init_background_expiry(api: Arc<ECStore>) {
    let mut workers = num_cpus::get() / 2;
    //globalILMConfig.getExpirationWorkers()
@@ -717,6 +694,16 @@ pub async fn expire_transitioned_object(
        host: GLOBAL_LocalNodeName.to_string(),
        ..Default::default()
    });
    /*let system = match notification_system() {
        Some(sys) => sys,
        None => {
            let config = Config::new();
            initialize(config).await?;
            notification_system().expect("Failed to initialize notification system")
        }
    };
    let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut));
    system.send_event(event).await;*/

    Ok(dobj)
}
@@ -845,4 +832,4 @@ pub struct RestoreObjectRequest {
    pub output_location: OutputLocation,
}

const MAX_RESTORE_OBJECT_REQUEST_SIZE: i64 = 2 << 20;
const _MAX_RESTORE_OBJECT_REQUEST_SIZE: i64 = 2 << 20;

@@ -25,7 +25,7 @@ pub const TRANSITION_PENDING: &str = "pending";
const ERR_LIFECYCLE_TOO_MANY_RULES: &str = "Lifecycle configuration allows a maximum of 1000 rules";
const ERR_LIFECYCLE_NO_RULE: &str = "Lifecycle configuration should have at least one rule";
const ERR_LIFECYCLE_DUPLICATE_ID: &str = "Rule ID must be unique. Found same ID for more than one rule";
const ERR_XML_NOT_WELL_FORMED: &str = "The XML you provided was not well-formed or did not validate against our published schema";
const _ERR_XML_NOT_WELL_FORMED: &str = "The XML you provided was not well-formed or did not validate against our published schema";
const ERR_LIFECYCLE_BUCKET_LOCKED: &str =
    "ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an object locked bucket";

@@ -698,14 +698,6 @@ pub struct ExpirationOptions {
}

impl ExpirationOptions {
    fn marshal_msg(&self, b: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        todo!();
    }

    fn unmarshal_msg(&self, bts: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        todo!();
    }

    fn msg_size(&self) -> i64 {
        1 + 7 + 10
    }

@@ -7,11 +7,11 @@

use s3s::dto::{LifecycleRuleFilter, Transition};

const ERR_TRANSITION_INVALID_DAYS: &str = "Days must be 0 or greater when used with Transition";
const ERR_TRANSITION_INVALID_DATE: &str = "Date must be provided in ISO 8601 format";
const _ERR_TRANSITION_INVALID_DAYS: &str = "Days must be 0 or greater when used with Transition";
const _ERR_TRANSITION_INVALID_DATE: &str = "Date must be provided in ISO 8601 format";
const ERR_TRANSITION_INVALID: &str =
    "Exactly one of Days (0 or greater) or Date (positive ISO 8601 format) should be present in Transition.";
const ERR_TRANSITION_DATE_NOT_MIDNIGHT: &str = "'Date' must be at midnight GMT";
const _ERR_TRANSITION_DATE_NOT_MIDNIGHT: &str = "'Date' must be at midnight GMT";

pub trait Filter {
    fn test_tags(&self, user_tags: &str) -> bool;

@@ -65,6 +65,7 @@ impl LastDayTierStats {
        }
    }

    #[allow(dead_code)]
    fn merge(&self, m: LastDayTierStats) -> LastDayTierStats {
        let mut cl = self.clone();
        let mut cm = m.clone();

@@ -17,6 +17,7 @@ use crate::global::GLOBAL_TierConfigMgr;
static XXHASH_SEED: u64 = 0;

#[derive(Default)]
#[allow(dead_code)]
struct ObjSweeper {
    object: String,
    bucket: String,
@@ -29,6 +30,7 @@ struct ObjSweeper {
    remote_object: String,
}

#[allow(dead_code)]
impl ObjSweeper {
    #[allow(clippy::new_ret_no_self)]
    pub async fn new(bucket: &str, object: &str) -> Result<Self, std::io::Error> {
@@ -103,6 +105,7 @@ impl ObjSweeper {
}

#[derive(Debug, Clone)]
#[allow(unused_assignments)]
pub struct Jentry {
    obj_name: String,
    version_id: String,

@@ -4,21 +4,15 @@ use time::{OffsetDateTime, format_description};
use s3s::dto::{Date, ObjectLockLegalHold, ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode};
use s3s::header::{X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};

//const AMZ_OBJECTLOCK_BYPASS_RET_GOVERNANCE: &str = "X-Amz-Bypass-Governance-Retention";
//const AMZ_OBJECTLOCK_RETAIN_UNTIL_DATE: &str = "X-Amz-Object-Lock-Retain-Until-Date";
//const AMZ_OBJECTLOCK_MODE: &str = "X-Amz-Object-Lock-Mode";
//const AMZ_OBJECTLOCK_LEGALHOLD: &str = "X-Amz-Object-Lock-Legal-Hold";

// Commented out unused constants to avoid dead code warnings
// const ERR_MALFORMED_BUCKET_OBJECT_CONFIG: &str = "invalid bucket object lock config";
// const ERR_INVALID_RETENTION_DATE: &str = "date must be provided in ISO 8601 format";
// const ERR_PAST_OBJECTLOCK_RETAIN_DATE: &str = "the retain until date must be in the future";
// const ERR_UNKNOWN_WORMMODE_DIRECTIVE: &str = "unknown WORM mode directive";
// const ERR_OBJECTLOCK_MISSING_CONTENT_MD5: &str =
//     "content-MD5 HTTP header is required for Put Object requests with Object Lock parameters";
// const ERR_OBJECTLOCK_INVALID_HEADERS: &str =
//     "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied";
// const ERR_MALFORMED_XML: &str = "the XML you provided was not well-formed or did not validate against our published schema";
const _ERR_MALFORMED_BUCKET_OBJECT_CONFIG: &str = "invalid bucket object lock config";
const _ERR_INVALID_RETENTION_DATE: &str = "date must be provided in ISO 8601 format";
const _ERR_PAST_OBJECTLOCK_RETAIN_DATE: &str = "the retain until date must be in the future";
const _ERR_UNKNOWN_WORMMODE_DIRECTIVE: &str = "unknown WORM mode directive";
const _ERR_OBJECTLOCK_MISSING_CONTENT_MD5: &str =
    "content-MD5 HTTP header is required for Put Object requests with Object Lock parameters";
const _ERR_OBJECTLOCK_INVALID_HEADERS: &str =
    "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied";
const _ERR_MALFORMED_XML: &str = "the XML you provided was not well-formed or did not validate against our published schema";

pub fn utc_now_ntp() -> OffsetDateTime {
    OffsetDateTime::now_utc()

@@ -17,7 +17,7 @@ impl BucketObjectLockSys {
    }

    pub async fn get(bucket: &str) -> Option<DefaultRetention> {
        if let Some(object_lock_config) = get_object_lock_config(bucket).await {
        if let Ok(object_lock_config) = get_object_lock_config(bucket).await {
            if let Some(object_lock_rule) = object_lock_config.0.rule {
                return object_lock_rule.default_retention;
            }

@@ -11,7 +11,7 @@ use std::collections::HashMap;

use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart};
use crate::{disk::DiskAPI, store_api::GetObjectReader};
use reader::hasher::{Hasher, Sha256};
use rustfs_utils::hasher::{Hasher, Sha256};
use rustfs_utils::crypto::{base64_decode, base64_encode};
use s3s::header::{
    X_AMZ_CHECKSUM_ALGORITHM, X_AMZ_CHECKSUM_CRC32, X_AMZ_CHECKSUM_CRC32C, X_AMZ_CHECKSUM_SHA1, X_AMZ_CHECKSUM_SHA256,
@@ -234,6 +234,7 @@ pub struct Checksum {
    computed: bool,
}

#[allow(dead_code)]
impl Checksum {
    fn new(t: ChecksumMode, b: &[u8]) -> Checksum {
        if t.is_set() && b.len() == t.raw_byte_len() {

@@ -1,10 +1,10 @@
use http::status::StatusCode;
use std::fmt::{self, Display, Formatter};

#[derive(Default, thiserror::Error, Debug, PartialEq)]
#[derive(Default, thiserror::Error, Debug, Clone, PartialEq)]
pub struct AdminError {
    pub code: &'static str,
    pub message: &'static str,
    pub code: String,
    pub message: String,
    pub status_code: StatusCode,
}

@@ -15,18 +15,18 @@ impl Display for AdminError {
}

impl AdminError {
    pub fn new(code: &'static str, message: &'static str, status_code: StatusCode) -> Self {
    pub fn new(code: &str, message: &str, status_code: StatusCode) -> Self {
        Self {
            code,
            message,
            code: code.to_string(),
            message: message.to_string(),
            status_code,
        }
    }

    pub fn msg(message: &'static str) -> Self {
    pub fn msg(message: &str) -> Self {
        Self {
            code: "InternalError",
            message,
            code: "InternalError".to_string(),
            message: message.to_string(),
            status_code: StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
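What the switch from `&'static str` to `String` buys, sketched (assuming the `AdminError` defined above is in scope; the error code and detail text are illustrative, and the append pattern mirrors the tier code later in this diff):

    use http::StatusCode;

    fn permission_error(detail: &str) -> AdminError {
        // `new` now borrows &str and owns the result, so messages can be built at runtime.
        let mut e = AdminError::new("XRustFSAdminTierPermErr", "Permission denied", StatusCode::FORBIDDEN);
        e.message.push('.');
        e.message.push_str(detail); // impossible with &'static str fields
        e
    }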

@@ -61,6 +61,7 @@ impl TransitionClient {
}

#[derive(Default)]
#[allow(dead_code)]
pub struct GetRequest {
    pub buffer: Vec<u8>,
    pub offset: i64,
@@ -72,6 +73,7 @@ pub struct GetRequest {
    pub setting_object_info: bool,
}

#[allow(dead_code)]
pub struct GetResponse {
    pub size: i64,
    //pub error: error,

@@ -14,6 +14,7 @@ use tracing::warn;
use crate::client::api_error_response::err_invalid_argument;

#[derive(Default)]
#[allow(dead_code)]
pub struct AdvancedGetOptions {
    replication_deletemarker: bool,
    is_replication_ready_for_deletemarker: bool,
@@ -30,8 +31,6 @@ pub struct GetObjectOptions {
    pub internal: AdvancedGetOptions,
}

type StatObjectOptions = GetObjectOptions;

impl Default for GetObjectOptions {
    fn default() -> Self {
        Self {

@@ -258,6 +258,7 @@ impl TransitionClient {
    }
}

#[allow(dead_code)]
pub struct ListObjectsOptions {
    reverse_versions: bool,
    with_versions: bool,

@@ -12,7 +12,7 @@ use std::{collections::HashMap, sync::Arc};
use time::{Duration, OffsetDateTime, macros::format_description};
use tracing::{error, info, warn};

use reader::hasher::Hasher;
use rustfs_utils::hasher::Hasher;
use s3s::dto::{ObjectLockLegalHoldStatus, ObjectLockRetentionMode, ReplicationStatus};
use s3s::header::{
    X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, X_AMZ_REPLICATION_STATUS,
@@ -124,6 +124,7 @@ impl Default for PutObjectOptions {
    }
}

#[allow(dead_code)]
impl PutObjectOptions {
    fn set_matche_tag(&mut self, etag: &str) {
        if etag == "*" {

@@ -14,8 +14,6 @@ use crate::client::{
    transition_api::TransitionClient,
};

const NULL_VERSION_ID: &str = "null";

pub fn is_object(reader: &ReaderImpl) -> bool {
    todo!();
}

@@ -18,7 +18,7 @@ use tracing::{error, info};
use url::form_urlencoded::Serializer;
use uuid::Uuid;

use reader::hasher::Hasher;
use rustfs_utils::hasher::Hasher;
use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID};
use s3s::{Body, dto::StreamingBlob};
//use crate::disk::{Reader, BufferReader};

@@ -27,7 +27,7 @@ use crate::client::{
    constants::ISO8601_DATEFORMAT,
    transition_api::{ReaderImpl, RequestMetadata, TransitionClient, UploadInfo},
};
use reader::hasher::Hasher;
use rustfs_utils::hasher::Hasher;
use rustfs_utils::{crypto::base64_encode, path::trim_etag};
use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID};

@@ -24,14 +24,15 @@ use crate::{
    disk::DiskAPI,
    store_api::{GetObjectReader, ObjectInfo, StorageAPI},
};
use reader::hasher::{sum_md5_base64, sum_sha256_hex};
use rustfs_utils::hasher::{sum_md5_base64, sum_sha256_hex};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;

pub struct RemoveBucketOptions {
    forced_elete: bool,
    _forced_elete: bool,
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct AdvancedRemoveOptions {
    replication_delete_marker: bool,
    replication_status: ReplicationStatus,
@@ -426,8 +427,10 @@ impl TransitionClient {
}

#[derive(Debug, Default)]
#[allow(dead_code)]
pub struct RemoveObjectError {
    object_name: String,
    #[allow(dead_code)]
    version_id: String,
    err: Option<std::io::Error>,
}

@@ -43,6 +43,7 @@ pub struct ListBucketV2Result {
    pub start_after: String,
}

#[allow(dead_code)]
pub struct Version {
    etag: String,
    is_latest: bool,
@@ -72,12 +73,6 @@ pub struct ListVersionsResult {
    next_version_id_marker: String,
}

impl ListVersionsResult {
    fn unmarshal_xml() -> Result<(), std::io::Error> {
        todo!();
    }
}

pub struct ListBucketResult {
    common_prefixes: Vec<CommonPrefix>,
    contents: Vec<transition_api::ObjectInfo>,

@@ -17,8 +17,7 @@ use crate::client::{
    api_error_response::{http_resp_to_error_response, to_error_response},
    transition_api::{Document, TransitionClient},
};
use crate::signer;
use reader::hasher::{Hasher, Sha256};
use rustfs_utils::hasher::{Hasher, Sha256};
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::S3ErrorCode;
@@ -151,7 +150,7 @@ impl TransitionClient {
    }

    if signer_type == SignatureType::SignatureV2 {
        let req_builder = signer::sign_v2(req_builder, 0, &access_key_id, &secret_access_key, is_virtual_style);
        let req_builder = rustfs_signer::sign_v2(req_builder, 0, &access_key_id, &secret_access_key, is_virtual_style);
        let req = match req_builder.body(Body::empty()) {
            Ok(req) => return Ok(req),
            Err(err) => {
@@ -169,7 +168,7 @@ impl TransitionClient {
        .headers_mut()
        .expect("err")
        .insert("X-Amz-Content-Sha256", content_sha256.parse().unwrap());
    let req_builder = signer::sign_v4(req_builder, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1");
    let req_builder = rustfs_signer::sign_v4(req_builder, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1");
    let req = match req_builder.body(Body::empty()) {
        Ok(req) => return Ok(req),
        Err(err) => {

@@ -1,4 +1,3 @@
#![allow(clippy::map_entry)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
@@ -28,4 +27,4 @@ pub const ISO8601_DATEFORMAT: &[FormatItem<'_>] =

pub const GET_OBJECT_ATTRIBUTES_TAGS: &str = "ETag,Checksum,StorageClass,ObjectSize,ObjectParts";
pub const GET_OBJECT_ATTRIBUTES_MAX_PARTS: i64 = 1000;
const RUSTFS_BUCKET_SOURCE_MTIME: &str = "X-RustFs-Source-Mtime";
const _RUSTFS_BUCKET_SOURCE_MTIME: &str = "X-RustFs-Source-Mtime";

@@ -141,28 +141,6 @@ impl ErrorResponse {
    }
}

struct Error {
    code: String,
    message: String,
    bucket_name: String,
    key: String,
    resource: String,
    request_id: String,
    host_id: String,
    region: String,
    server: String,
    status_code: i64,
}

impl Display for Error {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if self.message == "" {
            return write!(f, "{}", format!("Error response code {}.", self.code));
        }
        write!(f, "{}", self.message)
    }
}

pub fn xml_decoder<T>(body: &[u8]) -> Result<T, std::io::Error> {
    todo!();
}

@@ -24,16 +24,12 @@ pub struct PutObjReader {
    //pub sealMD5Fn: SealMD5CurrFn,
}

#[allow(dead_code)]
impl PutObjReader {
    pub fn new(raw_reader: HashReader) -> Self {
        todo!();
    }

    fn size(&self) -> usize {
        //self.reader.size()
        todo!();
    }

    fn md5_current_hex_string(&self) -> String {
        todo!();
    }

@@ -45,9 +45,8 @@ use crate::client::{
    constants::{UNSIGNED_PAYLOAD, UNSIGNED_PAYLOAD_TRAILER},
    credentials::{CredContext, Credentials, SignatureType, Static},
};
use crate::signer;
use crate::{checksum::ChecksumMode, store_api::GetObjectReader};
use reader::hasher::{MD5, Sha256};
use rustfs_utils::hasher::{MD5, Sha256};
use rustfs_rio::HashReader;
use rustfs_utils::{
    net::get_endpoint_url,
@@ -57,7 +56,7 @@ use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};

const C_USER_AGENT_PREFIX: &str = "RustFS (linux; x86)";
const _C_USER_AGENT_PREFIX: &str = "RustFS (linux; x86)";
const C_USER_AGENT: &str = "RustFS (linux; x86)";

const SUCCESS_STATUS: [StatusCode; 3] = [StatusCode::OK, StatusCode::NO_CONTENT, StatusCode::PARTIAL_CONTENT];
@@ -433,9 +432,9 @@ impl TransitionClient {
    }
    if signer_type == SignatureType::SignatureV2 {
        req_builder =
            signer::pre_sign_v2(req_builder, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host);
            rustfs_signer::pre_sign_v2(req_builder, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host);
    } else if signer_type == SignatureType::SignatureV4 {
        req_builder = signer::pre_sign_v4(
        req_builder = rustfs_signer::pre_sign_v4(
            req_builder,
            &access_key_id,
            &secret_access_key,
@@ -486,7 +485,7 @@ impl TransitionClient {

    if signer_type == SignatureType::SignatureV2 {
        req_builder =
            signer::sign_v2(req_builder, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
            rustfs_signer::sign_v2(req_builder, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host);
    } else if metadata.stream_sha256 && !self.secure {
        if metadata.trailer.len() > 0 {
            //req.Trailer = metadata.trailer;
@@ -494,7 +493,7 @@ impl TransitionClient {
            req_builder = req_builder.header(http::header::TRAILER, v.clone());
        }
    }
    //req_builder = signer::streaming_sign_v4(req_builder, &access_key_id,
    //req_builder = rustfs_signer::streaming_sign_v4(req_builder, &access_key_id,
    //    &secret_access_key, &session_token, &location, metadata.content_length, OffsetDateTime::now_utc(), self.sha256_hasher());
    } else {
        let mut sha_header = UNSIGNED_PAYLOAD.to_string();
@@ -509,7 +508,7 @@ impl TransitionClient {
    req_builder = req_builder
        .header::<HeaderName, HeaderValue>("X-Amz-Content-Sha256".parse().unwrap(), sha_header.parse().expect("err"));

    req_builder = signer::sign_v4_trailer(
    req_builder = rustfs_signer::sign_v4_trailer(
        req_builder,
        &access_key_id,
        &secret_access_key,
@@ -614,23 +613,6 @@ impl TransitionClient {
    }
}

struct LockedRandSource {
    src: u64, //rand.Source,
}

impl LockedRandSource {
    fn int63(&self) -> i64 {
        /*let n = self.src.int63();
        n*/
        todo!();
    }

    fn seed(&self, seed: i64) {
        //self.src.seed(seed);
        todo!();
    }
}

pub struct RequestMetadata {
    pub pre_sign_url: bool,
    pub bucket_name: String,

@@ -810,6 +810,7 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i
    }

    let r_err = err;
    let err;
    let bucket = bucket.to_string();
    let object = object.to_string();
    let version_id = version_id.to_string();
@@ -870,9 +871,10 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i
        /*S3ErrorCode::ReplicationPermissionCheck => {
            err = std::io::Error::other(StorageError::ReplicationPermissionCheck);
        }*/
        _ => std::io::Error::other("err"),
        _ => {
            err = err_;
        }
    }
}

pub fn storage_to_object_err(err: Error, params: Vec<&str>) -> S3Error {
    let storage_err = &err;

@@ -146,6 +146,7 @@ impl TierStats {
    }
}

#[allow(dead_code)]
#[allow(dead_code)]
struct AllTierStats {
    tiers: HashMap<String, TierStats>,

@@ -34,7 +34,6 @@ pub mod checksum;
pub mod client;
pub mod event;
pub mod event_notification;
pub mod signer;
pub mod tier;

pub use global::new_object_layer_fn;

@@ -1,109 +0,0 @@
//! Ordered query strings

use crate::signer::utils::stable_sort_by_first;

/// Immutable query string container
#[derive(Debug, Default, Clone)]
pub struct OrderedQs {
    /// Ascending query strings
    qs: Vec<(String, String)>,
}

/// [`OrderedQs`]
#[derive(Debug, thiserror::Error)]
#[error("ParseOrderedQsError: {inner}")]
pub struct ParseOrderedQsError {
    /// url decode error
    inner: serde_urlencoded::de::Error,
}

impl OrderedQs {
    /// Constructs [`OrderedQs`] from vec
    ///
    /// + strings must be url-decoded
    #[cfg(test)]
    #[must_use]
    pub fn from_vec_unchecked(mut v: Vec<(String, String)>) -> Self {
        stable_sort_by_first(&mut v);
        Self { qs: v }
    }

    /// Parses [`OrderedQs`] from query
    ///
    /// # Errors
    /// Returns [`ParseOrderedQsError`] if query cannot be decoded
    pub fn parse(query: &str) -> Result<Self, ParseOrderedQsError> {
        let result = serde_urlencoded::from_str::<Vec<(String, String)>>(query);
        let mut v = result.map_err(|e| ParseOrderedQsError { inner: e })?;
        stable_sort_by_first(&mut v);
        Ok(Self { qs: v })
    }

    #[must_use]
    pub fn has(&self, name: &str) -> bool {
        self.qs.binary_search_by_key(&name, |x| x.0.as_str()).is_ok()
    }

    /// Gets query values by name. Time `O(logn)`
    pub fn get_all(&self, name: &str) -> impl Iterator<Item = &str> + use<'_> {
        let qs = self.qs.as_slice();

        let lower_bound = qs.partition_point(|x| x.0.as_str() < name);
        let upper_bound = qs.partition_point(|x| x.0.as_str() <= name);

        qs[lower_bound..upper_bound].iter().map(|x| x.1.as_str())
    }

    pub fn get_unique(&self, name: &str) -> Option<&str> {
        let qs = self.qs.as_slice();
        let lower_bound = qs.partition_point(|x| x.0.as_str() < name);

        let mut iter = qs[lower_bound..].iter();
        let pair = iter.next()?;

        if let Some(following) = iter.next() {
            if following.0 == name {
                return None;
            }
        }

        (pair.0.as_str() == name).then_some(pair.1.as_str())
    }
}

impl AsRef<[(String, String)]> for OrderedQs {
    fn as_ref(&self) -> &[(String, String)] {
        self.qs.as_ref()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn tag() {
        {
            let query = "tagging";
            let qs = OrderedQs::parse(query).unwrap();
            assert_eq!(qs.as_ref(), &[("tagging".to_owned(), String::new())]);

            assert_eq!(qs.get_unique("taggin"), None);
            assert_eq!(qs.get_unique("tagging"), Some(""));
            assert_eq!(qs.get_unique("taggingg"), None);
        }

        {
            let query = "tagging&tagging";
            let qs = OrderedQs::parse(query).unwrap();
            assert_eq!(
                qs.as_ref(),
                &[("tagging".to_owned(), String::new()), ("tagging".to_owned(), String::new())]
            );

            assert_eq!(qs.get_unique("taggin"), None);
            assert_eq!(qs.get_unique("tagging"), None);
            assert_eq!(qs.get_unique("taggingg"), None);
        }
    }
}
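A small complement to the tests above, showing `get_all` on repeated keys (this file moves into the signer crate in this commit; values illustrative):

    let qs = OrderedQs::parse("a=1&b=2&a=3").unwrap();
    // Entries are kept sorted by key, so both `a` values come back together.
    let a_values: Vec<&str> = qs.get_all("a").collect();
    assert_eq!(a_values, ["1", "3"]);
    assert_eq!(qs.get_unique("b"), Some("2"));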
@@ -1,17 +0,0 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]

use http::request;
use time::OffsetDateTime;

pub fn streaming_unsigned_v4(
    mut req: request::Builder,
    session_token: &str,
    data_len: i64,
    req_time: OffsetDateTime,
) -> request::Builder {
    todo!();
}
@@ -53,42 +53,40 @@ pub const TIER_CONFIG_FORMAT: u16 = 1;
pub const TIER_CONFIG_V1: u16 = 1;
pub const TIER_CONFIG_VERSION: u16 = 1;

const _TIER_CFG_REFRESH_AT_HDR: &str = "X-RustFS-TierCfg-RefreshedAt";

lazy_static! {
    //pub static ref TIER_CONFIG_PATH: PathBuf = path_join(&[PathBuf::from(RUSTFS_CONFIG_PREFIX), PathBuf::from(TIER_CONFIG_FILE)]);
    pub static ref ERR_TIER_MISSING_CREDENTIALS: AdminError = AdminError {
        code: "XRustFSAdminTierMissingCredentials".to_string(),
        message: "Specified remote credentials are empty".to_string(),
        status_code: StatusCode::FORBIDDEN,
    };

    pub static ref ERR_TIER_BACKEND_IN_USE: AdminError = AdminError {
        code: "XRustFSAdminTierBackendInUse".to_string(),
        message: "Specified remote tier is already in use".to_string(),
        status_code: StatusCode::CONFLICT,
    };

    pub static ref ERR_TIER_TYPE_UNSUPPORTED: AdminError = AdminError {
        code: "XRustFSAdminTierTypeUnsupported".to_string(),
        message: "Specified tier type is unsupported".to_string(),
        status_code: StatusCode::BAD_REQUEST,
    };

    pub static ref ERR_TIER_BACKEND_NOT_EMPTY: AdminError = AdminError {
        code: "XRustFSAdminTierBackendNotEmpty".to_string(),
        message: "Specified remote backend is not empty".to_string(),
        status_code: StatusCode::BAD_REQUEST,
    };

    pub static ref ERR_TIER_INVALID_CONFIG: AdminError = AdminError {
        code: "XRustFSAdminTierInvalidConfig".to_string(),
        message: "Unable to setup remote tier, check tier configuration".to_string(),
        status_code: StatusCode::BAD_REQUEST,
    };
}

const TIER_CFG_REFRESH_AT_HDR: &str = "X-RustFS-TierCfg-RefreshedAt";

pub const ERR_TIER_MISSING_CREDENTIALS: AdminError = AdminError {
    code: "XRustFSAdminTierMissingCredentials",
    message: "Specified remote credentials are empty",
    status_code: StatusCode::FORBIDDEN,
};

pub const ERR_TIER_BACKEND_IN_USE: AdminError = AdminError {
    code: "XRustFSAdminTierBackendInUse",
    message: "Specified remote tier is already in use",
    status_code: StatusCode::CONFLICT,
};

pub const ERR_TIER_TYPE_UNSUPPORTED: AdminError = AdminError {
    code: "XRustFSAdminTierTypeUnsupported",
    message: "Specified tier type is unsupported",
    status_code: StatusCode::BAD_REQUEST,
};

pub const ERR_TIER_BACKEND_NOT_EMPTY: AdminError = AdminError {
    code: "XRustFSAdminTierBackendNotEmpty",
    message: "Specified remote backend is not empty",
    status_code: StatusCode::BAD_REQUEST,
};

pub const ERR_TIER_INVALID_CONFIG: AdminError = AdminError {
    code: "XRustFSAdminTierInvalidConfig",
    message: "Unable to setup remote tier, check tier configuration",
    status_code: StatusCode::BAD_REQUEST,
};
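Why this block moves from `pub const` to `lazy_static!`: `AdminError`'s fields are now `String`, and `String` construction allocates at runtime, which a `const` item cannot do. A lazily-initialized static sidesteps that; on Rust 1.80+ the standard library's `LazyLock` would work the same way (sketch, names illustrative):

    use std::sync::LazyLock;

    static ERR_EXAMPLE: LazyLock<AdminError> = LazyLock::new(|| AdminError {
        code: "XRustFSAdminExample".to_string(),     // illustrative code
        message: "Illustrative error".to_string(),
        status_code: StatusCode::BAD_REQUEST,
    });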

#[derive(Serialize, Deserialize)]
pub struct TierConfigMgr {
    #[serde(skip)]
@@ -108,20 +106,13 @@ impl TierConfigMgr {

    pub fn unmarshal(data: &[u8]) -> std::result::Result<TierConfigMgr, std::io::Error> {
        let cfg: TierConfigMgr = serde_json::from_slice(data)?;
        //let mut cfg = TierConfigMgr(m);
        //let mut cfg = m;
        Ok(cfg)
    }

    pub fn marshal(&self) -> std::result::Result<Bytes, std::io::Error> {
        let data = serde_json::to_vec(&self)?;

        //let mut data = Vec<u8>::with_capacity(self.msg_size()+4);
        let mut data = Bytes::from(data);

        //LittleEndian::write_u16(&mut data[0..2], TIER_CONFIG_FORMAT);
        //LittleEndian::write_u16(&mut data[2..4], TIER_CONFIG_VERSION);

        Ok(data)
    }
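A round-trip sketch for the two methods above (assuming a `mgr: TierConfigMgr` in scope inside a function that returns `std::io::Error`):

    let bytes = mgr.marshal()?;                       // serde_json::to_vec, wrapped in Bytes
    let restored = TierConfigMgr::unmarshal(&bytes)?; // serde_json::from_slice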

@@ -144,12 +135,12 @@ impl TierConfigMgr {
    pub async fn add(&mut self, tier: TierConfig, force: bool) -> std::result::Result<(), AdminError> {
        let tier_name = &tier.name;
        if tier_name != tier_name.to_uppercase().as_str() {
            return Err(ERR_TIER_NAME_NOT_UPPERCASE);
            return Err(ERR_TIER_NAME_NOT_UPPERCASE.clone());
        }

        let (_, b) = self.is_tier_name_in_use(tier_name);
        if b {
            return Err(ERR_TIER_ALREADY_EXISTS);
            return Err(ERR_TIER_ALREADY_EXISTS.clone());
        }

        let d = new_warm_backend(&tier, true).await?;
@@ -159,19 +150,22 @@ impl TierConfigMgr {
        match in_use {
            Ok(b) => {
                if b {
                    return Err(ERR_TIER_BACKEND_IN_USE);
                    return Err(ERR_TIER_BACKEND_IN_USE.clone());
                }
            }
            Err(err) => {
                warn!("tier add failed, err: {:?}", err);
                if err.to_string().contains("connect") {
                    return Err(ERR_TIER_CONNECT_ERR);
                    return Err(ERR_TIER_CONNECT_ERR.clone());
                } else if err.to_string().contains("authorization") {
                    return Err(ERR_TIER_INVALID_CREDENTIALS);
                    return Err(ERR_TIER_INVALID_CREDENTIALS.clone());
                } else if err.to_string().contains("bucket") {
                    return Err(ERR_TIER_BUCKET_NOT_FOUND);
                    return Err(ERR_TIER_BUCKET_NOT_FOUND.clone());
                }
                return Err(ERR_TIER_PERM_ERR);
                let mut e = ERR_TIER_PERM_ERR.clone();
                e.message.push('.');
                e.message.push_str(&err.to_string());
                return Err(e);
            }
        }
    }
@@ -185,21 +179,21 @@ impl TierConfigMgr {
    pub async fn remove(&mut self, tier_name: &str, force: bool) -> std::result::Result<(), AdminError> {
        let d = self.get_driver(tier_name).await;
        if let Err(err) = d {
            match err {
                ERR_TIER_NOT_FOUND => {
                    return Ok(());
                }
                _ => {
                    return Err(err);
                }
            if err.code == ERR_TIER_NOT_FOUND.code {
                return Ok(());
            } else {
                return Err(err);
            }
        }
        if !force {
            let inuse = d.expect("err").in_use().await;
            if let Err(err) = inuse {
                return Err(ERR_TIER_PERM_ERR);
                let mut e = ERR_TIER_PERM_ERR.clone();
                e.message.push('.');
                e.message.push_str(&err.to_string());
                return Err(e);
            } else if inuse.expect("err") {
                return Err(ERR_TIER_BACKEND_NOT_EMPTY);
                return Err(ERR_TIER_BACKEND_NOT_EMPTY.clone());
            }
        }
        self.tiers.remove(tier_name);
@@ -254,7 +248,7 @@ impl TierConfigMgr {
    pub async fn edit(&mut self, tier_name: &str, creds: TierCreds) -> std::result::Result<(), AdminError> {
        let (tier_type, exists) = self.is_tier_name_in_use(tier_name);
        if !exists {
            return Err(ERR_TIER_NOT_FOUND);
            return Err(ERR_TIER_NOT_FOUND.clone());
        }

        let mut cfg = self.tiers[tier_name].clone();
@@ -276,7 +270,7 @@ impl TierConfigMgr {
            TierType::RustFS => {
                let mut rustfs = cfg.rustfs.as_mut().expect("err");
                if creds.access_key == "" || creds.secret_key == "" {
                    return Err(ERR_TIER_MISSING_CREDENTIALS);
                    return Err(ERR_TIER_MISSING_CREDENTIALS.clone());
                }
                rustfs.access_key = creds.access_key;
                rustfs.secret_key = creds.secret_key;
@@ -284,7 +278,7 @@ impl TierConfigMgr {
            TierType::MinIO => {
                let mut minio = cfg.minio.as_mut().expect("err");
                if creds.access_key == "" || creds.secret_key == "" {
                    return Err(ERR_TIER_MISSING_CREDENTIALS);
                    return Err(ERR_TIER_MISSING_CREDENTIALS.clone());
                }
                minio.access_key = creds.access_key;
                minio.secret_key = creds.secret_key;
@@ -304,7 +298,7 @@ impl TierConfigMgr {
            Entry::Vacant(e) => {
                let t = self.tiers.get(tier_name);
                if t.is_none() {
                    return Err(ERR_TIER_NOT_FOUND);
                    return Err(ERR_TIER_NOT_FOUND.clone());
                }
                let d = new_warm_backend(t.expect("err"), false).await?;
                e.insert(d)
@@ -332,6 +326,12 @@ impl TierConfigMgr {
        Ok(())
    }

    pub async fn clear_tier(&mut self, force: bool) -> std::result::Result<(), AdminError> {
        self.tiers.clear();
        self.driver_cache.clear();
        Ok(())
    }

    #[tracing::instrument(level = "debug", name = "tier_save", skip(self))]
    pub async fn save(&self) -> std::result::Result<(), std::io::Error> {
        let Some(api) = new_object_layer_fn() else {

@@ -1,10 +1,3 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_must_use)]
#![allow(clippy::all)]

use serde::{Deserialize, Serialize};
use std::fmt::Display;
use tracing::info;
@@ -12,9 +5,6 @@ use tracing::info;
const C_TIER_CONFIG_VER: &str = "v1";

const ERR_TIER_NAME_EMPTY: &str = "remote tier name empty";
const ERR_TIER_INVALID_CONFIG: &str = "invalid tier config";
const ERR_TIER_INVALID_CONFIG_VERSION: &str = "invalid tier config version";
const ERR_TIER_TYPE_UNSUPPORTED: &str = "unsupported tier type";

#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub enum TierType {
@@ -128,20 +118,8 @@ impl Clone for TierConfig {
    }
}

#[allow(dead_code)]
impl TierConfig {
    pub fn unmarshal(data: &[u8]) -> Result<TierConfig, std::io::Error> {
        /*let m: HashMap<String, HashMap<String, KVS>> = serde_json::from_slice(data)?;
        let mut cfg = TierConfig(m);
        cfg.set_defaults();
        Ok(cfg)*/
        todo!();
    }

    pub fn marshal(&self) -> Result<Vec<u8>, std::io::Error> {
        let data = serde_json::to_vec(&self)?;
        Ok(data)
    }

    fn endpoint(&self) -> String {
        match self.tier_type {
            TierType::S3 => self.s3.as_ref().expect("err").endpoint.clone(),
@@ -198,14 +176,14 @@
pub struct TierS3 {
    pub name: String,
    pub endpoint: String,
    #[serde(rename = "accesskey")]
    #[serde(rename = "accessKey")]
    pub access_key: String,
    #[serde(rename = "secretkey")]
    #[serde(rename = "secretKey")]
    pub secret_key: String,
    pub bucket: String,
    pub prefix: String,
    pub region: String,
    #[serde(rename = "storageclass")]
    #[serde(rename = "storageClass")]
    pub storage_class: String,
    #[serde(skip)]
    pub aws_role: bool,
@@ -220,6 +198,7 @@ pub struct TierS3 {
}

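Effect of the `rename` changes above, sketched with illustrative values: the tier structs now serialize with camelCase keys, so `serde_json::to_string(&tier_s3)` yields something like

    // {"name":"COLD","endpoint":"https://s3.example.com","accessKey":"AKIA...","secretKey":"...",
    //  "bucket":"archive","prefix":"","region":"us-east-1","storageClass":"STANDARD"}
    // The same applies to TierRustFS and TierMinIO below; note this changes the wire format
    // relative to configs saved with the old lowercase keys.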

impl TierS3 {
    #[allow(dead_code)]
    fn new<F>(name: &str, access_key: &str, secret_key: &str, bucket: &str, options: Vec<F>) -> Result<TierConfig, std::io::Error>
    where
        F: Fn(TierS3) -> Box<Result<(), std::io::Error>> + Send + Sync + 'static,
@@ -258,14 +237,14 @@ impl TierS3 {
pub struct TierRustFS {
    pub name: String,
    pub endpoint: String,
    #[serde(rename = "accesskey")]
    #[serde(rename = "accessKey")]
    pub access_key: String,
    #[serde(rename = "secretkey")]
    #[serde(rename = "secretKey")]
    pub secret_key: String,
    pub bucket: String,
    pub prefix: String,
    pub region: String,
    #[serde(rename = "storageclass")]
    #[serde(rename = "storageClass")]
    pub storage_class: String,
}

@@ -274,9 +253,9 @@ pub struct TierRustFS {
pub struct TierMinIO {
    pub name: String,
    pub endpoint: String,
    #[serde(rename = "accesskey")]
    #[serde(rename = "accessKey")]
    pub access_key: String,
    #[serde(rename = "secretkey")]
    #[serde(rename = "secretKey")]
    pub secret_key: String,
    pub bucket: String,
    pub prefix: String,
@@ -284,6 +263,7 @@ pub struct TierMinIO {
}

impl TierMinIO {
    #[allow(dead_code)]
    fn new<F>(
        name: &str,
        endpoint: &str,

@@ -1,29 +1,7 @@
|
||||
#![allow(unused_imports)]
|
||||
#![allow(unused_variables)]
|
||||
#![allow(unused_mut)]
|
||||
#![allow(unused_assignments)]
|
||||
#![allow(unused_must_use)]
|
||||
#![allow(clippy::all)]
|
||||
|
||||
use crate::tier::tier::TierConfigMgr;
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl TierConfigMgr {
|
||||
fn decode_msg(/*dc *msgp.Reader*/) -> Result<(), std::io::Error> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn encode_msg(/*en *msgp.Writer*/) -> Result<(), std::io::Error> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
pub fn marshal_msg(&self, b: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
pub fn unmarshal_msg(buf: &[u8]) -> Result<Self, std::io::Error> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
pub fn msg_size(&self) -> usize {
|
||||
100
|
||||
}
|
||||
|
||||
@@ -1,50 +1,53 @@
|
||||
use crate::client::admin_handler_utils::AdminError;
|
||||
use http::status::StatusCode;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
pub const ERR_TIER_ALREADY_EXISTS: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierAlreadyExists",
|
||||
message: "Specified remote tier already exists",
|
||||
status_code: StatusCode::CONFLICT,
|
||||
};
|
||||
lazy_static! {
|
||||
pub static ref ERR_TIER_ALREADY_EXISTS: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierAlreadyExists".to_string(),
|
||||
message: "Specified remote tier already exists".to_string(),
|
||||
status_code: StatusCode::CONFLICT,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_NOT_FOUND: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierNotFound",
|
||||
message: "Specified remote tier was not found",
|
||||
status_code: StatusCode::NOT_FOUND,
|
||||
};
|
||||
pub static ref ERR_TIER_NOT_FOUND: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierNotFound".to_string(),
|
||||
message: "Specified remote tier was not found".to_string(),
|
||||
status_code: StatusCode::NOT_FOUND,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_NAME_NOT_UPPERCASE: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierNameNotUpperCase",
|
||||
message: "Tier name must be in uppercase",
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
pub static ref ERR_TIER_NAME_NOT_UPPERCASE: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierNameNotUpperCase".to_string(),
|
||||
message: "Tier name must be in uppercase".to_string(),
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_BUCKET_NOT_FOUND: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierBucketNotFound",
|
||||
message: "Remote tier bucket not found",
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
pub static ref ERR_TIER_BUCKET_NOT_FOUND: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierBucketNotFound".to_string(),
|
||||
message: "Remote tier bucket not found".to_string(),
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_INVALID_CREDENTIALS: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierInvalidCredentials",
|
||||
message: "Invalid remote tier credentials",
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
pub static ref ERR_TIER_INVALID_CREDENTIALS: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierInvalidCredentials".to_string(),
|
||||
message: "Invalid remote tier credentials".to_string(),
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_RESERVED_NAME: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierReserved",
|
||||
message: "Cannot use reserved tier name",
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
pub static ref ERR_TIER_RESERVED_NAME: AdminError = AdminError {
|
||||
code: "XRustFSAdminTierReserved".to_string(),
|
||||
message: "Cannot use reserved tier name".to_string(),
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_PERM_ERR: AdminError = AdminError {
|
||||
code: "TierPermErr",
|
||||
message: "Tier Perm Err",
|
||||
status_code: StatusCode::OK,
|
||||
};
|
||||
pub static ref ERR_TIER_PERM_ERR: AdminError = AdminError {
|
||||
code: "TierPermErr".to_string(),
|
||||
message: "Tier Perm Err".to_string(),
|
||||
status_code: StatusCode::OK,
|
||||
};
|
||||
|
||||
pub const ERR_TIER_CONNECT_ERR: AdminError = AdminError {
|
||||
code: "TierConnectErr",
|
||||
message: "Tier Connect Err",
|
||||
status_code: StatusCode::OK,
|
||||
};
|
||||
pub static ref ERR_TIER_CONNECT_ERR: AdminError = AdminError {
|
||||
code: "TierConnectErr".to_string(),
|
||||
message: "Tier Connect Err".to_string(),
|
||||
status_code: StatusCode::OK,
|
||||
};
|
||||
}
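The switch from `pub const` to `lazy_static!` here is forced by the field types: once AdminError holds owned `String`s (so handlers can build dynamic messages), its construction is no longer `const`-evaluable, since `to_string()` allocates at run time. A minimal sketch of the pattern, assuming the AdminError shape above:

    // const items require compile-time construction; String allocation happens
    // at run time, so the value is created lazily on first access instead.
    lazy_static::lazy_static! {
        pub static ref EXAMPLE_ERR: AdminError = AdminError {
            code: "ExampleCode".to_string(),
            message: "example message".to_string(),
            status_code: http::StatusCode::BAD_REQUEST,
        };
    }

A side effect shows up later in this commit: `static ref` values are not constant patterns, so `match` arms like `Err(ERR_TIER_NOT_FOUND)` stop compiling and the handlers switch to comparing `err.code` strings.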

@@ -7,14 +7,14 @@

use bytes::Bytes;
use std::collections::HashMap;

use http::StatusCode;
use crate::client::{
    admin_handler_utils::AdminError,
    transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::is_err_bucket_not_found;
use crate::tier::{
    tier::{ERR_TIER_INVALID_CONFIG, ERR_TIER_TYPE_UNSUPPORTED},
    tier::ERR_TIER_TYPE_UNSUPPORTED,
    tier_config::{TierConfig, TierType},
    tier_handlers::{ERR_TIER_BUCKET_NOT_FOUND, ERR_TIER_PERM_ERR},
    warm_backend_minio::WarmBackendMinIO,
@@ -54,7 +54,7 @@ pub async fn check_warm_backend(w: Option<&WarmBackendImpl>) -> Result<(), Admin
        .put(PROBE_OBJECT, ReaderImpl::Body(Bytes::from("RustFS".as_bytes().to_vec())), 5)
        .await;
    if let Err(err) = remote_version_id {
        return Err(ERR_TIER_PERM_ERR);
        return Err(ERR_TIER_PERM_ERR.clone());
    }

    let r = w.get(PROBE_OBJECT, "", WarmBackendGetOpts::default()).await;
@@ -67,11 +67,11 @@ pub async fn check_warm_backend(w: Option<&WarmBackendImpl>) -> Result<(), Admin
            return Err(ERR_TIER_MISSING_CREDENTIALS);
        }*/
        //else {
        return Err(ERR_TIER_PERM_ERR);
        return Err(ERR_TIER_PERM_ERR.clone());
        //}
    }
    if let Err(err) = w.remove(PROBE_OBJECT, &remote_version_id.expect("err")).await {
        return Err(ERR_TIER_PERM_ERR);
        return Err(ERR_TIER_PERM_ERR.clone());
    };
    Ok(())
}
@@ -82,8 +82,12 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result<WarmBack
        TierType::S3 => {
            let dd = WarmBackendS3::new(tier.s3.as_ref().expect("err"), &tier.name).await;
            if let Err(err) = dd {
                info!("{}", err);
                return Err(ERR_TIER_INVALID_CONFIG);
                warn!("{}", err);
                return Err(AdminError {
                    code: "XRustFSAdminTierInvalidConfig".to_string(),
                    message: format!("Unable to setup remote tier, check tier configuration: {}", err.to_string()),
                    status_code: StatusCode::BAD_REQUEST,
                });
            }
            d = Some(Box::new(dd.expect("err")));
        }
@@ -91,7 +95,11 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result<WarmBack
            let dd = WarmBackendRustFS::new(tier.rustfs.as_ref().expect("err"), &tier.name).await;
            if let Err(err) = dd {
                warn!("{}", err);
                return Err(ERR_TIER_INVALID_CONFIG);
                return Err(AdminError {
                    code: "XRustFSAdminTierInvalidConfig".to_string(),
                    message: format!("Unable to setup remote tier, check tier configuration: {}", err.to_string()),
                    status_code: StatusCode::BAD_REQUEST,
                });
            }
            d = Some(Box::new(dd.expect("err")));
        }
@@ -99,12 +107,16 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result<WarmBack
            let dd = WarmBackendMinIO::new(tier.minio.as_ref().expect("err"), &tier.name).await;
            if let Err(err) = dd {
                warn!("{}", err);
                return Err(ERR_TIER_INVALID_CONFIG);
                return Err(AdminError {
                    code: "XRustFSAdminTierInvalidConfig".to_string(),
                    message: format!("Unable to setup remote tier, check tier configuration: {}", err.to_string()),
                    status_code: StatusCode::BAD_REQUEST,
                });
            }
            d = Some(Box::new(dd.expect("err")));
        }
        _ => {
            return Err(ERR_TIER_TYPE_UNSUPPORTED);
            return Err(ERR_TIER_TYPE_UNSUPPORTED.clone());
        }
    }

@@ -23,7 +23,7 @@ use tracing::warn;

const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5;
const MAX_PARTS_COUNT: i64 = 10000;
const MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5;
const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5;
const MIN_PART_SIZE: i64 = 1024 * 1024 * 128;

pub struct WarmBackendMinIO(WarmBackendS3);

@@ -22,7 +22,7 @@ use crate::tier::{

const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5;
const MAX_PARTS_COUNT: i64 = 10000;
const MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5;
const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5;
const MIN_PART_SIZE: i64 = 1024 * 1024 * 128;

pub struct WarmBackendRustFS(WarmBackendS3);
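These limits mirror the usual S3 multipart constraints: at most 10,000 parts and a 5 TiB object ceiling. A quick sanity check of the arithmetic (the helper below is a hypothetical illustration, not part of the codebase):

    // Smallest part size that still fits `object_size` into MAX_PARTS_COUNT parts.
    fn required_part_size(object_size: i64) -> i64 {
        const MAX_PARTS_COUNT: i64 = 10_000;
        (object_size + MAX_PARTS_COUNT - 1) / MAX_PARTS_COUNT // ceiling division
    }

For a 5 TiB object this works out to roughly 524 MiB per part, well above MIN_PART_SIZE, so uploads near MAX_MULTIPART_PUT_OBJECT_SIZE must scale the part size up rather than stick to the 128 MiB minimum.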

@@ -95,15 +95,6 @@ impl WarmBackendS3 {
        })
    }

    fn to_object_err(&self, err: ErrorResponse, params: Vec<&str>) -> std::io::Error {
        let mut object = "";
        if params.len() >= 1 {
            object = params.first().cloned().unwrap_or_default();
        }

        error_resp_to_object_err(err, vec![&self.bucket, &self.get_dest(object)])
    }

    pub fn get_dest(&self, object: &str) -> String {
        let mut dest_obj = object.to_string();
        if self.prefix != "" {

@@ -1,27 +0,0 @@
[package]
name = "reader"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[lints]
workspace = true

[dependencies]
tracing.workspace = true
s3s.workspace = true
thiserror.workspace = true
bytes.workspace = true
pin-project-lite.workspace = true
hex-simd = "0.8.0"
base64-simd = "0.8.0"
md-5.workspace = true
sha2 = { version = "0.11.0-pre.4" }
futures.workspace = true
async-trait.workspace = true
common.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
@@ -1,12 +0,0 @@
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum ReaderError {
    #[error("stream input error {0}")]
    StreamInput(String),
    //
    #[error("etag: expected ETag {0} does not match computed ETag {1}")]
    VerifyError(String, String),
    #[error("Bad checksum: Want {0} does not match calculated {1}")]
    ChecksumMismatch(String, String),
    #[error("Bad sha256: Expected {0} does not match calculated {1}")]
    SHA256Mismatch(String, String),
}
@@ -1,7 +0,0 @@
pub mod error;
pub mod hasher;
pub mod reader;

pub fn hex(data: impl AsRef<[u8]>) -> String {
    hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower)
}
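For reference, the removed helper lowercase hex-encodes arbitrary bytes:

    // Example (assuming hex-simd 0.8 semantics):
    // hex([0xde, 0xad, 0xbe, 0xef]) == "deadbeef"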

@@ -1,549 +0,0 @@
#![allow(unused_imports)]

use bytes::Bytes;
use s3s::StdError;
use std::any::Any;
use std::io::Read;
use std::{collections::VecDeque, io::Cursor};

use std::pin::Pin;
use std::task::Poll;

use crate::{
    error::ReaderError,
    hasher::{HashType, Uuid},
};

// use futures::stream::Stream;
use super::hasher::{Hasher, MD5, Sha256};
use futures::Stream;
use std::io::{Error, Result};

pin_project_lite::pin_project! {
    #[derive(Default)]
    pub struct EtagReader<S> {
        #[pin]
        inner: S,
        md5: HashType,
        checksum: Option<String>,
        bytes_read: usize,
    }
}

impl<S> EtagReader<S> {
    pub fn new(inner: S, etag: Option<String>, force_md5: Option<String>) -> Self {
        let md5 = {
            if let Some(m) = force_md5 {
                HashType::Uuid(Uuid::new(m))
            } else {
                HashType::Md5(MD5::new())
            }
        };
        Self {
            inner,
            md5,
            checksum: etag,
            bytes_read: 0,
        }
    }

    pub fn etag(&mut self) -> String {
        self.md5.sum()
    }
}

impl<S> Stream for EtagReader<S>
where
    S: Stream<Item = std::result::Result<Bytes, StdError>>,
{
    type Item = std::result::Result<Bytes, StdError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.inner.poll_next(cx);

        if let Poll::Ready(ref res) = poll {
            match res {
                Some(Ok(bytes)) => {
                    *this.bytes_read += bytes.len();
                    this.md5.write(bytes);
                }
                Some(Err(err)) => {
                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
                }
                None => {
                    if let Some(etag) = this.checksum {
                        let got = this.md5.sum();
                        if got.as_str() != etag.as_str() {
                            return Poll::Ready(Some(Err(Box::new(ReaderError::VerifyError(etag.to_owned(), got)))));
                        }
                    }
                }
            }
        }

        poll
    }
}
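Worth noting about this (now deleted) adapter: it hashes chunks as they pass through and only compares against the expected ETag when the inner stream is exhausted (the None arm), so a consumer that stops reading early never triggers verification. A minimal usage sketch under that assumption:

    // The stream must be fully drained for the ETag check to fire
    // (requires futures::StreamExt for .next()).
    // let mut reader = EtagReader::new(stream, Some(expected_etag), None);
    // while let Some(chunk) = reader.next().await {
    //     let _bytes = chunk?; // a mismatch surfaces here as ReaderError::VerifyError
    // }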

pin_project_lite::pin_project! {
    #[derive(Default)]
    pub struct HashReader<S> {
        #[pin]
        inner: S,
        sha256: Option<Sha256>,
        md5: Option<MD5>,
        md5_hex: Option<String>,
        sha256_hex: Option<String>,
        size: usize,
        actual_size: usize,
        bytes_read: usize,
    }
}

impl<S> HashReader<S> {
    pub fn new(inner: S, size: usize, md5_hex: Option<String>, sha256_hex: Option<String>, actual_size: usize) -> Self {
        let md5 = { if md5_hex.is_some() { Some(MD5::new()) } else { None } };
        let sha256 = { if sha256_hex.is_some() { Some(Sha256::new()) } else { None } };
        Self {
            inner,
            size,
            actual_size,
            md5_hex,
            sha256_hex,
            bytes_read: 0,
            md5,
            sha256,
        }
    }
}

impl<S> Stream for HashReader<S>
where
    S: Stream<Item = std::result::Result<Bytes, StdError>>,
{
    type Item = std::result::Result<Bytes, StdError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.inner.poll_next(cx);

        if let Poll::Ready(ref res) = poll {
            match res {
                Some(Ok(bytes)) => {
                    *this.bytes_read += bytes.len();
                    if let Some(sha) = this.sha256 {
                        sha.write(bytes);
                    }

                    if let Some(md5) = this.md5 {
                        md5.write(bytes);
                    }
                }
                Some(Err(err)) => {
                    return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string())))));
                }
                None => {
                    if let Some(hash) = this.sha256 {
                        if let Some(hex) = this.sha256_hex {
                            let got = hash.sum();
                            let src = hex.as_str();
                            if src != got.as_str() {
                                println!("sha256 err src:{src},got:{got}");
                                return Poll::Ready(Some(Err(Box::new(ReaderError::SHA256Mismatch(src.to_string(), got)))));
                            }
                        }
                    }

                    if let Some(hash) = this.md5 {
                        if let Some(hex) = this.md5_hex {
                            let got = hash.sum();
                            let src = hex.as_str();
                            if src != got.as_str() {
                                // TODO: ERR
                                println!("md5 err src:{src},got:{got}");
                                return Poll::Ready(Some(Err(Box::new(ReaderError::ChecksumMismatch(src.to_string(), got)))));
                            }
                        }
                    }
                }
            }
        }

        // println!("poll {:?}", poll);

        poll
    }
}

#[async_trait::async_trait]
pub trait Reader {
    async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize>;
    async fn seek(&mut self, offset: usize) -> Result<()>;
    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
    async fn read_all(&mut self) -> Result<Vec<u8>> {
        let data = Vec::new();

        Ok(data)
    }
    fn as_any(&self) -> &dyn Any;
}

#[derive(Debug)]
pub struct BufferReader {
    pub inner: Cursor<Vec<u8>>,
    pos: usize,
}

impl BufferReader {
    pub fn new(inner: Vec<u8>) -> Self {
        Self {
            inner: Cursor::new(inner),
            pos: 0,
        }
    }
}

#[async_trait::async_trait]
impl Reader for BufferReader {
    #[tracing::instrument(level = "debug", skip(self, buf))]
    async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result<usize> {
        self.seek(offset).await?;
        self.read_exact(buf).await
    }
    #[tracing::instrument(level = "debug", skip(self))]
    async fn seek(&mut self, offset: usize) -> Result<()> {
        if self.pos != offset {
            self.inner.set_position(offset as u64);
        }

        Ok(())
    }
    #[tracing::instrument(level = "debug", skip(self))]
    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
        let _bytes_read = self.inner.read_exact(buf)?;
        self.pos += buf.len();
        //Ok(bytes_read)
        Ok(0)
    }

    async fn read_all(&mut self) -> Result<Vec<u8>> {
        let mut data = Vec::new();
        self.inner.read_to_end(&mut data)?;

        Ok(data)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

pin_project_lite::pin_project! {
    pub struct ChunkedStream<S> {
        #[pin]
        inner: S,
        chunk_size: usize,
        streams: VecDeque<Bytes>,
        remaining: Vec<u8>,
    }
}

impl<S> ChunkedStream<S> {
    pub fn new(inner: S, chunk_size: usize) -> Self {
        Self {
            inner,
            chunk_size,
            streams: VecDeque::new(),
            remaining: Vec::new(),
        }
    }
}

impl<S> Stream for ChunkedStream<S>
where
    S: Stream<Item = std::result::Result<Bytes, StdError>> + Send + Sync,
    // E: std::error::Error + Send + Sync,
{
    type Item = std::result::Result<Bytes, StdError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        let (items, op_items) = self.inner.size_hint();
        let this = self.project();

        if let Some(b) = this.streams.pop_front() {
            return Poll::Ready(Some(Ok(b)));
        }

        let poll = this.inner.poll_next(cx);

        match poll {
            Poll::Ready(res_op) => match res_op {
                Some(res) => match res {
                    Ok(bytes) => {
                        let chunk_size = *this.chunk_size;
                        let mut bytes = bytes;

                        // println!("get len {}", bytes.len());
                        // If there are leftover bytes from the previous poll
                        if !this.remaining.is_empty() {
                            let need_size = chunk_size - this.remaining.len();
                            // The incoming data is large enough to top the leftover up to a full chunk
                            if bytes.len() >= need_size {
                                let add_bytes = bytes.split_to(need_size);
                                this.remaining.extend_from_slice(&add_bytes);
                                this.streams.push_back(Bytes::from(this.remaining.clone()));
                                this.remaining.clear();
                            } else {
                                // Not enough for a full chunk; just append it to the leftover
                                let need_size = bytes.len();
                                let add_bytes = bytes.split_to(need_size);
                                this.remaining.extend_from_slice(&add_bytes);
                            }
                        }

                        loop {
                            if bytes.len() < chunk_size {
                                break;
                            }
                            let chunk = bytes.split_to(chunk_size);
                            this.streams.push_back(chunk);
                        }

                        if !bytes.is_empty() {
                            this.remaining.extend_from_slice(&bytes);
                        }

                        if let Some(b) = this.streams.pop_front() {
                            return Poll::Ready(Some(Ok(b)));
                        }

                        if items > 0 || op_items.is_some() {
                            return Poll::Pending;
                        }

                        if !this.remaining.is_empty() {
                            let b = this.remaining.clone();
                            this.remaining.clear();
                            return Poll::Ready(Some(Ok(Bytes::from(b))));
                        }
                        Poll::Ready(None)
                    }
                    Err(err) => Poll::Ready(Some(Err(err))),
                },
                None => {
                    // println!("get empty");
                    if let Some(b) = this.streams.pop_front() {
                        return Poll::Ready(Some(Ok(b)));
                    }
                    if !this.remaining.is_empty() {
                        let b = this.remaining.clone();
                        this.remaining.clear();
                        return Poll::Ready(Some(Ok(Bytes::from(b))));
                    }
                    Poll::Ready(None)
                }
            },
            Poll::Pending => {
                // println!("get Pending");
                Poll::Pending
            }
        }

        // if let Poll::Ready(Some(res)) = poll {
        //     warn!("poll res ...");
        //     match res {
        //         Ok(bytes) => {
        //             let chunk_size = *this.chunk_size;
        //             let mut bytes = bytes;
        //             if this.remaining.len() > 0 {
        //                 let need_size = chunk_size - this.remaining.len();
        //                 let add_bytes = bytes.split_to(need_size);
        //                 this.remaining.extend_from_slice(&add_bytes);
        //                 warn!("poll push_back remaining ...1");
        //                 this.streams.push_back(Bytes::from(this.remaining.clone()));
        //                 this.remaining.clear();
        //             }

        //             loop {
        //                 if bytes.len() < chunk_size {
        //                     break;
        //                 }
        //                 let chunk = bytes.split_to(chunk_size);
        //                 warn!("poll push_back ...1");
        //                 this.streams.push_back(chunk);
        //             }

        //             warn!("poll remaining extend_from_slice...1");
        //             this.remaining.extend_from_slice(&bytes);
        //         }
        //         Err(err) => return Poll::Ready(Some(Err(err))),
        //     }
        // }

        // if let Some(b) = this.streams.pop_front() {
        //     warn!("poll pop_front ...");
        //     return Poll::Ready(Some(Ok(b)));
        // }

        // if this.remaining.len() > 0 {
        //     let b = this.remaining.clone();
        //     this.remaining.clear();

        //     warn!("poll remaining ...1");
        //     return Poll::Ready(Some(Ok(Bytes::from(b))));
        // }
        // Poll::Pending
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let mut items = self.streams.len();
        if !self.remaining.is_empty() {
            items += 1;
        }
        (items, Some(items))
    }
}
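One hazard in the (since removed) implementation above: the `items > 0 || op_items.is_some()` branch returns Poll::Pending right after the inner stream returned Ready, so no waker has been registered and the task may never be polled again. The conventional remedy, shown here as a sketch rather than what the code did, is to self-wake before yielding:

    // Re-schedule this task explicitly, since no underlying resource will wake it:
    // the inner stream last returned Ready, not Pending.
    cx.waker().wake_by_ref();
    return Poll::Pending;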

#[cfg(test)]
mod test {

    use super::*;
    use futures::StreamExt;

    #[tokio::test]
    async fn test_etag_reader() {
        let data1 = vec![1u8; 60]; // 65536
        let data2 = vec![0u8; 32]; // 65536
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);

        let chunk_results: Vec<std::result::Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];

        let mut stream = futures::stream::iter(chunk_results);

        let mut hash_reader = EtagReader::new(&mut stream, None, None);

        // let chunk_size = 8;

        // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size);

        loop {
            match hash_reader.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{err:?}");
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }

        println!("etag:{}", hash_reader.etag());

        // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45
        // println!("md5: {:?}", hash_reader.hex());
    }

    #[tokio::test]
    async fn test_hash_reader() {
        let data1 = vec![1u8; 60]; // 65536
        let data2 = vec![0u8; 32]; // 65536
        let size = data1.len() + data2.len();
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);

        let chunk_results: Vec<std::result::Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];

        let mut stream = futures::stream::iter(chunk_results);

        let mut hash_reader = HashReader::new(
            &mut stream,
            size,
            Some("d94c485610a7a00a574df55e45d3cc0c".to_string()),
            Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()),
            0,
        );

        // let chunk_size = 8;

        // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size);

        loop {
            match hash_reader.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{err:?}");
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }

        // BUG: borrow of moved value: `md5_stream`

        // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45
        // println!("md5: {:?}", hash_reader.hex());
    }

    #[tokio::test]
    async fn test_chunked_stream() {
        let data1 = vec![1u8; 60]; // 65536
        let data2 = vec![0u8; 33]; // 65536
        let data3 = vec![4u8; 5]; // 65536
        let chunk1 = Bytes::from(data1);
        let chunk2 = Bytes::from(data2);
        let chunk3 = Bytes::from(data3);

        let chunk_results: Vec<std::result::Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2), Ok(chunk3)];

        let mut stream = futures::stream::iter(chunk_results);
        // let mut hash_reader = HashReader::new(
        //     &mut stream,
        //     size,
        //     Some("d94c485610a7a00a574df55e45d3cc0c".to_string()),
        //     Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()),
        //     0,
        // );

        let chunk_size = 8;

        let mut etag_reader = EtagReader::new(&mut stream, None, None);

        let mut chunked_stream = ChunkedStream::new(&mut etag_reader, chunk_size);

        loop {
            match chunked_stream.next().await {
                Some(res) => match res {
                    Ok(bytes) => {
                        println!("bytes: {}, {:?}", bytes.len(), bytes);
                    }
                    Err(err) => {
                        println!("err:{err:?}");
                        break;
                    }
                },
                None => {
                    println!("next none");
                    break;
                }
            }
        }

        println!("etag:{}", etag_reader.etag());
    }
}

@@ -1,5 +1,6 @@
#![allow(unused_variables, unused_mut, unused_must_use)]

use time::OffsetDateTime;
use http::{HeaderMap, StatusCode};
//use iam::get_global_action_cred;
use matchit::Params;
@@ -55,12 +56,11 @@ pub struct AddTierQuery {
pub struct AddTier {}
#[async_trait::async_trait]
impl Operation for AddTier {
    #[tracing::instrument(level = "debug", skip(self))]
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let query = {
            if let Some(query) = req.uri.query() {
                let input: AddTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                AddTierQuery::default()
@@ -119,39 +119,38 @@ impl Operation for AddTier {
        let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
        //tier_config_mgr.reload(api);
        match tier_config_mgr.add(args, force).await {
            Err(ERR_TIER_ALREADY_EXISTS) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierNameAlreadyExist".into()),
                    "tier name already exists!",
                ));
            }
            Err(ERR_TIER_NAME_NOT_UPPERCASE) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierNameNotUppercase".into()),
                    "tier name not uppercase!",
                ));
            }
            Err(ERR_TIER_BACKEND_IN_USE) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierNameBackendInUse!".into()),
                    "tier name backend in use!",
                ));
            }
            Err(ERR_TIER_CONNECT_ERR) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierConnectError".into()),
                    "tier connect error!",
                ));
            }
            Err(ERR_TIER_INVALID_CREDENTIALS) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom(ERR_TIER_INVALID_CREDENTIALS.code.into()),
                    ERR_TIER_INVALID_CREDENTIALS.message,
                ));
            }
            Err(e) => {
                warn!("tier_config_mgr add failed, e: {:?}", e);
                return Err(S3Error::with_message(S3ErrorCode::Custom("TierAddFailed".into()), "tier add failed"));
            Err(err) => {
                if err.code == ERR_TIER_ALREADY_EXISTS.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNameAlreadyExist".into()),
                        "tier name already exists!",
                    ));
                } else if err.code == ERR_TIER_NAME_NOT_UPPERCASE.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNameNotUppercase".into()),
                        "tier name not uppercase!",
                    ));
                } else if err.code == ERR_TIER_BACKEND_IN_USE.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNameBackendInUse!".into()),
                        "tier name backend in use!",
                    ));
                } else if err.code == ERR_TIER_CONNECT_ERR.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierConnectError".into()),
                        "tier connect error!",
                    ));
                } else if err.code == ERR_TIER_INVALID_CREDENTIALS.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom(err.code.clone().into()),
                        err.message.clone(),
                    ));
                } else {
                    warn!("tier_config_mgr add failed, e: {:?}", err);
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierAddFailed".into()),
                        format!("tier add failed. {}", err.to_string()),
                    ));
                }
            }
            Ok(_) => (),
        }
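The rewrite above is a direct consequence of the lazy_static change: `static ref` values are runtime references, not constant patterns, so arms like `Err(ERR_TIER_ALREADY_EXISTS)` no longer compile, and comparing `err.code` strings restores the dispatch. A hypothetical helper could collapse the repetition, sketched under the AdminError shape defined earlier:

    // Maps a known admin error code to its S3 error pair, else falls back.
    fn tier_s3_error(err: &AdminError, fallback: (&str, String)) -> S3Error {
        let (code, msg): (&str, String) = if err.code == ERR_TIER_ALREADY_EXISTS.code {
            ("TierNameAlreadyExist", "tier name already exists!".to_string())
        } else if err.code == ERR_TIER_CONNECT_ERR.code {
            ("TierConnectError", "tier connect error!".to_string())
        } else {
            (fallback.0, fallback.1)
        };
        S3Error::with_message(S3ErrorCode::Custom(code.to_string().into()), msg)
    }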
@@ -159,7 +158,6 @@ impl Operation for AddTier {
            warn!("tier_config_mgr save failed, e: {:?}", e);
            return Err(S3Error::with_message(S3ErrorCode::Custom("TierAddFailed".into()), "tier save failed"));
        }
        //globalNotificationSys.LoadTransitionTierConfig(ctx);

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
@@ -175,7 +173,7 @@ impl Operation for EditTier {
        let query = {
            if let Some(query) = req.uri.query() {
                let input: AddTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                AddTierQuery::default()
@@ -189,9 +187,6 @@ impl Operation for EditTier {
        let (cred, _owner) =
            check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        //{"accesskey":"gggggg","secretkey":"jjjjjjj"}
        //{"detailedMessage":"failed to perform PUT: The Access Key Id you provided does not exist in our records.","message":"an error occurred, please try again"}
        //200 OK
        let mut input = req.input;
        let body = match input.store_all_unlimited().await {
            Ok(b) => b,
@@ -211,26 +206,31 @@ impl Operation for EditTier {
        let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
        //tier_config_mgr.reload(api);
        match tier_config_mgr.edit(&tier_name, creds).await {
            Err(ERR_TIER_NOT_FOUND) => {
                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!"));
            }
            Err(ERR_TIER_MISSING_CREDENTIALS) => {
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierMissingCredentials".into()),
                    "tier missing credentials!",
                ));
            }
            Err(e) => {
                warn!("tier_config_mgr edit failed, e: {:?}", e);
                return Err(S3Error::with_message(S3ErrorCode::Custom("TierEditFailed".into()), "tier edit failed"));
            Err(err) => {
                if err.code == ERR_TIER_NOT_FOUND.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNotFound".into()),
                        "tier not found!",
                    ));
                } else if err.code == ERR_TIER_MISSING_CREDENTIALS.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierMissingCredentials".into()),
                        "tier missing credentials!",
                    ));
                } else {
                    warn!("tier_config_mgr edit failed, e: {:?}", err);
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierEditFailed".into()),
                        format!("tier edit failed. {}", err.to_string()),
                    ));
                }
            }
            Ok(_) => (),
        }
        if let Err(e) = tier_config_mgr.save().await {
            warn!("tier_config_mgr save failed, e: {:?}", e);
            return Err(S3Error::with_message(S3ErrorCode::Custom("TierEditFailed".into()), "tier save failed"));
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("TierEditFailed".into()),
                "tier save failed",
            ));
        }
        //globalNotificationSys.LoadTransitionTierConfig(ctx);

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
@@ -252,14 +252,13 @@ impl Operation for ListTiers {
        let query = {
            if let Some(query) = req.uri.query() {
                let input: BucketQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                BucketQuery::default()
            }
        };

        //{"items":[{"minio":{"accesskey":"minioadmin","bucket":"mblock2","endpoint":"http://192.168.1.11:9020","name":"COLDTIER","objects":"0","prefix":"mypre/","secretkey":"REDACTED","usage":"0 B","versions":"0"},"status":true,"type":"minio"}]}
        let mut tier_config_mgr = GLOBAL_TierConfigMgr.read().await;
        let tiers = tier_config_mgr.list_tiers();

@@ -277,12 +276,10 @@ pub struct RemoveTier {}
#[async_trait::async_trait]
impl Operation for RemoveTier {
    async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        warn!("handle RemoveTier");

        let query = {
            if let Some(query) = req.uri.query() {
                let input: AddTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                AddTierQuery::default()
@@ -313,26 +310,31 @@ impl Operation for RemoveTier {
        let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
        //tier_config_mgr.reload(api);
        match tier_config_mgr.remove(&tier_name, force).await {
            Err(ERR_TIER_NOT_FOUND) => {
                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found."));
            }
            Err(ERR_TIER_BACKEND_NOT_EMPTY) => {
                return Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used."));
            }
            Err(e) => {
                warn!("tier_config_mgr remove failed, e: {:?}", e);
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierRemoveFailed".into()),
                    "tier remove failed",
                ));
            Err(err) => {
                if err.code == ERR_TIER_NOT_FOUND.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNotFound".into()),
                        "tier not found.",
                    ));
                } else if err.code == ERR_TIER_BACKEND_NOT_EMPTY.code {
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierNameBackendInUse".into()),
                        "tier is used.",
                    ));
                } else {
                    warn!("tier_config_mgr remove failed, e: {:?}", err);
                    return Err(S3Error::with_message(
                        S3ErrorCode::Custom("TierRemoveFailed".into()),
                        format!("tier remove failed. {}", err.to_string()),
                    ));
                }
            }
            Ok(_) => (),
        }
        if let Err(e) = tier_config_mgr.save().await {
            warn!("tier_config_mgr save failed, e: {:?}", e);
            return Err(S3Error::with_message(S3ErrorCode::Custom("TierRemoveFailed".into()), "tier save failed"));
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("TierRemoveFailed".into()),
                "tier save failed",
            ));
        }
        //globalNotificationSys.LoadTransitionTierConfig(ctx);

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
@@ -345,12 +347,10 @@ pub struct VerifyTier {}
#[async_trait::async_trait]
impl Operation for VerifyTier {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        warn!("handle VerifyTier");

        let query = {
            if let Some(query) = req.uri.query() {
                let input: AddTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                AddTierQuery::default()
@@ -381,19 +381,17 @@ pub struct GetTierInfo {}
#[async_trait::async_trait]
impl Operation for GetTierInfo {
    async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        warn!("handle GetTierInfo");

        let query = {
            if let Some(query) = req.uri.query() {
                let input: AddTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?;
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                AddTierQuery::default()
            }
        };

        let mut tier_config_mgr = GLOBAL_TierConfigMgr.read().await;
        let tier_config_mgr = GLOBAL_TierConfigMgr.read().await;
        let info = tier_config_mgr.get(&query.tier.unwrap());

        let data = serde_json::to_vec(&info)
@@ -405,3 +403,71 @@ impl Operation for GetTierInfo {
        Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
    }
}

#[derive(Debug, Deserialize, Default)]
pub struct ClearTierQuery {
    pub rand: Option<String>,
    pub force: String,
}

pub struct ClearTier {}
#[async_trait::async_trait]
impl Operation for ClearTier {
    async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        let query = {
            if let Some(query) = req.uri.query() {
                let input: ClearTierQuery =
                    from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?;
                input
            } else {
                ClearTierQuery::default()
            }
        };

        //let Some(input_cred) = req.credentials else {
        //    return Err(s3_error!(InvalidRequest, "get cred failed"));
        //};

        //let (cred, _owner) =
        //    check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;

        let mut force: bool = false;
        let force_str = query.force;
        if force_str != "" {
            force = force_str.parse().unwrap();
        }

        let t = OffsetDateTime::now_utc();
        let mut rand = "AGD1R25GI3I1GJGUGJFD7FBS4DFAASDF".to_string();
        rand.insert_str(3, &t.day().to_string());
        rand.insert_str(17, &t.month().to_string());
        rand.insert_str(23, &t.year().to_string());
        warn!("tier_config_mgr rand: {}", rand);
        if query.rand != Some(rand) {
            return Err(s3_error!(InvalidRequest, "get rand failed"));
        };
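The guard above derives a daily-rotating confirmation token rather than a true random value: today's day, month, and year are spliced into fixed offsets of a constant base string, and the caller must echo the exact result in the `rand` query parameter before the destructive clear proceeds. A client can reproduce it the same way (sketch; `time` is the crate already in scope):

            // Build the expected token exactly as the handler does; note that
            // t.month() Displays as the month name (e.g. "March"), not a number.
            let t = OffsetDateTime::now_utc();
            let mut expected = "AGD1R25GI3I1GJGUGJFD7FBS4DFAASDF".to_string();
            expected.insert_str(3, &t.day().to_string());
            expected.insert_str(17, &t.month().to_string());
            expected.insert_str(23, &t.year().to_string());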

        let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
        //tier_config_mgr.reload(api);
        match tier_config_mgr.clear_tier(force).await {
            Err(err) => {
                warn!("tier_config_mgr clear failed, e: {:?}", err);
                return Err(S3Error::with_message(
                    S3ErrorCode::Custom("TierClearFailed".into()),
                    format!("tier clear failed. {}", err.to_string()),
                ));
            }
            Ok(_) => (),
        }
        if let Err(e) = tier_config_mgr.save().await {
            warn!("tier_config_mgr save failed, e: {:?}", e);
            return Err(S3Error::with_message(
                S3ErrorCode::Custom("TierClearFailed".into()),
                "tier save failed",
            ));
        }

        let mut header = HeaderMap::new();
        header.insert(CONTENT_TYPE, "application/json".parse().unwrap());

        Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
    }
}

@@ -343,6 +343,11 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
        format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(),
        AdminOperation(&tier::EditTier {}),
    )?;
    r.insert(
        Method::POST,
        format!("{}{}", ADMIN_PREFIX, "/v3/tier/clear").as_str(),
        AdminOperation(&tier::ClearTier {}),
    )?;

    Ok(())
}
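With this registration in place, the clear endpoint is reachable as a plain POST (illustrative request line; ADMIN_PREFIX is whatever prefix the router was configured with):

    POST {ADMIN_PREFIX}/v3/tier/clear?force=true&rand=<daily token from ClearTier above>

Unlike the EditTier route above it, the path is static rather than parameterized with {tiername}, since clearing applies to the whole tier configuration.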