From 4ed84a6bc4c41a69a83689944f493222949cd1cb Mon Sep 17 00:00:00 2001 From: likewu Date: Sat, 28 Jun 2025 11:02:37 +0800 Subject: [PATCH] separate signer. fix ilm feature. --- Cargo.toml | 4 +- crates/signer/Cargo.toml | 25 + crates/signer/src/constants.rs | 10 + .../signer/mod.rs => crates/signer/src/lib.rs | 2 +- .../src}/request_signature_streaming.rs | 67 ++- ...st_signature_streaming_unsigned_trailer.rs | 23 + .../signer/src}/request_signature_v2.rs | 30 +- .../signer/src}/request_signature_v4.rs | 46 +- .../src/signer => crates/signer/src}/utils.rs | 2 +- {reader => crates/utils}/src/hasher.rs | 4 +- crates/utils/src/lib.rs | 2 + ecstore/Cargo.toml | 2 +- .../bucket/lifecycle/bucket_lifecycle_ops.rs | 41 +- ecstore/src/bucket/lifecycle/lifecycle.rs | 10 +- ecstore/src/bucket/lifecycle/rule.rs | 6 +- .../bucket/lifecycle/tier_last_day_stats.rs | 1 + ecstore/src/bucket/lifecycle/tier_sweeper.rs | 3 + ecstore/src/bucket/object_lock/objectlock.rs | 24 +- .../src/bucket/object_lock/objectlock_sys.rs | 2 +- ecstore/src/checksum.rs | 3 +- ecstore/src/client/admin_handler_utils.rs | 18 +- ecstore/src/client/api_get_object.rs | 2 + ecstore/src/client/api_get_options.rs | 3 +- ecstore/src/client/api_list.rs | 1 + ecstore/src/client/api_put_object.rs | 3 +- ecstore/src/client/api_put_object_common.rs | 2 - .../src/client/api_put_object_multipart.rs | 2 +- .../src/client/api_put_object_streaming.rs | 2 +- ecstore/src/client/api_remove.rs | 7 +- ecstore/src/client/api_s3_datatypes.rs | 7 +- ecstore/src/client/bucket_cache.rs | 7 +- ecstore/src/client/constants.rs | 3 +- ecstore/src/client/credentials.rs | 22 - ecstore/src/client/object_api_utils.rs | 6 +- ecstore/src/client/transition_api.rs | 32 +- ecstore/src/error.rs | 6 +- ecstore/src/heal/data_usage_cache.rs | 1 + ecstore/src/lib.rs | 1 - ecstore/src/signer/ordered_qs.rs | 109 ---- ...st_signature_streaming_unsigned_trailer.rs | 17 - ecstore/src/tier/tier.rs | 120 ++-- ecstore/src/tier/tier_config.rs | 42 +- ecstore/src/tier/tier_gen.rs | 24 +- ecstore/src/tier/tier_handlers.rs | 83 +-- ecstore/src/tier/warm_backend.rs | 32 +- ecstore/src/tier/warm_backend_minio.rs | 2 +- ecstore/src/tier/warm_backend_rustfs.rs | 2 +- ecstore/src/tier/warm_backend_s3.rs | 9 - reader/Cargo.toml | 27 - reader/src/error.rs | 12 - reader/src/lib.rs | 7 - reader/src/reader.rs | 549 ------------------ rustfs/src/admin/handlers/tier.rs | 226 ++++--- rustfs/src/admin/mod.rs | 5 + 54 files changed, 498 insertions(+), 1200 deletions(-) create mode 100644 crates/signer/Cargo.toml create mode 100644 crates/signer/src/constants.rs rename ecstore/src/signer/mod.rs => crates/signer/src/lib.rs (95%) rename {ecstore/src/signer => crates/signer/src}/request_signature_streaming.rs (52%) create mode 100644 crates/signer/src/request_signature_streaming_unsigned_trailer.rs rename {ecstore/src/signer => crates/signer/src}/request_signature_v2.rs (90%) rename {ecstore/src/signer => crates/signer/src}/request_signature_v4.rs (95%) rename {ecstore/src/signer => crates/signer/src}/utils.rs (93%) rename {reader => crates/utils}/src/hasher.rs (97%) delete mode 100644 ecstore/src/signer/ordered_qs.rs delete mode 100644 ecstore/src/signer/request_signature_streaming_unsigned_trailer.rs delete mode 100644 reader/Cargo.toml delete mode 100644 reader/src/error.rs delete mode 100644 reader/src/lib.rs delete mode 100644 reader/src/reader.rs diff --git a/Cargo.toml b/Cargo.toml index 36ed5d1a..bb4fb3d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/rio", # Rust I/O utilities and abstractions "crates/utils", # Utility functions and helpers "crates/zip", # ZIP file handling and compression + "crates/signer", # client signer "crypto", # Cryptography and security features "ecstore", # Erasure coding storage implementation "e2e_test", # End-to-end test suite @@ -21,7 +22,6 @@ members = [ "rustfs", # Core file system implementation "s3select/api", # S3 Select API interface "s3select/query", # S3 Select query engine - "reader", ] resolver = "2" @@ -58,8 +58,8 @@ rustfs-notify = { path = "crates/notify", version = "0.0.1" } rustfs-utils = { path = "crates/utils", version = "0.0.1" } rustfs-rio = { path = "crates/rio", version = "0.0.1" } rustfs-filemeta = { path = "crates/filemeta", version = "0.0.1" } +rustfs-signer = { path = "crates/signer", version = "0.0.1" } workers = { path = "./common/workers", version = "0.0.1" } -reader = { path = "./reader", version = "0.0.1" } aes-gcm = { version = "0.10.3", features = ["std"] } arc-swap = "1.7.1" argon2 = { version = "0.5.3", features = ["std"] } diff --git a/crates/signer/Cargo.toml b/crates/signer/Cargo.toml new file mode 100644 index 00000000..c7a0f0aa --- /dev/null +++ b/crates/signer/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "rustfs-signer" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +tracing.workspace = true +lazy_static.workspace = true +bytes = { workspace = true } +http.workspace = true +time.workspace = true +hyper.workspace = true +serde.workspace = true +serde_urlencoded.workspace = true +rustfs-utils = {workspace = true, features=["full"]} + +[dev-dependencies] +tempfile = { workspace = true } +rand = { workspace = true } + +[lints] +workspace = true diff --git a/crates/signer/src/constants.rs b/crates/signer/src/constants.rs new file mode 100644 index 00000000..54229fef --- /dev/null +++ b/crates/signer/src/constants.rs @@ -0,0 +1,10 @@ +use time::{format_description::FormatItem, macros::format_description}; + +pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD"; +pub const UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"; + +pub const TOTAL_WORKERS: i64 = 4; + +pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; +pub const ISO8601_DATEFORMAT: &[FormatItem<'_>] = + format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z"); diff --git a/ecstore/src/signer/mod.rs b/crates/signer/src/lib.rs similarity index 95% rename from ecstore/src/signer/mod.rs rename to crates/signer/src/lib.rs index 7d87ba04..a8286515 100644 --- a/ecstore/src/signer/mod.rs +++ b/crates/signer/src/lib.rs @@ -1,4 +1,4 @@ -pub mod ordered_qs; +pub mod constants; pub mod request_signature_streaming; pub mod request_signature_streaming_unsigned_trailer; pub mod request_signature_v2; diff --git a/ecstore/src/signer/request_signature_streaming.rs b/crates/signer/src/request_signature_streaming.rs similarity index 52% rename from ecstore/src/signer/request_signature_streaming.rs rename to crates/signer/src/request_signature_streaming.rs index 87c4d111..08320f6f 100644 --- a/ecstore/src/signer/request_signature_streaming.rs +++ b/crates/signer/src/request_signature_streaming.rs @@ -1,31 +1,21 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - -use http::request::{self, Request}; +use http::{request, HeaderMap, HeaderValue}; use lazy_static::lazy_static; use std::collections::HashMap; use time::{OffsetDateTime, macros::format_description}; +use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH; use super::request_signature_v4::{SERVICE_TYPE_S3, get_scope, get_signature, get_signing_key}; -use rustfs_utils::{ - crypto::{hex, hex_sha256, hex_sha256_chunk, hmac_sha256}, - hash::EMPTY_STRING_SHA256_HASH, -}; const STREAMING_SIGN_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"; const STREAMING_SIGN_TRAILER_ALGORITHM: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER"; const STREAMING_PAYLOAD_HDR: &str = "AWS4-HMAC-SHA256-PAYLOAD"; -const STREAMING_TRAILER_HDR: &str = "AWS4-HMAC-SHA256-TRAILER"; -const PAYLOAD_CHUNK_SIZE: i64 = 64 * 1024; -const CHUNK_SIGCONST_LEN: i64 = 17; -const SIGNATURESTR_LEN: i64 = 64; -const CRLF_LEN: i64 = 2; -const TRAILER_KV_SEPARATOR: &str = ":"; -const TRAILER_SIGNATURE: &str = "x-amz-trailer-signature"; +const _STREAMING_TRAILER_HDR: &str = "AWS4-HMAC-SHA256-TRAILER"; +const _PAYLOAD_CHUNK_SIZE: i64 = 64 * 1024; +const _CHUNK_SIGCONST_LEN: i64 = 17; +const _SIGNATURESTR_LEN: i64 = 64; +const _CRLF_LEN: i64 = 2; +const _TRAILER_KV_SEPARATOR: &str = ":"; +const _TRAILER_SIGNATURE: &str = "x-amz-trailer-signature"; lazy_static! { static ref ignored_streaming_headers: HashMap = { @@ -37,6 +27,7 @@ lazy_static! { }; } +#[allow(dead_code)] fn build_chunk_string_to_sign(t: OffsetDateTime, region: &str, previous_sig: &str, chunk_check_sum: &str) -> String { let mut string_to_sign_parts = >::new(); string_to_sign_parts.push(STREAMING_PAYLOAD_HDR.to_string()); @@ -49,7 +40,7 @@ fn build_chunk_string_to_sign(t: OffsetDateTime, region: &str, previous_sig: &st string_to_sign_parts.join("\n") } -fn build_chunk_signature( +fn _build_chunk_signature( chunk_check_sum: &str, req_time: OffsetDateTime, region: &str, @@ -62,13 +53,37 @@ fn build_chunk_signature( } pub fn streaming_sign_v4( - req: request::Builder, - access_key_id: &str, - secret_access_key: &str, + mut req: request::Builder, + _access_key_id: &str, + _secret_access_key: &str, session_token: &str, - region: &str, + _region: &str, data_len: i64, - req_time: OffsetDateTime, /*, sh256: md5simd.Hasher*/ + req_time: OffsetDateTime, /*, sh256: md5simd::Hasher*/ + trailer: HeaderMap, ) -> request::Builder { - todo!(); + let headers = req.headers_mut().expect("err"); + + if trailer.is_empty() { + headers.append("X-Amz-Content-Sha256", HeaderValue::from_str(STREAMING_SIGN_ALGORITHM).expect("err")); + } else { + headers.append("X-Amz-Content-Sha256", HeaderValue::from_str(STREAMING_SIGN_TRAILER_ALGORITHM).expect("err")); + for (k, _) in &trailer { + headers.append("X-Amz-Trailer", k.as_str().to_lowercase().parse().unwrap()); + } + let chunked_value = HeaderValue::from_str(&vec!["aws-chunked"].join(",")).expect("err"); + headers.insert(http::header::TRANSFER_ENCODING, chunked_value); + } + + if !session_token.is_empty() { + headers.insert("X-Amz-Security-Token", HeaderValue::from_str(&session_token).expect("err")); + } + + let format = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z"); + headers.insert("X-Amz-Date", HeaderValue::from_str(&req_time.format(&format).unwrap()).expect("err")); + + //req.content_length = 100; + headers.insert("x-amz-decoded-content-length", format!("{:010}", data_len).parse().unwrap()); + + req } diff --git a/crates/signer/src/request_signature_streaming_unsigned_trailer.rs b/crates/signer/src/request_signature_streaming_unsigned_trailer.rs new file mode 100644 index 00000000..1c096eaa --- /dev/null +++ b/crates/signer/src/request_signature_streaming_unsigned_trailer.rs @@ -0,0 +1,23 @@ +use http::{request, HeaderValue}; +use time::{OffsetDateTime, macros::format_description}; + +pub fn streaming_unsigned_v4( + mut req: request::Builder, + session_token: &str, + _data_len: i64, + req_time: OffsetDateTime, +) -> request::Builder { + let headers = req.headers_mut().expect("err"); + + let chunked_value = HeaderValue::from_str(&vec!["aws-chunked"].join(",")).expect("err"); + headers.insert(http::header::TRANSFER_ENCODING, chunked_value); + if !session_token.is_empty() { + headers.insert("X-Amz-Security-Token", HeaderValue::from_str(&session_token).expect("err")); + } + + let format = format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]Z"); + headers.insert("X-Amz-Date", HeaderValue::from_str(&req_time.format(&format).unwrap()).expect("err")); + //req.content_length = 100; + + req +} diff --git a/ecstore/src/signer/request_signature_v2.rs b/crates/signer/src/request_signature_v2.rs similarity index 90% rename from ecstore/src/signer/request_signature_v2.rs rename to crates/signer/src/request_signature_v2.rs index 6387a5de..6cd59349 100644 --- a/ecstore/src/signer/request_signature_v2.rs +++ b/crates/signer/src/request_signature_v2.rs @@ -1,26 +1,18 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use http::request; use hyper::Uri; use std::collections::HashMap; use std::fmt::Write; -use time::{OffsetDateTime, format_description, macros::format_description}; +use time::{OffsetDateTime, format_description}; use rustfs_utils::crypto::{base64_encode, hex, hmac_sha1}; - use super::utils::get_host_addr; -const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; +const _SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; const SIGN_V2_ALGORITHM: &str = "AWS"; -fn encode_url2path(req: &request::Builder, virtual_host: bool) -> String { - let mut path = "".to_string(); +fn encode_url2path(req: &request::Builder, _virtual_host: bool) -> String { + let path; //path = serde_urlencoded::to_string(req.uri_ref().unwrap().path().unwrap()).unwrap(); path = req.uri_ref().unwrap().path().to_string(); @@ -42,7 +34,7 @@ pub fn pre_sign_v2( let d = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap()); let epoch_expires = d.unix_timestamp() + expires; - let mut headers = req.headers_mut().expect("err"); + let headers = req.headers_mut().expect("headers_mut err"); let expires_str = headers.get("Expires"); if expires_str.is_none() { headers.insert("Expires", format!("{:010}", epoch_expires).parse().unwrap()); @@ -73,14 +65,14 @@ pub fn pre_sign_v2( req } -fn post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) -> String { +fn _post_pre_sign_signature_v2(policy_base64: &str, secret_access_key: &str) -> String { let signature = hex(hmac_sha1(secret_access_key, policy_base64)); signature } pub fn sign_v2( mut req: request::Builder, - content_len: i64, + _content_len: i64, access_key_id: &str, secret_access_key: &str, virtual_host: bool, @@ -93,7 +85,7 @@ pub fn sign_v2( let d2 = d.replace_time(time::Time::from_hms(0, 0, 0).unwrap()); let string_to_sign = string_to_sign_v2(&req, virtual_host); - let mut headers = req.headers_mut().expect("err"); + let headers = req.headers_mut().expect("err"); let date = headers.get("Date").unwrap(); if date.to_str().unwrap() == "" { @@ -107,7 +99,7 @@ pub fn sign_v2( ); } - let mut auth_header = format!("{} {}:", SIGN_V2_ALGORITHM, access_key_id); + let auth_header = format!("{} {}:", SIGN_V2_ALGORITHM, access_key_id); let auth_header = format!("{}{}", auth_header, base64_encode(&hmac_sha1(secret_access_key, string_to_sign))); headers.insert("Authorization", auth_header.parse().unwrap()); @@ -214,7 +206,7 @@ fn write_canonicalized_resource(buf: &mut BytesMut, req: &request::Builder, virt if request_url.query().unwrap() != "" { let mut n: i64 = 0; let result = serde_urlencoded::from_str::>>(req.uri_ref().unwrap().query().unwrap()); - let mut vals = result.unwrap_or_default(); + let vals = result.unwrap_or_default(); for resource in INCLUDED_QUERY { let vv = &vals[*resource]; if vv.len() > 0 { diff --git a/ecstore/src/signer/request_signature_v4.rs b/crates/signer/src/request_signature_v4.rs similarity index 95% rename from ecstore/src/signer/request_signature_v4.rs rename to crates/signer/src/request_signature_v4.rs index 691240c4..b989db14 100644 --- a/ecstore/src/signer/request_signature_v4.rs +++ b/crates/signer/src/request_signature_v4.rs @@ -1,28 +1,17 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use http::HeaderMap; use http::Uri; -use http::header::TRAILER; -use http::request::{self, Request}; +use http::request; use lazy_static::lazy_static; use std::collections::HashMap; use std::fmt::Write; -use time::{OffsetDateTime, format_description, macros::datetime, macros::format_description}; -use tracing::{debug, error, info, warn}; +use time::{OffsetDateTime, macros::format_description}; +use tracing::debug; -use super::ordered_qs::OrderedQs; -use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4; -use super::utils::stable_sort_by_first; -use super::utils::{get_host_addr, sign_v4_trim_all}; -use crate::client::constants::UNSIGNED_PAYLOAD; use rustfs_utils::crypto::{hex, hex_sha256, hmac_sha256}; -use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH; +use super::request_signature_streaming_unsigned_trailer::streaming_unsigned_v4; +use super::utils::{get_host_addr, sign_v4_trim_all}; +use super::constants::UNSIGNED_PAYLOAD; pub const SIGN_V4_ALGORITHM: &str = "AWS4-HMAC-SHA256"; pub const SERVICE_TYPE_S3: &str = "s3"; @@ -58,7 +47,7 @@ pub fn get_scope(location: &str, t: OffsetDateTime, service_type: &str) -> Strin let mut ans = String::from(""); ans.push_str(&t.format(&format).unwrap().to_string()); ans.push('/'); - ans.push_str(location); // TODO: use a `Region` type + ans.push_str(location); ans.push('/'); ans.push_str(service_type); ans.push_str("/aws4_request"); @@ -221,11 +210,6 @@ pub fn pre_sign_v4( return req; } - //let t = OffsetDateTime::now_utc(); - //let date = AmzDate::parse(timestamp).unwrap(); - let t2 = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap()); - - //let credential = get_scope(location, t, SERVICE_TYPE_S3); let credential = get_credential(access_key_id, location, t, SERVICE_TYPE_S3); let signed_headers = get_signed_headers(&req, &v4_ignored_headers); @@ -277,13 +261,13 @@ pub fn pre_sign_v4( req } -fn post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_access_key: &str, location: &str) -> String { +fn _post_pre_sign_signature_v4(policy_base64: &str, t: OffsetDateTime, secret_access_key: &str, location: &str) -> String { let signing_key = get_signing_key(secret_access_key, location, t, SERVICE_TYPE_S3); let signature = get_signature(signing_key, policy_base64); signature } -fn sign_v4_sts(mut req: request::Builder, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Builder { +fn _sign_v4_sts(req: request::Builder, access_key_id: &str, secret_access_key: &str, location: &str) -> request::Builder { sign_v4_inner(req, 0, access_key_id, secret_access_key, "", location, SERVICE_TYPE_STS, HeaderMap::new()) } @@ -304,7 +288,7 @@ fn sign_v4_inner( let t = OffsetDateTime::now_utc(); let t2 = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap()); - let mut headers = req.headers_mut().expect("err"); + let headers = req.headers_mut().expect("err"); let format = format_description!("[year][month][day]T[hour][minute][second]Z"); headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap()); @@ -334,7 +318,7 @@ fn sign_v4_inner( let signature = get_signature(signing_key, &string_to_sign); //debug!("\n\ncanonical_request: \n{}\nstring_to_sign: \n{}\nsignature: \n{}\n\n", &canonical_request, &string_to_sign, &signature); - let mut headers = req.headers_mut().expect("err"); + let headers = req.headers_mut().expect("err"); let auth = format!( "{} Credential={}, SignedHeaders={}, Signature={}", @@ -352,14 +336,14 @@ fn sign_v4_inner( req } -fn unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: HeaderMap) { +fn _unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: HeaderMap) { if trailer.len() > 0 { return; } let t = OffsetDateTime::now_utc(); let t = t.replace_time(time::Time::from_hms(0, 0, 0).unwrap()); - let mut headers = req.headers_mut().expect("err"); + let headers = req.headers_mut().expect("err"); let format = format_description!("[year][month][day]T[hour][minute][second]Z"); headers.insert("X-Amz-Date", t.format(&format).unwrap().to_string().parse().unwrap()); @@ -379,7 +363,7 @@ fn unsigned_trailer(mut req: request::Builder, content_len: i64, trailer: Header } pub fn sign_v4( - mut req: request::Builder, + req: request::Builder, content_len: i64, access_key_id: &str, secret_access_key: &str, diff --git a/ecstore/src/signer/utils.rs b/crates/signer/src/utils.rs similarity index 93% rename from ecstore/src/signer/utils.rs rename to crates/signer/src/utils.rs index 8ce8d329..90052ff3 100644 --- a/ecstore/src/signer/utils.rs +++ b/crates/signer/src/utils.rs @@ -11,7 +11,7 @@ pub fn get_host_addr(req: &request::Builder) -> String { } if let Some(host) = host { if req_host != *host.to_str().unwrap() { - return host.to_str().unwrap().to_string(); + return (*host.to_str().unwrap()).to_string(); } } /*if req.uri_ref().unwrap().host().is_some() { diff --git a/reader/src/hasher.rs b/crates/utils/src/hasher.rs similarity index 97% rename from reader/src/hasher.rs rename to crates/utils/src/hasher.rs index 5f86a52f..06e39ec8 100644 --- a/reader/src/hasher.rs +++ b/crates/utils/src/hasher.rs @@ -1,6 +1,6 @@ use md5::{Digest as Md5Digest, Md5}; use sha2::{ - Digest, Sha256 as sha_sha256, + Sha256 as sha_sha256, digest::{Reset, Update}, }; pub trait Hasher { @@ -125,7 +125,7 @@ impl Default for MD5 { impl Hasher for MD5 { fn write(&mut self, bytes: &[u8]) { - self.hasher.update(bytes); + Md5Digest::update(&mut self.hasher, bytes); } fn reset(&mut self) {} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 3b74bcb7..234c9442 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -16,6 +16,8 @@ pub mod io; #[cfg(feature = "hash")] pub mod hash; +pub mod hasher; + #[cfg(feature = "os")] pub mod os; diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index 6333a998..4c555be1 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -94,8 +94,8 @@ shadow-rs.workspace = true rustfs-filemeta.workspace = true rustfs-utils = { workspace = true, features = ["full"] } rustfs-rio.workspace = true +rustfs-signer.workspace = true futures-util.workspace = true -reader = { workspace = true } [target.'cfg(not(windows))'.dependencies] nix = { workspace = true } diff --git a/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 99392795..62007088 100644 --- a/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -25,6 +25,8 @@ use tracing::{error, info}; use uuid::Uuid; use xxhash_rust::xxh64; +//use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; +//use rustfs_notify::{initialize, notification_system}; use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc}; use super::lifecycle::{self, ExpirationOptions, IlmAction, Lifecycle, TransitionOptions}; use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats}; @@ -109,6 +111,7 @@ struct ExpiryStats { workers: AtomicI64, } +#[allow(dead_code)] impl ExpiryStats { pub fn missed_tasks(&self) -> i64 { self.missed_expiry_tasks.load(Ordering::SeqCst) @@ -567,32 +570,6 @@ impl TransitionState { } } -struct AuditTierOp { - tier: String, - time_to_responsens: i64, - output_bytes: i64, - error: String, -} - -impl AuditTierOp { - #[allow(clippy::new_ret_no_self)] - pub async fn new() -> Result { - Ok(Self { - tier: String::from("tier"), - time_to_responsens: 0, - output_bytes: 0, - error: String::from(""), - }) - } - - pub fn string(&self) -> String { - format!( - "tier:{},respNS:{},tx:{},err:{}", - self.tier, self.time_to_responsens, self.output_bytes, self.error - ) - } -} - pub async fn init_background_expiry(api: Arc) { let mut workers = num_cpus::get() / 2; //globalILMConfig.getExpirationWorkers() @@ -717,6 +694,16 @@ pub async fn expire_transitioned_object( host: GLOBAL_LocalNodeName.to_string(), ..Default::default() }); + /*let system = match notification_system() { + Some(sys) => sys, + None => { + let config = Config::new(); + initialize(config).await?; + notification_system().expect("Failed to initialize notification system") + } + }; + let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut)); + system.send_event(event).await;*/ Ok(dobj) } @@ -845,4 +832,4 @@ pub struct RestoreObjectRequest { pub output_location: OutputLocation, } -const MAX_RESTORE_OBJECT_REQUEST_SIZE: i64 = 2 << 20; +const _MAX_RESTORE_OBJECT_REQUEST_SIZE: i64 = 2 << 20; diff --git a/ecstore/src/bucket/lifecycle/lifecycle.rs b/ecstore/src/bucket/lifecycle/lifecycle.rs index d4e3ba93..3d4576eb 100644 --- a/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -25,7 +25,7 @@ pub const TRANSITION_PENDING: &str = "pending"; const ERR_LIFECYCLE_TOO_MANY_RULES: &str = "Lifecycle configuration allows a maximum of 1000 rules"; const ERR_LIFECYCLE_NO_RULE: &str = "Lifecycle configuration should have at least one rule"; const ERR_LIFECYCLE_DUPLICATE_ID: &str = "Rule ID must be unique. Found same ID for more than one rule"; -const ERR_XML_NOT_WELL_FORMED: &str = "The XML you provided was not well-formed or did not validate against our published schema"; +const _ERR_XML_NOT_WELL_FORMED: &str = "The XML you provided was not well-formed or did not validate against our published schema"; const ERR_LIFECYCLE_BUCKET_LOCKED: &str = "ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an object locked bucket"; @@ -698,14 +698,6 @@ pub struct ExpirationOptions { } impl ExpirationOptions { - fn marshal_msg(&self, b: &[u8]) -> Result, std::io::Error> { - todo!(); - } - - fn unmarshal_msg(&self, bts: &[u8]) -> Result, std::io::Error> { - todo!(); - } - fn msg_size(&self) -> i64 { 1 + 7 + 10 } diff --git a/ecstore/src/bucket/lifecycle/rule.rs b/ecstore/src/bucket/lifecycle/rule.rs index 2ff61a2d..4bf9f53e 100644 --- a/ecstore/src/bucket/lifecycle/rule.rs +++ b/ecstore/src/bucket/lifecycle/rule.rs @@ -7,11 +7,11 @@ use s3s::dto::{LifecycleRuleFilter, Transition}; -const ERR_TRANSITION_INVALID_DAYS: &str = "Days must be 0 or greater when used with Transition"; -const ERR_TRANSITION_INVALID_DATE: &str = "Date must be provided in ISO 8601 format"; +const _ERR_TRANSITION_INVALID_DAYS: &str = "Days must be 0 or greater when used with Transition"; +const _ERR_TRANSITION_INVALID_DATE: &str = "Date must be provided in ISO 8601 format"; const ERR_TRANSITION_INVALID: &str = "Exactly one of Days (0 or greater) or Date (positive ISO 8601 format) should be present in Transition."; -const ERR_TRANSITION_DATE_NOT_MIDNIGHT: &str = "'Date' must be at midnight GMT"; +const _ERR_TRANSITION_DATE_NOT_MIDNIGHT: &str = "'Date' must be at midnight GMT"; pub trait Filter { fn test_tags(&self, user_tags: &str) -> bool; diff --git a/ecstore/src/bucket/lifecycle/tier_last_day_stats.rs b/ecstore/src/bucket/lifecycle/tier_last_day_stats.rs index ae187a53..64f41dff 100644 --- a/ecstore/src/bucket/lifecycle/tier_last_day_stats.rs +++ b/ecstore/src/bucket/lifecycle/tier_last_day_stats.rs @@ -65,6 +65,7 @@ impl LastDayTierStats { } } + #[allow(dead_code)] fn merge(&self, m: LastDayTierStats) -> LastDayTierStats { let mut cl = self.clone(); let mut cm = m.clone(); diff --git a/ecstore/src/bucket/lifecycle/tier_sweeper.rs b/ecstore/src/bucket/lifecycle/tier_sweeper.rs index a0e720a2..68d7fc2b 100644 --- a/ecstore/src/bucket/lifecycle/tier_sweeper.rs +++ b/ecstore/src/bucket/lifecycle/tier_sweeper.rs @@ -17,6 +17,7 @@ use crate::global::GLOBAL_TierConfigMgr; static XXHASH_SEED: u64 = 0; #[derive(Default)] +#[allow(dead_code)] struct ObjSweeper { object: String, bucket: String, @@ -29,6 +30,7 @@ struct ObjSweeper { remote_object: String, } +#[allow(dead_code)] impl ObjSweeper { #[allow(clippy::new_ret_no_self)] pub async fn new(bucket: &str, object: &str) -> Result { @@ -103,6 +105,7 @@ impl ObjSweeper { } #[derive(Debug, Clone)] +#[allow(unused_assignments)] pub struct Jentry { obj_name: String, version_id: String, diff --git a/ecstore/src/bucket/object_lock/objectlock.rs b/ecstore/src/bucket/object_lock/objectlock.rs index fc53170c..4fd2fb59 100644 --- a/ecstore/src/bucket/object_lock/objectlock.rs +++ b/ecstore/src/bucket/object_lock/objectlock.rs @@ -4,21 +4,15 @@ use time::{OffsetDateTime, format_description}; use s3s::dto::{Date, ObjectLockLegalHold, ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode}; use s3s::header::{X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE}; -//const AMZ_OBJECTLOCK_BYPASS_RET_GOVERNANCE: &str = "X-Amz-Bypass-Governance-Retention"; -//const AMZ_OBJECTLOCK_RETAIN_UNTIL_DATE: &str = "X-Amz-Object-Lock-Retain-Until-Date"; -//const AMZ_OBJECTLOCK_MODE: &str = "X-Amz-Object-Lock-Mode"; -//const AMZ_OBJECTLOCK_LEGALHOLD: &str = "X-Amz-Object-Lock-Legal-Hold"; - -// Commented out unused constants to avoid dead code warnings -// const ERR_MALFORMED_BUCKET_OBJECT_CONFIG: &str = "invalid bucket object lock config"; -// const ERR_INVALID_RETENTION_DATE: &str = "date must be provided in ISO 8601 format"; -// const ERR_PAST_OBJECTLOCK_RETAIN_DATE: &str = "the retain until date must be in the future"; -// const ERR_UNKNOWN_WORMMODE_DIRECTIVE: &str = "unknown WORM mode directive"; -// const ERR_OBJECTLOCK_MISSING_CONTENT_MD5: &str = -// "content-MD5 HTTP header is required for Put Object requests with Object Lock parameters"; -// const ERR_OBJECTLOCK_INVALID_HEADERS: &str = -// "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied"; -// const ERR_MALFORMED_XML: &str = "the XML you provided was not well-formed or did not validate against our published schema"; +const _ERR_MALFORMED_BUCKET_OBJECT_CONFIG: &str = "invalid bucket object lock config"; +const _ERR_INVALID_RETENTION_DATE: &str = "date must be provided in ISO 8601 format"; +const _ERR_PAST_OBJECTLOCK_RETAIN_DATE: &str = "the retain until date must be in the future"; +const _ERR_UNKNOWN_WORMMODE_DIRECTIVE: &str = "unknown WORM mode directive"; +const _ERR_OBJECTLOCK_MISSING_CONTENT_MD5: &str = + "content-MD5 HTTP header is required for Put Object requests with Object Lock parameters"; +const _ERR_OBJECTLOCK_INVALID_HEADERS: &str = + "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied"; +const _ERR_MALFORMED_XML: &str = "the XML you provided was not well-formed or did not validate against our published schema"; pub fn utc_now_ntp() -> OffsetDateTime { OffsetDateTime::now_utc() diff --git a/ecstore/src/bucket/object_lock/objectlock_sys.rs b/ecstore/src/bucket/object_lock/objectlock_sys.rs index 2c0c5096..1e9c6de2 100644 --- a/ecstore/src/bucket/object_lock/objectlock_sys.rs +++ b/ecstore/src/bucket/object_lock/objectlock_sys.rs @@ -17,7 +17,7 @@ impl BucketObjectLockSys { } pub async fn get(bucket: &str) -> Option { - if let Some(object_lock_config) = get_object_lock_config(bucket).await { + if let Ok(object_lock_config) = get_object_lock_config(bucket).await { if let Some(object_lock_rule) = object_lock_config.0.rule { return object_lock_rule.default_retention; } diff --git a/ecstore/src/checksum.rs b/ecstore/src/checksum.rs index 2c3ed30c..cd204735 100644 --- a/ecstore/src/checksum.rs +++ b/ecstore/src/checksum.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart}; use crate::{disk::DiskAPI, store_api::GetObjectReader}; -use reader::hasher::{Hasher, Sha256}; +use rustfs_utils::hasher::{Hasher, Sha256}; use rustfs_utils::crypto::{base64_decode, base64_encode}; use s3s::header::{ X_AMZ_CHECKSUM_ALGORITHM, X_AMZ_CHECKSUM_CRC32, X_AMZ_CHECKSUM_CRC32C, X_AMZ_CHECKSUM_SHA1, X_AMZ_CHECKSUM_SHA256, @@ -234,6 +234,7 @@ pub struct Checksum { computed: bool, } +#[allow(dead_code)] impl Checksum { fn new(t: ChecksumMode, b: &[u8]) -> Checksum { if t.is_set() && b.len() == t.raw_byte_len() { diff --git a/ecstore/src/client/admin_handler_utils.rs b/ecstore/src/client/admin_handler_utils.rs index 91e047f6..42037dd4 100644 --- a/ecstore/src/client/admin_handler_utils.rs +++ b/ecstore/src/client/admin_handler_utils.rs @@ -1,10 +1,10 @@ use http::status::StatusCode; use std::fmt::{self, Display, Formatter}; -#[derive(Default, thiserror::Error, Debug, PartialEq)] +#[derive(Default, thiserror::Error, Debug, Clone, PartialEq)] pub struct AdminError { - pub code: &'static str, - pub message: &'static str, + pub code: String, + pub message: String, pub status_code: StatusCode, } @@ -15,18 +15,18 @@ impl Display for AdminError { } impl AdminError { - pub fn new(code: &'static str, message: &'static str, status_code: StatusCode) -> Self { + pub fn new(code: &str, message: &str, status_code: StatusCode) -> Self { Self { - code, - message, + code: code.to_string(), + message: message.to_string(), status_code, } } - pub fn msg(message: &'static str) -> Self { + pub fn msg(message: &str) -> Self { Self { - code: "InternalError", - message, + code: "InternalError".to_string(), + message: message.to_string(), status_code: StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/ecstore/src/client/api_get_object.rs b/ecstore/src/client/api_get_object.rs index dd86a66f..cf001272 100644 --- a/ecstore/src/client/api_get_object.rs +++ b/ecstore/src/client/api_get_object.rs @@ -61,6 +61,7 @@ impl TransitionClient { } #[derive(Default)] +#[allow(dead_code)] pub struct GetRequest { pub buffer: Vec, pub offset: i64, @@ -72,6 +73,7 @@ pub struct GetRequest { pub setting_object_info: bool, } +#[allow(dead_code)] pub struct GetResponse { pub size: i64, //pub error: error, diff --git a/ecstore/src/client/api_get_options.rs b/ecstore/src/client/api_get_options.rs index 7b0ce537..bf67e111 100644 --- a/ecstore/src/client/api_get_options.rs +++ b/ecstore/src/client/api_get_options.rs @@ -14,6 +14,7 @@ use tracing::warn; use crate::client::api_error_response::err_invalid_argument; #[derive(Default)] +#[allow(dead_code)] pub struct AdvancedGetOptions { replication_deletemarker: bool, is_replication_ready_for_deletemarker: bool, @@ -30,8 +31,6 @@ pub struct GetObjectOptions { pub internal: AdvancedGetOptions, } -type StatObjectOptions = GetObjectOptions; - impl Default for GetObjectOptions { fn default() -> Self { Self { diff --git a/ecstore/src/client/api_list.rs b/ecstore/src/client/api_list.rs index f43c8dd8..4586eaf0 100644 --- a/ecstore/src/client/api_list.rs +++ b/ecstore/src/client/api_list.rs @@ -258,6 +258,7 @@ impl TransitionClient { } } +#[allow(dead_code)] pub struct ListObjectsOptions { reverse_versions: bool, with_versions: bool, diff --git a/ecstore/src/client/api_put_object.rs b/ecstore/src/client/api_put_object.rs index d3df829d..41f0716d 100644 --- a/ecstore/src/client/api_put_object.rs +++ b/ecstore/src/client/api_put_object.rs @@ -12,7 +12,7 @@ use std::{collections::HashMap, sync::Arc}; use time::{Duration, OffsetDateTime, macros::format_description}; use tracing::{error, info, warn}; -use reader::hasher::Hasher; +use rustfs_utils::hasher::Hasher; use s3s::dto::{ObjectLockLegalHoldStatus, ObjectLockRetentionMode, ReplicationStatus}; use s3s::header::{ X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, X_AMZ_REPLICATION_STATUS, @@ -124,6 +124,7 @@ impl Default for PutObjectOptions { } } +#[allow(dead_code)] impl PutObjectOptions { fn set_matche_tag(&mut self, etag: &str) { if etag == "*" { diff --git a/ecstore/src/client/api_put_object_common.rs b/ecstore/src/client/api_put_object_common.rs index 6d5ba2af..ef512578 100644 --- a/ecstore/src/client/api_put_object_common.rs +++ b/ecstore/src/client/api_put_object_common.rs @@ -14,8 +14,6 @@ use crate::client::{ transition_api::TransitionClient, }; -const NULL_VERSION_ID: &str = "null"; - pub fn is_object(reader: &ReaderImpl) -> bool { todo!(); } diff --git a/ecstore/src/client/api_put_object_multipart.rs b/ecstore/src/client/api_put_object_multipart.rs index f3a9d32b..25215f9c 100644 --- a/ecstore/src/client/api_put_object_multipart.rs +++ b/ecstore/src/client/api_put_object_multipart.rs @@ -18,7 +18,7 @@ use tracing::{error, info}; use url::form_urlencoded::Serializer; use uuid::Uuid; -use reader::hasher::Hasher; +use rustfs_utils::hasher::Hasher; use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID}; use s3s::{Body, dto::StreamingBlob}; //use crate::disk::{Reader, BufferReader}; diff --git a/ecstore/src/client/api_put_object_streaming.rs b/ecstore/src/client/api_put_object_streaming.rs index 9f493770..4c8e4219 100644 --- a/ecstore/src/client/api_put_object_streaming.rs +++ b/ecstore/src/client/api_put_object_streaming.rs @@ -27,7 +27,7 @@ use crate::client::{ constants::ISO8601_DATEFORMAT, transition_api::{ReaderImpl, RequestMetadata, TransitionClient, UploadInfo}, }; -use reader::hasher::Hasher; +use rustfs_utils::hasher::Hasher; use rustfs_utils::{crypto::base64_encode, path::trim_etag}; use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID}; diff --git a/ecstore/src/client/api_remove.rs b/ecstore/src/client/api_remove.rs index f62c0889..fe0b6023 100644 --- a/ecstore/src/client/api_remove.rs +++ b/ecstore/src/client/api_remove.rs @@ -24,14 +24,15 @@ use crate::{ disk::DiskAPI, store_api::{GetObjectReader, ObjectInfo, StorageAPI}, }; -use reader::hasher::{sum_md5_base64, sum_sha256_hex}; +use rustfs_utils::hasher::{sum_md5_base64, sum_sha256_hex}; use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH; pub struct RemoveBucketOptions { - forced_elete: bool, + _forced_elete: bool, } #[derive(Debug)] +#[allow(dead_code)] pub struct AdvancedRemoveOptions { replication_delete_marker: bool, replication_status: ReplicationStatus, @@ -426,8 +427,10 @@ impl TransitionClient { } #[derive(Debug, Default)] +#[allow(dead_code)] pub struct RemoveObjectError { object_name: String, + #[allow(dead_code)] version_id: String, err: Option, } diff --git a/ecstore/src/client/api_s3_datatypes.rs b/ecstore/src/client/api_s3_datatypes.rs index 8facd330..110bfdc4 100644 --- a/ecstore/src/client/api_s3_datatypes.rs +++ b/ecstore/src/client/api_s3_datatypes.rs @@ -43,6 +43,7 @@ pub struct ListBucketV2Result { pub start_after: String, } +#[allow(dead_code)] pub struct Version { etag: String, is_latest: bool, @@ -72,12 +73,6 @@ pub struct ListVersionsResult { next_version_id_marker: String, } -impl ListVersionsResult { - fn unmarshal_xml() -> Result<(), std::io::Error> { - todo!(); - } -} - pub struct ListBucketResult { common_prefixes: Vec, contents: Vec, diff --git a/ecstore/src/client/bucket_cache.rs b/ecstore/src/client/bucket_cache.rs index 7d3c3c70..028a726d 100644 --- a/ecstore/src/client/bucket_cache.rs +++ b/ecstore/src/client/bucket_cache.rs @@ -17,8 +17,7 @@ use crate::client::{ api_error_response::{http_resp_to_error_response, to_error_response}, transition_api::{Document, TransitionClient}, }; -use crate::signer; -use reader::hasher::{Hasher, Sha256}; +use rustfs_utils::hasher::{Hasher, Sha256}; use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH; use s3s::Body; use s3s::S3ErrorCode; @@ -151,7 +150,7 @@ impl TransitionClient { } if signer_type == SignatureType::SignatureV2 { - let req_builder = signer::sign_v2(req_builder, 0, &access_key_id, &secret_access_key, is_virtual_style); + let req_builder = rustfs_signer::sign_v2(req_builder, 0, &access_key_id, &secret_access_key, is_virtual_style); let req = match req_builder.body(Body::empty()) { Ok(req) => return Ok(req), Err(err) => { @@ -169,7 +168,7 @@ impl TransitionClient { .headers_mut() .expect("err") .insert("X-Amz-Content-Sha256", content_sha256.parse().unwrap()); - let req_builder = signer::sign_v4(req_builder, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1"); + let req_builder = rustfs_signer::sign_v4(req_builder, 0, &access_key_id, &secret_access_key, &session_token, "us-east-1"); let req = match req_builder.body(Body::empty()) { Ok(req) => return Ok(req), Err(err) => { diff --git a/ecstore/src/client/constants.rs b/ecstore/src/client/constants.rs index 148a43b3..2db84441 100644 --- a/ecstore/src/client/constants.rs +++ b/ecstore/src/client/constants.rs @@ -1,4 +1,3 @@ -#![allow(clippy::map_entry)] #![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] @@ -28,4 +27,4 @@ pub const ISO8601_DATEFORMAT: &[FormatItem<'_>] = pub const GET_OBJECT_ATTRIBUTES_TAGS: &str = "ETag,Checksum,StorageClass,ObjectSize,ObjectParts"; pub const GET_OBJECT_ATTRIBUTES_MAX_PARTS: i64 = 1000; -const RUSTFS_BUCKET_SOURCE_MTIME: &str = "X-RustFs-Source-Mtime"; +const _RUSTFS_BUCKET_SOURCE_MTIME: &str = "X-RustFs-Source-Mtime"; diff --git a/ecstore/src/client/credentials.rs b/ecstore/src/client/credentials.rs index e08e7d8d..77971f99 100644 --- a/ecstore/src/client/credentials.rs +++ b/ecstore/src/client/credentials.rs @@ -141,28 +141,6 @@ impl ErrorResponse { } } -struct Error { - code: String, - message: String, - bucket_name: String, - key: String, - resource: String, - request_id: String, - host_id: String, - region: String, - server: String, - status_code: i64, -} - -impl Display for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if self.message == "" { - return write!(f, "{}", format!("Error response code {}.", self.code)); - } - write!(f, "{}", self.message) - } -} - pub fn xml_decoder(body: &[u8]) -> Result { todo!(); } diff --git a/ecstore/src/client/object_api_utils.rs b/ecstore/src/client/object_api_utils.rs index 9f22de8c..c0cc9cc9 100644 --- a/ecstore/src/client/object_api_utils.rs +++ b/ecstore/src/client/object_api_utils.rs @@ -24,16 +24,12 @@ pub struct PutObjReader { //pub sealMD5Fn: SealMD5CurrFn, } +#[allow(dead_code)] impl PutObjReader { pub fn new(raw_reader: HashReader) -> Self { todo!(); } - fn size(&self) -> usize { - //self.reader.size() - todo!(); - } - fn md5_current_hex_string(&self) -> String { todo!(); } diff --git a/ecstore/src/client/transition_api.rs b/ecstore/src/client/transition_api.rs index 957c43d2..2003e17d 100644 --- a/ecstore/src/client/transition_api.rs +++ b/ecstore/src/client/transition_api.rs @@ -45,9 +45,8 @@ use crate::client::{ constants::{UNSIGNED_PAYLOAD, UNSIGNED_PAYLOAD_TRAILER}, credentials::{CredContext, Credentials, SignatureType, Static}, }; -use crate::signer; use crate::{checksum::ChecksumMode, store_api::GetObjectReader}; -use reader::hasher::{MD5, Sha256}; +use rustfs_utils::hasher::{MD5, Sha256}; use rustfs_rio::HashReader; use rustfs_utils::{ net::get_endpoint_url, @@ -57,7 +56,7 @@ use s3s::S3ErrorCode; use s3s::dto::ReplicationStatus; use s3s::{Body, dto::Owner}; -const C_USER_AGENT_PREFIX: &str = "RustFS (linux; x86)"; +const _C_USER_AGENT_PREFIX: &str = "RustFS (linux; x86)"; const C_USER_AGENT: &str = "RustFS (linux; x86)"; const SUCCESS_STATUS: [StatusCode; 3] = [StatusCode::OK, StatusCode::NO_CONTENT, StatusCode::PARTIAL_CONTENT]; @@ -433,9 +432,9 @@ impl TransitionClient { } if signer_type == SignatureType::SignatureV2 { req_builder = - signer::pre_sign_v2(req_builder, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host); + rustfs_signer::pre_sign_v2(req_builder, &access_key_id, &secret_access_key, metadata.expires, is_virtual_host); } else if signer_type == SignatureType::SignatureV4 { - req_builder = signer::pre_sign_v4( + req_builder = rustfs_signer::pre_sign_v4( req_builder, &access_key_id, &secret_access_key, @@ -486,7 +485,7 @@ impl TransitionClient { if signer_type == SignatureType::SignatureV2 { req_builder = - signer::sign_v2(req_builder, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host); + rustfs_signer::sign_v2(req_builder, metadata.content_length, &access_key_id, &secret_access_key, is_virtual_host); } else if metadata.stream_sha256 && !self.secure { if metadata.trailer.len() > 0 { //req.Trailer = metadata.trailer; @@ -494,7 +493,7 @@ impl TransitionClient { req_builder = req_builder.header(http::header::TRAILER, v.clone()); } } - //req_builder = signer::streaming_sign_v4(req_builder, &access_key_id, + //req_builder = rustfs_signer::streaming_sign_v4(req_builder, &access_key_id, // &secret_access_key, &session_token, &location, metadata.content_length, OffsetDateTime::now_utc(), self.sha256_hasher()); } else { let mut sha_header = UNSIGNED_PAYLOAD.to_string(); @@ -509,7 +508,7 @@ impl TransitionClient { req_builder = req_builder .header::("X-Amz-Content-Sha256".parse().unwrap(), sha_header.parse().expect("err")); - req_builder = signer::sign_v4_trailer( + req_builder = rustfs_signer::sign_v4_trailer( req_builder, &access_key_id, &secret_access_key, @@ -614,23 +613,6 @@ impl TransitionClient { } } -struct LockedRandSource { - src: u64, //rand.Source, -} - -impl LockedRandSource { - fn int63(&self) -> i64 { - /*let n = self.src.int63(); - n*/ - todo!(); - } - - fn seed(&self, seed: i64) { - //self.src.seed(seed); - todo!(); - } -} - pub struct RequestMetadata { pub pre_sign_url: bool, pub bucket_name: String, diff --git a/ecstore/src/error.rs b/ecstore/src/error.rs index 98ee78de..8d45994f 100644 --- a/ecstore/src/error.rs +++ b/ecstore/src/error.rs @@ -810,6 +810,7 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i } let r_err = err; + let err; let bucket = bucket.to_string(); let object = object.to_string(); let version_id = version_id.to_string(); @@ -870,9 +871,10 @@ pub fn error_resp_to_object_err(err: ErrorResponse, params: Vec<&str>) -> std::i /*S3ErrorCode::ReplicationPermissionCheck => { err = std::io::Error::other(StorageError::ReplicationPermissionCheck); }*/ - _ => std::io::Error::other("err"), + _ => { + err = err_; + } } -} pub fn storage_to_object_err(err: Error, params: Vec<&str>) -> S3Error { let storage_err = &err; diff --git a/ecstore/src/heal/data_usage_cache.rs b/ecstore/src/heal/data_usage_cache.rs index f5286ac7..b2743d62 100644 --- a/ecstore/src/heal/data_usage_cache.rs +++ b/ecstore/src/heal/data_usage_cache.rs @@ -146,6 +146,7 @@ impl TierStats { } } +#[allow(dead_code)] #[allow(dead_code)] struct AllTierStats { tiers: HashMap, diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 955287c7..9b14d888 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -34,7 +34,6 @@ pub mod checksum; pub mod client; pub mod event; pub mod event_notification; -pub mod signer; pub mod tier; pub use global::new_object_layer_fn; diff --git a/ecstore/src/signer/ordered_qs.rs b/ecstore/src/signer/ordered_qs.rs deleted file mode 100644 index 7cc97e2b..00000000 --- a/ecstore/src/signer/ordered_qs.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! Ordered query strings - -use crate::signer::utils::stable_sort_by_first; - -/// Immutable query string container -#[derive(Debug, Default, Clone)] -pub struct OrderedQs { - /// Ascending query strings - qs: Vec<(String, String)>, -} - -/// [`OrderedQs`] -#[derive(Debug, thiserror::Error)] -#[error("ParseOrderedQsError: {inner}")] -pub struct ParseOrderedQsError { - /// url decode error - inner: serde_urlencoded::de::Error, -} - -impl OrderedQs { - /// Constructs [`OrderedQs`] from vec - /// - /// + strings must be url-decoded - #[cfg(test)] - #[must_use] - pub fn from_vec_unchecked(mut v: Vec<(String, String)>) -> Self { - stable_sort_by_first(&mut v); - Self { qs: v } - } - - /// Parses [`OrderedQs`] from query - /// - /// # Errors - /// Returns [`ParseOrderedQsError`] if query cannot be decoded - pub fn parse(query: &str) -> Result { - let result = serde_urlencoded::from_str::>(query); - let mut v = result.map_err(|e| ParseOrderedQsError { inner: e })?; - stable_sort_by_first(&mut v); - Ok(Self { qs: v }) - } - - #[must_use] - pub fn has(&self, name: &str) -> bool { - self.qs.binary_search_by_key(&name, |x| x.0.as_str()).is_ok() - } - - /// Gets query values by name. Time `O(logn)` - pub fn get_all(&self, name: &str) -> impl Iterator + use<'_> { - let qs = self.qs.as_slice(); - - let lower_bound = qs.partition_point(|x| x.0.as_str() < name); - let upper_bound = qs.partition_point(|x| x.0.as_str() <= name); - - qs[lower_bound..upper_bound].iter().map(|x| x.1.as_str()) - } - - pub fn get_unique(&self, name: &str) -> Option<&str> { - let qs = self.qs.as_slice(); - let lower_bound = qs.partition_point(|x| x.0.as_str() < name); - - let mut iter = qs[lower_bound..].iter(); - let pair = iter.next()?; - - if let Some(following) = iter.next() { - if following.0 == name { - return None; - } - } - - (pair.0.as_str() == name).then_some(pair.1.as_str()) - } -} - -impl AsRef<[(String, String)]> for OrderedQs { - fn as_ref(&self) -> &[(String, String)] { - self.qs.as_ref() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn tag() { - { - let query = "tagging"; - let qs = OrderedQs::parse(query).unwrap(); - assert_eq!(qs.as_ref(), &[("tagging".to_owned(), String::new())]); - - assert_eq!(qs.get_unique("taggin"), None); - assert_eq!(qs.get_unique("tagging"), Some("")); - assert_eq!(qs.get_unique("taggingg"), None); - } - - { - let query = "tagging&tagging"; - let qs = OrderedQs::parse(query).unwrap(); - assert_eq!( - qs.as_ref(), - &[("tagging".to_owned(), String::new()), ("tagging".to_owned(), String::new())] - ); - - assert_eq!(qs.get_unique("taggin"), None); - assert_eq!(qs.get_unique("tagging"), None); - assert_eq!(qs.get_unique("taggingg"), None); - } - } -} diff --git a/ecstore/src/signer/request_signature_streaming_unsigned_trailer.rs b/ecstore/src/signer/request_signature_streaming_unsigned_trailer.rs deleted file mode 100644 index ef03015b..00000000 --- a/ecstore/src/signer/request_signature_streaming_unsigned_trailer.rs +++ /dev/null @@ -1,17 +0,0 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] - -use http::request; -use time::OffsetDateTime; - -pub fn streaming_unsigned_v4( - mut req: request::Builder, - session_token: &str, - data_len: i64, - req_time: OffsetDateTime, -) -> request::Builder { - todo!(); -} diff --git a/ecstore/src/tier/tier.rs b/ecstore/src/tier/tier.rs index e9ed6861..c96d1e67 100644 --- a/ecstore/src/tier/tier.rs +++ b/ecstore/src/tier/tier.rs @@ -53,42 +53,40 @@ pub const TIER_CONFIG_FORMAT: u16 = 1; pub const TIER_CONFIG_V1: u16 = 1; pub const TIER_CONFIG_VERSION: u16 = 1; +const _TIER_CFG_REFRESH_AT_HDR: &str = "X-RustFS-TierCfg-RefreshedAt"; + lazy_static! { - //pub static ref TIER_CONFIG_PATH: PathBuf = path_join(&[PathBuf::from(RUSTFS_CONFIG_PREFIX), PathBuf::from(TIER_CONFIG_FILE)]); + pub static ref ERR_TIER_MISSING_CREDENTIALS: AdminError = AdminError { + code: "XRustFSAdminTierMissingCredentials".to_string(), + message: "Specified remote credentials are empty".to_string(), + status_code: StatusCode::FORBIDDEN, + }; + + pub static ref ERR_TIER_BACKEND_IN_USE: AdminError = AdminError { + code: "XRustFSAdminTierBackendInUse".to_string(), + message: "Specified remote tier is already in use".to_string(), + status_code: StatusCode::CONFLICT, + }; + + pub static ref ERR_TIER_TYPE_UNSUPPORTED: AdminError = AdminError { + code: "XRustFSAdminTierTypeUnsupported".to_string(), + message: "Specified tier type is unsupported".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; + + pub static ref ERR_TIER_BACKEND_NOT_EMPTY: AdminError = AdminError { + code: "XRustFSAdminTierBackendNotEmpty".to_string(), + message: "Specified remote backend is not empty".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; + + pub static ref ERR_TIER_INVALID_CONFIG: AdminError = AdminError { + code: "XRustFSAdminTierInvalidConfig".to_string(), + message: "Unable to setup remote tier, check tier configuration".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; } -const TIER_CFG_REFRESH_AT_HDR: &str = "X-RustFS-TierCfg-RefreshedAt"; - -pub const ERR_TIER_MISSING_CREDENTIALS: AdminError = AdminError { - code: "XRustFSAdminTierMissingCredentials", - message: "Specified remote credentials are empty", - status_code: StatusCode::FORBIDDEN, -}; - -pub const ERR_TIER_BACKEND_IN_USE: AdminError = AdminError { - code: "XRustFSAdminTierBackendInUse", - message: "Specified remote tier is already in use", - status_code: StatusCode::CONFLICT, -}; - -pub const ERR_TIER_TYPE_UNSUPPORTED: AdminError = AdminError { - code: "XRustFSAdminTierTypeUnsupported", - message: "Specified tier type is unsupported", - status_code: StatusCode::BAD_REQUEST, -}; - -pub const ERR_TIER_BACKEND_NOT_EMPTY: AdminError = AdminError { - code: "XRustFSAdminTierBackendNotEmpty", - message: "Specified remote backend is not empty", - status_code: StatusCode::BAD_REQUEST, -}; - -pub const ERR_TIER_INVALID_CONFIG: AdminError = AdminError { - code: "XRustFSAdminTierInvalidConfig", - message: "Unable to setup remote tier, check tier configuration", - status_code: StatusCode::BAD_REQUEST, -}; - #[derive(Serialize, Deserialize)] pub struct TierConfigMgr { #[serde(skip)] @@ -108,20 +106,13 @@ impl TierConfigMgr { pub fn unmarshal(data: &[u8]) -> std::result::Result { let cfg: TierConfigMgr = serde_json::from_slice(data)?; - //let mut cfg = TierConfigMgr(m); - //let mut cfg = m; Ok(cfg) } pub fn marshal(&self) -> std::result::Result { let data = serde_json::to_vec(&self)?; - - //let mut data = Vec::with_capacity(self.msg_size()+4); let mut data = Bytes::from(data); - //LittleEndian::write_u16(&mut data[0..2], TIER_CONFIG_FORMAT); - //LittleEndian::write_u16(&mut data[2..4], TIER_CONFIG_VERSION); - Ok(data) } @@ -144,12 +135,12 @@ impl TierConfigMgr { pub async fn add(&mut self, tier: TierConfig, force: bool) -> std::result::Result<(), AdminError> { let tier_name = &tier.name; if tier_name != tier_name.to_uppercase().as_str() { - return Err(ERR_TIER_NAME_NOT_UPPERCASE); + return Err(ERR_TIER_NAME_NOT_UPPERCASE.clone()); } let (_, b) = self.is_tier_name_in_use(tier_name); if b { - return Err(ERR_TIER_ALREADY_EXISTS); + return Err(ERR_TIER_ALREADY_EXISTS.clone()); } let d = new_warm_backend(&tier, true).await?; @@ -159,19 +150,22 @@ impl TierConfigMgr { match in_use { Ok(b) => { if b { - return Err(ERR_TIER_BACKEND_IN_USE); + return Err(ERR_TIER_BACKEND_IN_USE.clone()); } } Err(err) => { warn!("tier add failed, err: {:?}", err); if err.to_string().contains("connect") { - return Err(ERR_TIER_CONNECT_ERR); + return Err(ERR_TIER_CONNECT_ERR.clone()); } else if err.to_string().contains("authorization") { - return Err(ERR_TIER_INVALID_CREDENTIALS); + return Err(ERR_TIER_INVALID_CREDENTIALS.clone()); } else if err.to_string().contains("bucket") { - return Err(ERR_TIER_BUCKET_NOT_FOUND); + return Err(ERR_TIER_BUCKET_NOT_FOUND.clone()); } - return Err(ERR_TIER_PERM_ERR); + let mut e = ERR_TIER_PERM_ERR.clone(); + e.message.push('.'); + e.message.push_str(&err.to_string()); + return Err(e); } } } @@ -185,21 +179,21 @@ impl TierConfigMgr { pub async fn remove(&mut self, tier_name: &str, force: bool) -> std::result::Result<(), AdminError> { let d = self.get_driver(tier_name).await; if let Err(err) = d { - match err { - ERR_TIER_NOT_FOUND => { - return Ok(()); - } - _ => { - return Err(err); - } + if err.code == ERR_TIER_NOT_FOUND.code { + return Ok(()); + } else { + return Err(err); } } if !force { let inuse = d.expect("err").in_use().await; if let Err(err) = inuse { - return Err(ERR_TIER_PERM_ERR); + let mut e = ERR_TIER_PERM_ERR.clone(); + e.message.push('.'); + e.message.push_str(&err.to_string()); + return Err(e); } else if inuse.expect("err") { - return Err(ERR_TIER_BACKEND_NOT_EMPTY); + return Err(ERR_TIER_BACKEND_NOT_EMPTY.clone()); } } self.tiers.remove(tier_name); @@ -254,7 +248,7 @@ impl TierConfigMgr { pub async fn edit(&mut self, tier_name: &str, creds: TierCreds) -> std::result::Result<(), AdminError> { let (tier_type, exists) = self.is_tier_name_in_use(tier_name); if !exists { - return Err(ERR_TIER_NOT_FOUND); + return Err(ERR_TIER_NOT_FOUND.clone()); } let mut cfg = self.tiers[tier_name].clone(); @@ -276,7 +270,7 @@ impl TierConfigMgr { TierType::RustFS => { let mut rustfs = cfg.rustfs.as_mut().expect("err"); if creds.access_key == "" || creds.secret_key == "" { - return Err(ERR_TIER_MISSING_CREDENTIALS); + return Err(ERR_TIER_MISSING_CREDENTIALS.clone()); } rustfs.access_key = creds.access_key; rustfs.secret_key = creds.secret_key; @@ -284,7 +278,7 @@ impl TierConfigMgr { TierType::MinIO => { let mut minio = cfg.minio.as_mut().expect("err"); if creds.access_key == "" || creds.secret_key == "" { - return Err(ERR_TIER_MISSING_CREDENTIALS); + return Err(ERR_TIER_MISSING_CREDENTIALS.clone()); } minio.access_key = creds.access_key; minio.secret_key = creds.secret_key; @@ -304,7 +298,7 @@ impl TierConfigMgr { Entry::Vacant(e) => { let t = self.tiers.get(tier_name); if t.is_none() { - return Err(ERR_TIER_NOT_FOUND); + return Err(ERR_TIER_NOT_FOUND.clone()); } let d = new_warm_backend(t.expect("err"), false).await?; e.insert(d) @@ -332,6 +326,12 @@ impl TierConfigMgr { Ok(()) } + pub async fn clear_tier(&mut self, force: bool) -> std::result::Result<(), AdminError> { + self.tiers.clear(); + self.driver_cache.clear(); + Ok(()) + } + #[tracing::instrument(level = "debug", name = "tier_save", skip(self))] pub async fn save(&self) -> std::result::Result<(), std::io::Error> { let Some(api) = new_object_layer_fn() else { diff --git a/ecstore/src/tier/tier_config.rs b/ecstore/src/tier/tier_config.rs index 8295f27a..6b1f225b 100644 --- a/ecstore/src/tier/tier_config.rs +++ b/ecstore/src/tier/tier_config.rs @@ -1,10 +1,3 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - use serde::{Deserialize, Serialize}; use std::fmt::Display; use tracing::info; @@ -12,9 +5,6 @@ use tracing::info; const C_TIER_CONFIG_VER: &str = "v1"; const ERR_TIER_NAME_EMPTY: &str = "remote tier name empty"; -const ERR_TIER_INVALID_CONFIG: &str = "invalid tier config"; -const ERR_TIER_INVALID_CONFIG_VERSION: &str = "invalid tier config version"; -const ERR_TIER_TYPE_UNSUPPORTED: &str = "unsupported tier type"; #[derive(Serialize, Deserialize, Default, Debug, Clone)] pub enum TierType { @@ -128,20 +118,8 @@ impl Clone for TierConfig { } } +#[allow(dead_code)] impl TierConfig { - pub fn unmarshal(data: &[u8]) -> Result { - /*let m: HashMap> = serde_json::from_slice(data)?; - let mut cfg = TierConfig(m); - cfg.set_defaults(); - Ok(cfg)*/ - todo!(); - } - - pub fn marshal(&self) -> Result, std::io::Error> { - let data = serde_json::to_vec(&self)?; - Ok(data) - } - fn endpoint(&self) -> String { match self.tier_type { TierType::S3 => self.s3.as_ref().expect("err").endpoint.clone(), @@ -198,14 +176,14 @@ impl TierConfig { pub struct TierS3 { pub name: String, pub endpoint: String, - #[serde(rename = "accesskey")] + #[serde(rename = "accessKey")] pub access_key: String, - #[serde(rename = "secretkey")] + #[serde(rename = "secretKey")] pub secret_key: String, pub bucket: String, pub prefix: String, pub region: String, - #[serde(rename = "storageclass")] + #[serde(rename = "storageClass")] pub storage_class: String, #[serde(skip)] pub aws_role: bool, @@ -220,6 +198,7 @@ pub struct TierS3 { } impl TierS3 { + #[allow(dead_code)] fn new(name: &str, access_key: &str, secret_key: &str, bucket: &str, options: Vec) -> Result where F: Fn(TierS3) -> Box> + Send + Sync + 'static, @@ -258,14 +237,14 @@ impl TierS3 { pub struct TierRustFS { pub name: String, pub endpoint: String, - #[serde(rename = "accesskey")] + #[serde(rename = "accessKey")] pub access_key: String, - #[serde(rename = "secretkey")] + #[serde(rename = "secretKey")] pub secret_key: String, pub bucket: String, pub prefix: String, pub region: String, - #[serde(rename = "storageclass")] + #[serde(rename = "storageClass")] pub storage_class: String, } @@ -274,9 +253,9 @@ pub struct TierRustFS { pub struct TierMinIO { pub name: String, pub endpoint: String, - #[serde(rename = "accesskey")] + #[serde(rename = "accessKey")] pub access_key: String, - #[serde(rename = "secretkey")] + #[serde(rename = "secretKey")] pub secret_key: String, pub bucket: String, pub prefix: String, @@ -284,6 +263,7 @@ pub struct TierMinIO { } impl TierMinIO { + #[allow(dead_code)] fn new( name: &str, endpoint: &str, diff --git a/ecstore/src/tier/tier_gen.rs b/ecstore/src/tier/tier_gen.rs index 4f2fe120..e7e8cd63 100644 --- a/ecstore/src/tier/tier_gen.rs +++ b/ecstore/src/tier/tier_gen.rs @@ -1,29 +1,7 @@ -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_must_use)] -#![allow(clippy::all)] - use crate::tier::tier::TierConfigMgr; +#[allow(dead_code)] impl TierConfigMgr { - fn decode_msg(/*dc *msgp.Reader*/) -> Result<(), std::io::Error> { - todo!(); - } - - fn encode_msg(/*en *msgp.Writer*/) -> Result<(), std::io::Error> { - todo!(); - } - - pub fn marshal_msg(&self, b: &[u8]) -> Result, std::io::Error> { - todo!(); - } - - pub fn unmarshal_msg(buf: &[u8]) -> Result { - todo!(); - } - pub fn msg_size(&self) -> usize { 100 } diff --git a/ecstore/src/tier/tier_handlers.rs b/ecstore/src/tier/tier_handlers.rs index 7ebe46a6..65136f52 100644 --- a/ecstore/src/tier/tier_handlers.rs +++ b/ecstore/src/tier/tier_handlers.rs @@ -1,50 +1,53 @@ use crate::client::admin_handler_utils::AdminError; use http::status::StatusCode; +use lazy_static::lazy_static; -pub const ERR_TIER_ALREADY_EXISTS: AdminError = AdminError { - code: "XRustFSAdminTierAlreadyExists", - message: "Specified remote tier already exists", - status_code: StatusCode::CONFLICT, -}; +lazy_static! { + pub static ref ERR_TIER_ALREADY_EXISTS: AdminError = AdminError { + code: "XRustFSAdminTierAlreadyExists".to_string(), + message: "Specified remote tier already exists".to_string(), + status_code: StatusCode::CONFLICT, + }; -pub const ERR_TIER_NOT_FOUND: AdminError = AdminError { - code: "XRustFSAdminTierNotFound", - message: "Specified remote tier was not found", - status_code: StatusCode::NOT_FOUND, -}; + pub static ref ERR_TIER_NOT_FOUND: AdminError = AdminError { + code: "XRustFSAdminTierNotFound".to_string(), + message: "Specified remote tier was not found".to_string(), + status_code: StatusCode::NOT_FOUND, + }; -pub const ERR_TIER_NAME_NOT_UPPERCASE: AdminError = AdminError { - code: "XRustFSAdminTierNameNotUpperCase", - message: "Tier name must be in uppercase", - status_code: StatusCode::BAD_REQUEST, -}; + pub static ref ERR_TIER_NAME_NOT_UPPERCASE: AdminError = AdminError { + code: "XRustFSAdminTierNameNotUpperCase".to_string(), + message: "Tier name must be in uppercase".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; -pub const ERR_TIER_BUCKET_NOT_FOUND: AdminError = AdminError { - code: "XRustFSAdminTierBucketNotFound", - message: "Remote tier bucket not found", - status_code: StatusCode::BAD_REQUEST, -}; + pub static ref ERR_TIER_BUCKET_NOT_FOUND: AdminError = AdminError { + code: "XRustFSAdminTierBucketNotFound".to_string(), + message: "Remote tier bucket not found".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; -pub const ERR_TIER_INVALID_CREDENTIALS: AdminError = AdminError { - code: "XRustFSAdminTierInvalidCredentials", - message: "Invalid remote tier credentials", - status_code: StatusCode::BAD_REQUEST, -}; + pub static ref ERR_TIER_INVALID_CREDENTIALS: AdminError = AdminError { + code: "XRustFSAdminTierInvalidCredentials".to_string(), + message: "Invalid remote tier credentials".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; -pub const ERR_TIER_RESERVED_NAME: AdminError = AdminError { - code: "XRustFSAdminTierReserved", - message: "Cannot use reserved tier name", - status_code: StatusCode::BAD_REQUEST, -}; + pub static ref ERR_TIER_RESERVED_NAME: AdminError = AdminError { + code: "XRustFSAdminTierReserved".to_string(), + message: "Cannot use reserved tier name".to_string(), + status_code: StatusCode::BAD_REQUEST, + }; -pub const ERR_TIER_PERM_ERR: AdminError = AdminError { - code: "TierPermErr", - message: "Tier Perm Err", - status_code: StatusCode::OK, -}; + pub static ref ERR_TIER_PERM_ERR: AdminError = AdminError { + code: "TierPermErr".to_string(), + message: "Tier Perm Err".to_string(), + status_code: StatusCode::OK, + }; -pub const ERR_TIER_CONNECT_ERR: AdminError = AdminError { - code: "TierConnectErr", - message: "Tier Connect Err", - status_code: StatusCode::OK, -}; + pub static ref ERR_TIER_CONNECT_ERR: AdminError = AdminError { + code: "TierConnectErr".to_string(), + message: "Tier Connect Err".to_string(), + status_code: StatusCode::OK, + }; +} \ No newline at end of file diff --git a/ecstore/src/tier/warm_backend.rs b/ecstore/src/tier/warm_backend.rs index ecf79d52..8cbed67b 100644 --- a/ecstore/src/tier/warm_backend.rs +++ b/ecstore/src/tier/warm_backend.rs @@ -7,14 +7,14 @@ use bytes::Bytes; use std::collections::HashMap; - +use http::StatusCode; use crate::client::{ admin_handler_utils::AdminError, transition_api::{ReadCloser, ReaderImpl}, }; use crate::error::is_err_bucket_not_found; use crate::tier::{ - tier::{ERR_TIER_INVALID_CONFIG, ERR_TIER_TYPE_UNSUPPORTED}, + tier::ERR_TIER_TYPE_UNSUPPORTED, tier_config::{TierConfig, TierType}, tier_handlers::{ERR_TIER_BUCKET_NOT_FOUND, ERR_TIER_PERM_ERR}, warm_backend_minio::WarmBackendMinIO, @@ -54,7 +54,7 @@ pub async fn check_warm_backend(w: Option<&WarmBackendImpl>) -> Result<(), Admin .put(PROBE_OBJECT, ReaderImpl::Body(Bytes::from("RustFS".as_bytes().to_vec())), 5) .await; if let Err(err) = remote_version_id { - return Err(ERR_TIER_PERM_ERR); + return Err(ERR_TIER_PERM_ERR.clone()); } let r = w.get(PROBE_OBJECT, "", WarmBackendGetOpts::default()).await; @@ -67,11 +67,11 @@ pub async fn check_warm_backend(w: Option<&WarmBackendImpl>) -> Result<(), Admin return Err(ERR_TIER_MISSING_CREDENTIALS); }*/ //else { - return Err(ERR_TIER_PERM_ERR); + return Err(ERR_TIER_PERM_ERR.clone()); //} } if let Err(err) = w.remove(PROBE_OBJECT, &remote_version_id.expect("err")).await { - return Err(ERR_TIER_PERM_ERR); + return Err(ERR_TIER_PERM_ERR.clone()); }; Ok(()) } @@ -82,8 +82,12 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result { let dd = WarmBackendS3::new(tier.s3.as_ref().expect("err"), &tier.name).await; if let Err(err) = dd { - info!("{}", err); - return Err(ERR_TIER_INVALID_CONFIG); + warn!("{}", err); + return Err(AdminError { + code: "XRustFSAdminTierInvalidConfig".to_string(), + message: format!("Unable to setup remote tier, check tier configuration: {}", err.to_string()), + status_code: StatusCode::BAD_REQUEST, + }); } d = Some(Box::new(dd.expect("err"))); } @@ -91,7 +95,11 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result Result { - return Err(ERR_TIER_TYPE_UNSUPPORTED); + return Err(ERR_TIER_TYPE_UNSUPPORTED.clone()); } } diff --git a/ecstore/src/tier/warm_backend_minio.rs b/ecstore/src/tier/warm_backend_minio.rs index 7b337332..d54ae404 100644 --- a/ecstore/src/tier/warm_backend_minio.rs +++ b/ecstore/src/tier/warm_backend_minio.rs @@ -23,7 +23,7 @@ use tracing::warn; const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5; const MAX_PARTS_COUNT: i64 = 10000; -const MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5; +const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5; const MIN_PART_SIZE: i64 = 1024 * 1024 * 128; pub struct WarmBackendMinIO(WarmBackendS3); diff --git a/ecstore/src/tier/warm_backend_rustfs.rs b/ecstore/src/tier/warm_backend_rustfs.rs index 7e79cbe4..f2acc23d 100644 --- a/ecstore/src/tier/warm_backend_rustfs.rs +++ b/ecstore/src/tier/warm_backend_rustfs.rs @@ -22,7 +22,7 @@ use crate::tier::{ const MAX_MULTIPART_PUT_OBJECT_SIZE: i64 = 1024 * 1024 * 1024 * 1024 * 5; const MAX_PARTS_COUNT: i64 = 10000; -const MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5; +const _MAX_PART_SIZE: i64 = 1024 * 1024 * 1024 * 5; const MIN_PART_SIZE: i64 = 1024 * 1024 * 128; pub struct WarmBackendRustFS(WarmBackendS3); diff --git a/ecstore/src/tier/warm_backend_s3.rs b/ecstore/src/tier/warm_backend_s3.rs index 8275fc80..a7684573 100644 --- a/ecstore/src/tier/warm_backend_s3.rs +++ b/ecstore/src/tier/warm_backend_s3.rs @@ -95,15 +95,6 @@ impl WarmBackendS3 { }) } - fn to_object_err(&self, err: ErrorResponse, params: Vec<&str>) -> std::io::Error { - let mut object = ""; - if params.len() >= 1 { - object = params.first().cloned().unwrap_or_default(); - } - - error_resp_to_object_err(err, vec![&self.bucket, &self.get_dest(object)]) - } - pub fn get_dest(&self, object: &str) -> String { let mut dest_obj = object.to_string(); if self.prefix != "" { diff --git a/reader/Cargo.toml b/reader/Cargo.toml deleted file mode 100644 index 6ad7c7ac..00000000 --- a/reader/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "reader" -edition.workspace = true -license.workspace = true -repository.workspace = true -rust-version.workspace = true -version.workspace = true - -[lints] -workspace = true - -[dependencies] -tracing.workspace = true -s3s.workspace = true -thiserror.workspace = true -bytes.workspace = true -pin-project-lite.workspace = true -hex-simd = "0.8.0" -base64-simd = "0.8.0" -md-5.workspace = true -sha2 = { version = "0.11.0-pre.4" } -futures.workspace = true -async-trait.workspace = true -common.workspace = true - -[dev-dependencies] -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/reader/src/error.rs b/reader/src/error.rs deleted file mode 100644 index 9c5017ee..00000000 --- a/reader/src/error.rs +++ /dev/null @@ -1,12 +0,0 @@ -#[derive(Debug, thiserror::Error, PartialEq, Eq)] -pub enum ReaderError { - #[error("stream input error {0}")] - StreamInput(String), - // - #[error("etag: expected ETag {0} does not match computed ETag {1}")] - VerifyError(String, String), - #[error("Bad checksum: Want {0} does not match calculated {1}")] - ChecksumMismatch(String, String), - #[error("Bad sha256: Expected {0} does not match calculated {1}")] - SHA256Mismatch(String, String), -} diff --git a/reader/src/lib.rs b/reader/src/lib.rs deleted file mode 100644 index 433caaa2..00000000 --- a/reader/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod error; -pub mod hasher; -pub mod reader; - -pub fn hex(data: impl AsRef<[u8]>) -> String { - hex_simd::encode_to_string(data, hex_simd::AsciiCase::Lower) -} diff --git a/reader/src/reader.rs b/reader/src/reader.rs deleted file mode 100644 index 1846aa07..00000000 --- a/reader/src/reader.rs +++ /dev/null @@ -1,549 +0,0 @@ -#![allow(unused_imports)] - -use bytes::Bytes; -use s3s::StdError; -use std::any::Any; -use std::io::Read; -use std::{collections::VecDeque, io::Cursor}; - -use std::pin::Pin; -use std::task::Poll; - -use crate::{ - error::ReaderError, - hasher::{HashType, Uuid}, -}; - -// use futures::stream::Stream; -use super::hasher::{Hasher, MD5, Sha256}; -use futures::Stream; -use std::io::{Error, Result}; - -pin_project_lite::pin_project! { - #[derive(Default)] - pub struct EtagReader { - #[pin] - inner: S, - md5: HashType, - checksum:Option, - bytes_read:usize, - } -} - -impl EtagReader { - pub fn new(inner: S, etag: Option, force_md5: Option) -> Self { - let md5 = { - if let Some(m) = force_md5 { - HashType::Uuid(Uuid::new(m)) - } else { - HashType::Md5(MD5::new()) - } - }; - Self { - inner, - md5, - checksum: etag, - bytes_read: 0, - } - } - - pub fn etag(&mut self) -> String { - self.md5.sum() - } -} - -impl Stream for EtagReader -where - S: Stream>, -{ - type Item = std::result::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - let this = self.project(); - let poll = this.inner.poll_next(cx); - - if let Poll::Ready(ref res) = poll { - match res { - Some(Ok(bytes)) => { - *this.bytes_read += bytes.len(); - this.md5.write(bytes); - } - Some(Err(err)) => { - return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string()))))); - } - None => { - if let Some(etag) = this.checksum { - let got = this.md5.sum(); - if got.as_str() != etag.as_str() { - return Poll::Ready(Some(Err(Box::new(ReaderError::VerifyError(etag.to_owned(), got))))); - } - } - } - } - } - - poll - } -} - -pin_project_lite::pin_project! { - #[derive(Default)] - pub struct HashReader { - #[pin] - inner: S, - sha256: Option, - md5: Option, - md5_hex:Option, - sha256_hex:Option, - size:usize, - actual_size: usize, - bytes_read:usize, - } -} - -impl HashReader { - pub fn new(inner: S, size: usize, md5_hex: Option, sha256_hex: Option, actual_size: usize) -> Self { - let md5 = { if md5_hex.is_some() { Some(MD5::new()) } else { None } }; - let sha256 = { if sha256_hex.is_some() { Some(Sha256::new()) } else { None } }; - Self { - inner, - size, - actual_size, - md5_hex, - sha256_hex, - bytes_read: 0, - md5, - sha256, - } - } -} - -impl Stream for HashReader -where - S: Stream>, -{ - type Item = std::result::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - let this = self.project(); - let poll = this.inner.poll_next(cx); - - if let Poll::Ready(ref res) = poll { - match res { - Some(Ok(bytes)) => { - *this.bytes_read += bytes.len(); - if let Some(sha) = this.sha256 { - sha.write(bytes); - } - - if let Some(md5) = this.md5 { - md5.write(bytes); - } - } - Some(Err(err)) => { - return Poll::Ready(Some(Err(Box::new(ReaderError::StreamInput(err.to_string()))))); - } - None => { - if let Some(hash) = this.sha256 { - if let Some(hex) = this.sha256_hex { - let got = hash.sum(); - let src = hex.as_str(); - if src != got.as_str() { - println!("sha256 err src:{src},got:{got}"); - return Poll::Ready(Some(Err(Box::new(ReaderError::SHA256Mismatch(src.to_string(), got))))); - } - } - } - - if let Some(hash) = this.md5 { - if let Some(hex) = this.md5_hex { - let got = hash.sum(); - let src = hex.as_str(); - if src != got.as_str() { - // TODO: ERR - println!("md5 err src:{src},got:{got}"); - return Poll::Ready(Some(Err(Box::new(ReaderError::ChecksumMismatch(src.to_string(), got))))); - } - } - } - } - } - } - - // println!("poll {:?}", poll); - - poll - } -} - -#[async_trait::async_trait] -pub trait Reader { - async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result; - async fn seek(&mut self, offset: usize) -> Result<()>; - async fn read_exact(&mut self, buf: &mut [u8]) -> Result; - async fn read_all(&mut self) -> Result> { - let data = Vec::new(); - - Ok(data) - } - fn as_any(&self) -> &dyn Any; -} - -#[derive(Debug)] -pub struct BufferReader { - pub inner: Cursor>, - pos: usize, -} - -impl BufferReader { - pub fn new(inner: Vec) -> Self { - Self { - inner: Cursor::new(inner), - pos: 0, - } - } -} - -#[async_trait::async_trait] -impl Reader for BufferReader { - #[tracing::instrument(level = "debug", skip(self, buf))] - async fn read_at(&mut self, offset: usize, buf: &mut [u8]) -> Result { - self.seek(offset).await?; - self.read_exact(buf).await - } - #[tracing::instrument(level = "debug", skip(self))] - async fn seek(&mut self, offset: usize) -> Result<()> { - if self.pos != offset { - self.inner.set_position(offset as u64); - } - - Ok(()) - } - #[tracing::instrument(level = "debug", skip(self))] - async fn read_exact(&mut self, buf: &mut [u8]) -> Result { - let _bytes_read = self.inner.read_exact(buf)?; - self.pos += buf.len(); - //Ok(bytes_read) - Ok(0) - } - - async fn read_all(&mut self) -> Result> { - let mut data = Vec::new(); - self.inner.read_to_end(&mut data)?; - - Ok(data) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -pin_project_lite::pin_project! { - pub struct ChunkedStream { - #[pin] - inner: S, - chuck_size: usize, - streams: VecDeque, - remaining:Vec, - } -} - -impl ChunkedStream { - pub fn new(inner: S, chuck_size: usize) -> Self { - Self { - inner, - chuck_size, - streams: VecDeque::new(), - remaining: Vec::new(), - } - } -} - -impl Stream for ChunkedStream -where - S: Stream> + Send + Sync, - // E: std::error::Error + Send + Sync, -{ - type Item = std::result::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - let (items, op_items) = self.inner.size_hint(); - let this = self.project(); - - if let Some(b) = this.streams.pop_front() { - return Poll::Ready(Some(Ok(b))); - } - - let poll = this.inner.poll_next(cx); - - match poll { - Poll::Ready(res_op) => match res_op { - Some(res) => match res { - Ok(bytes) => { - let chuck_size = *this.chuck_size; - let mut bytes = bytes; - - // println!("get len {}", bytes.len()); - // 如果有剩余 - if !this.remaining.is_empty() { - let need_size = chuck_size - this.remaining.len(); - // 传入的数据大小需要补齐的大小,使用传入数据补齐 - if bytes.len() >= need_size { - let add_bytes = bytes.split_to(need_size); - this.remaining.extend_from_slice(&add_bytes); - this.streams.push_back(Bytes::from(this.remaining.clone())); - this.remaining.clear(); - } else { - // 不够,直接追加 - let need_size = bytes.len(); - let add_bytes = bytes.split_to(need_size); - this.remaining.extend_from_slice(&add_bytes); - } - } - - loop { - if bytes.len() < chuck_size { - break; - } - let chuck = bytes.split_to(chuck_size); - this.streams.push_back(chuck); - } - - if !bytes.is_empty() { - this.remaining.extend_from_slice(&bytes); - } - - if let Some(b) = this.streams.pop_front() { - return Poll::Ready(Some(Ok(b))); - } - - if items > 0 || op_items.is_some() { - return Poll::Pending; - } - - if !this.remaining.is_empty() { - let b = this.remaining.clone(); - this.remaining.clear(); - return Poll::Ready(Some(Ok(Bytes::from(b)))); - } - Poll::Ready(None) - } - Err(err) => Poll::Ready(Some(Err(err))), - }, - None => { - // println!("get empty"); - if let Some(b) = this.streams.pop_front() { - return Poll::Ready(Some(Ok(b))); - } - if !this.remaining.is_empty() { - let b = this.remaining.clone(); - this.remaining.clear(); - return Poll::Ready(Some(Ok(Bytes::from(b)))); - } - Poll::Ready(None) - } - }, - Poll::Pending => { - // println!("get Pending"); - Poll::Pending - } - } - - // if let Poll::Ready(Some(res)) = poll { - // warn!("poll res ..."); - // match res { - // Ok(bytes) => { - // let chuck_size = *this.chuck_size; - // let mut bytes = bytes; - // if this.remaining.len() > 0 { - // let need_size = chuck_size - this.remaining.len(); - // let add_bytes = bytes.split_to(need_size); - // this.remaining.extend_from_slice(&add_bytes); - // warn!("poll push_back remaining ...1"); - // this.streams.push_back(Bytes::from(this.remaining.clone())); - // this.remaining.clear(); - // } - - // loop { - // if bytes.len() < chuck_size { - // break; - // } - // let chuck = bytes.split_to(chuck_size); - // warn!("poll push_back ...1"); - // this.streams.push_back(chuck); - // } - - // warn!("poll remaining extend_from_slice...1"); - // this.remaining.extend_from_slice(&bytes); - // } - // Err(err) => return Poll::Ready(Some(Err(err))), - // } - // } - - // if let Some(b) = this.streams.pop_front() { - // warn!("poll pop_front ..."); - // return Poll::Ready(Some(Ok(b))); - // } - - // if this.remaining.len() > 0 { - // let b = this.remaining.clone(); - // this.remaining.clear(); - - // warn!("poll remaining ...1"); - // return Poll::Ready(Some(Ok(Bytes::from(b)))); - // } - // Poll::Pending - } - - fn size_hint(&self) -> (usize, Option) { - let mut items = self.streams.len(); - if !self.remaining.is_empty() { - items += 1; - } - (items, Some(items)) - } -} - -#[cfg(test)] -mod test { - - use super::*; - use futures::StreamExt; - - #[tokio::test] - async fn test_etag_reader() { - let data1 = vec![1u8; 60]; // 65536 - let data2 = vec![0u8; 32]; // 65536 - let chunk1 = Bytes::from(data1); - let chunk2 = Bytes::from(data2); - - let chunk_results: Vec> = vec![Ok(chunk1), Ok(chunk2)]; - - let mut stream = futures::stream::iter(chunk_results); - - let mut hash_reader = EtagReader::new(&mut stream, None, None); - - // let chunk_size = 8; - - // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size); - - loop { - match hash_reader.next().await { - Some(res) => match res { - Ok(bytes) => { - println!("bytes: {}, {:?}", bytes.len(), bytes); - } - Err(err) => { - println!("err:{err:?}"); - break; - } - }, - None => { - println!("next none"); - break; - } - } - } - - println!("etag:{}", hash_reader.etag()); - - // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45 - // println!("md5: {:?}", hash_reader.hex()); - } - - #[tokio::test] - async fn test_hash_reader() { - let data1 = vec![1u8; 60]; // 65536 - let data2 = vec![0u8; 32]; // 65536 - let size = data1.len() + data2.len(); - let chunk1 = Bytes::from(data1); - let chunk2 = Bytes::from(data2); - - let chunk_results: Vec> = vec![Ok(chunk1), Ok(chunk2)]; - - let mut stream = futures::stream::iter(chunk_results); - - let mut hash_reader = HashReader::new( - &mut stream, - size, - Some("d94c485610a7a00a574df55e45d3cc0c".to_string()), - Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()), - 0, - ); - - // let chunk_size = 8; - - // let mut chunked_stream = ChunkStream::new(&mut hash_reader, chunk_size); - - loop { - match hash_reader.next().await { - Some(res) => match res { - Ok(bytes) => { - println!("bytes: {}, {:?}", bytes.len(), bytes); - } - Err(err) => { - println!("err:{err:?}"); - break; - } - }, - None => { - println!("next none"); - break; - } - } - } - - // BUG: borrow of moved value: `md5_stream` - - // 9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45 - // println!("md5: {:?}", hash_reader.hex()); - } - - #[tokio::test] - async fn test_chunked_stream() { - let data1 = vec![1u8; 60]; // 65536 - let data2 = vec![0u8; 33]; // 65536 - let data3 = vec![4u8; 5]; // 65536 - let chunk1 = Bytes::from(data1); - let chunk2 = Bytes::from(data2); - let chunk3 = Bytes::from(data3); - - let chunk_results: Vec> = vec![Ok(chunk1), Ok(chunk2), Ok(chunk3)]; - - let mut stream = futures::stream::iter(chunk_results); - // let mut hash_reader = HashReader::new( - // &mut stream, - // size, - // Some("d94c485610a7a00a574df55e45d3cc0c".to_string()), - // Some("9a7dfa2fcd7b69c89a30cfd3a9be11ab58cb6172628bd7e967fad1e187456d45".to_string()), - // 0, - // ); - - let chunk_size = 8; - - let mut etag_reader = EtagReader::new(&mut stream, None, None); - - let mut chunked_stream = ChunkedStream::new(&mut etag_reader, chunk_size); - - loop { - match chunked_stream.next().await { - Some(res) => match res { - Ok(bytes) => { - println!("bytes: {}, {:?}", bytes.len(), bytes); - } - Err(err) => { - println!("err:{err:?}"); - break; - } - }, - None => { - println!("next none"); - break; - } - } - } - - println!("etag:{}", etag_reader.etag()); - } -} diff --git a/rustfs/src/admin/handlers/tier.rs b/rustfs/src/admin/handlers/tier.rs index 2c4ba759..f46037fc 100644 --- a/rustfs/src/admin/handlers/tier.rs +++ b/rustfs/src/admin/handlers/tier.rs @@ -1,5 +1,6 @@ #![allow(unused_variables, unused_mut, unused_must_use)] +use time::OffsetDateTime; use http::{HeaderMap, StatusCode}; //use iam::get_global_action_cred; use matchit::Params; @@ -55,12 +56,11 @@ pub struct AddTierQuery { pub struct AddTier {} #[async_trait::async_trait] impl Operation for AddTier { - #[tracing::instrument(level = "debug", skip(self))] async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { let query = { if let Some(query) = req.uri.query() { let input: AddTierQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { AddTierQuery::default() @@ -119,39 +119,38 @@ impl Operation for AddTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); match tier_config_mgr.add(args, force).await { - Err(ERR_TIER_ALREADY_EXISTS) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierNameAlreadyExist".into()), - "tier name already exists!", - )); - } - Err(ERR_TIER_NAME_NOT_UPPERCASE) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierNameNotUppercase".into()), - "tier name not uppercase!", - )); - } - Err(ERR_TIER_BACKEND_IN_USE) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierNameBackendInUse!".into()), - "tier name backend in use!", - )); - } - Err(ERR_TIER_CONNECT_ERR) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierConnectError".into()), - "tier connect error!", - )); - } - Err(ERR_TIER_INVALID_CREDENTIALS) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom(ERR_TIER_INVALID_CREDENTIALS.code.into()), - ERR_TIER_INVALID_CREDENTIALS.message, - )); - } - Err(e) => { - warn!("tier_config_mgr add failed, e: {:?}", e); - return Err(S3Error::with_message(S3ErrorCode::Custom("TierAddFailed".into()), "tier add failed")); + Err(err) => { + if err.code == ERR_TIER_ALREADY_EXISTS.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNameAlreadyExist".into()), + "tier name already exists!", + )); + } else if err.code == ERR_TIER_NAME_NOT_UPPERCASE.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNameNotUppercase".into()), + "tier name not uppercase!", + )); + } else if err.code == ERR_TIER_BACKEND_IN_USE.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNameBackendInUse!".into()), + "tier name backend in use!", + )); + } else if err.code == ERR_TIER_CONNECT_ERR.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierConnectError".into()), + "tier connect error!", + )); + } else if err.code == ERR_TIER_INVALID_CREDENTIALS.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom(err.code.clone().into()), + err.message.clone(), + )); + } else { + warn!("tier_config_mgr add failed, e: {:?}", err); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierAddFailed".into()), format!("tier add failed. {}", err.to_string()) + )); + } } Ok(_) => (), } @@ -159,7 +158,6 @@ impl Operation for AddTier { warn!("tier_config_mgr save failed, e: {:?}", e); return Err(S3Error::with_message(S3ErrorCode::Custom("TierAddFailed".into()), "tier save failed")); } - //globalNotificationSys.LoadTransitionTierConfig(ctx); let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); @@ -175,7 +173,7 @@ impl Operation for EditTier { let query = { if let Some(query) = req.uri.query() { let input: AddTierQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed1"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { AddTierQuery::default() @@ -189,9 +187,6 @@ impl Operation for EditTier { let (cred, _owner) = check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; - //{"accesskey":"gggggg","secretkey":"jjjjjjj"} - //{"detailedMessage":"failed to perform PUT: The Access Key Id you provided does not exist in our records.","message":"an error occurred, please try again"} - //200 OK let mut input = req.input; let body = match input.store_all_unlimited().await { Ok(b) => b, @@ -211,26 +206,31 @@ impl Operation for EditTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); match tier_config_mgr.edit(&tier_name, creds).await { - Err(ERR_TIER_NOT_FOUND) => { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!")); - } - Err(ERR_TIER_MISSING_CREDENTIALS) => { - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierMissingCredentials".into()), - "tier missing credentials!", - )); - } - Err(e) => { - warn!("tier_config_mgr edit failed, e: {:?}", e); - return Err(S3Error::with_message(S3ErrorCode::Custom("TierEditFailed".into()), "tier edit failed")); + Err(err) => { + if err.code == ERR_TIER_NOT_FOUND.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNotFound".into()), "tier not found!" + )); + } else if err.code == ERR_TIER_MISSING_CREDENTIALS.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierMissingCredentials".into()), + "tier missing credentials!", + )); + } else { + warn!("tier_config_mgr edit failed, e: {:?}", err); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierEditFailed".into()), format!("tier edit failed. {}", err.to_string()) + )); + } } Ok(_) => (), } if let Err(e) = tier_config_mgr.save().await { warn!("tier_config_mgr save failed, e: {:?}", e); - return Err(S3Error::with_message(S3ErrorCode::Custom("TierEditFailed".into()), "tier save failed")); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierEditFailed".into()), "tier save failed" + )); } - //globalNotificationSys.LoadTransitionTierConfig(ctx); let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); @@ -252,14 +252,13 @@ impl Operation for ListTiers { let query = { if let Some(query) = req.uri.query() { let input: BucketQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { BucketQuery::default() } }; - //{"items":[{"minio":{"accesskey":"minioadmin","bucket":"mblock2","endpoint":"http://192.168.1.11:9020","name":"COLDTIER","objects":"0","prefix":"mypre/","secretkey":"REDACTED","usage":"0 B","versions":"0"},"status":true,"type":"minio"}]} let mut tier_config_mgr = GLOBAL_TierConfigMgr.read().await; let tiers = tier_config_mgr.list_tiers(); @@ -277,12 +276,10 @@ pub struct RemoveTier {} #[async_trait::async_trait] impl Operation for RemoveTier { async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { - warn!("handle RemoveTier"); - let query = { if let Some(query) = req.uri.query() { let input: AddTierQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { AddTierQuery::default() @@ -313,26 +310,31 @@ impl Operation for RemoveTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); match tier_config_mgr.remove(&tier_name, force).await { - Err(ERR_TIER_NOT_FOUND) => { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found.")); - } - Err(ERR_TIER_BACKEND_NOT_EMPTY) => { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used.")); - } - Err(e) => { - warn!("tier_config_mgr remove failed, e: {:?}", e); - return Err(S3Error::with_message( - S3ErrorCode::Custom("TierRemoveFailed".into()), - "tier remove failed", - )); + Err(err) => { + if err.code == ERR_TIER_NOT_FOUND.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNotFound".into()), "tier not found." + )); + } else if err.code == ERR_TIER_BACKEND_NOT_EMPTY.code { + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used." + )); + } else { + warn!("tier_config_mgr remove failed, e: {:?}", err); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierRemoveFailed".into()), + format!("tier remove failed. {}", err.to_string()), + )); + } } Ok(_) => (), } if let Err(e) = tier_config_mgr.save().await { warn!("tier_config_mgr save failed, e: {:?}", e); - return Err(S3Error::with_message(S3ErrorCode::Custom("TierRemoveFailed".into()), "tier save failed")); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierRemoveFailed".into()), "tier save failed" + )); } - //globalNotificationSys.LoadTransitionTierConfig(ctx); let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); @@ -345,12 +347,10 @@ pub struct VerifyTier {} #[async_trait::async_trait] impl Operation for VerifyTier { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle RemoveTier"); - let query = { if let Some(query) = req.uri.query() { let input: AddTierQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { AddTierQuery::default() @@ -381,19 +381,17 @@ pub struct GetTierInfo {} #[async_trait::async_trait] impl Operation for GetTierInfo { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { - warn!("handle GetTierInfo"); - let query = { if let Some(query) = req.uri.query() { let input: AddTierQuery = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get body failed"))?; + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; input } else { AddTierQuery::default() } }; - let mut tier_config_mgr = GLOBAL_TierConfigMgr.read().await; + let tier_config_mgr = GLOBAL_TierConfigMgr.read().await; let info = tier_config_mgr.get(&query.tier.unwrap()); let data = serde_json::to_vec(&info) @@ -405,3 +403,71 @@ impl Operation for GetTierInfo { Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header)) } } + +#[derive(Debug, Deserialize, Default)] +pub struct ClearTierQuery { + pub rand: Option, + pub force: String, +} + +pub struct ClearTier {} +#[async_trait::async_trait] +impl Operation for ClearTier { + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + let query = { + if let Some(query) = req.uri.query() { + let input: ClearTierQuery = + from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; + input + } else { + ClearTierQuery::default() + } + }; + + //let Some(input_cred) = req.credentials else { + // return Err(s3_error!(InvalidRequest, "get cred failed")); + //}; + + //let (cred, _owner) = + // check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + + let mut force: bool = false; + let force_str = query.force; + if force_str != "" { + force = force_str.parse().unwrap(); + } + + let t = OffsetDateTime::now_utc(); + let mut rand = "AGD1R25GI3I1GJGUGJFD7FBS4DFAASDF".to_string(); + rand.insert_str(3, &t.day().to_string()); + rand.insert_str(17, &t.month().to_string()); + rand.insert_str(23, &t.year().to_string()); + warn!("tier_config_mgr rand: {}", rand); + if query.rand != Some(rand) { + return Err(s3_error!(InvalidRequest, "get rand failed")); + }; + + let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; + //tier_config_mgr.reload(api); + match tier_config_mgr.clear_tier(force).await { + Err(err) => { + warn!("tier_config_mgr clear failed, e: {:?}", err); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierClearFailed".into()), format!("tier clear failed. {}", err.to_string()) + )); + } + Ok(_) => (), + } + if let Err(e) = tier_config_mgr.save().await { + warn!("tier_config_mgr save failed, e: {:?}", e); + return Err(S3Error::with_message( + S3ErrorCode::Custom("TierEditFailed".into()), "tier save failed" + )); + } + + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + } +} diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 85df4f29..e12d8165 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -343,6 +343,11 @@ fn register_user_route(r: &mut S3Router) -> std::io::Result<()> format!("{}{}", ADMIN_PREFIX, "/v3/tier/{tiername}").as_str(), AdminOperation(&tier::EditTier {}), )?; + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/tier/clear").as_str(), + AdminOperation(&tier::ClearTier {}), + )?; Ok(()) }